[ https://issues.apache.org/jira/browse/ARROW-2307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399687#comment-16399687 ]
ASF GitHub Bot commented on ARROW-2307:
---------------------------------------
wesm closed pull request #1747: ARROW-2307: [Python] Allow reading record batch streams with zero record batches
URL: https://github.com/apache/arrow/pull/1747
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/ci/msvc-build.bat b/ci/msvc-build.bat
index beefee6c0..a29ef0bad 100644
--- a/ci/msvc-build.bat
+++ b/ci/msvc-build.bat
@@ -69,7 +69,8 @@ if "%JOB%" == "Build_Debug" (
)
conda create -n arrow -q -y python=%PYTHON% ^
- six pytest setuptools numpy pandas cython ^
+ six pytest setuptools numpy pandas ^
+ cython=0.27.3 ^
thrift-cpp=0.11.0
call activate arrow
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index a776c4263..247d10278 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -41,7 +41,7 @@ conda install -y -q pip \
cloudpickle \
numpy=1.13.1 \
pandas \
- cython
+ cython=0.27.3
# ARROW-2093: PyTorch increases the size of our conda dependency stack
# significantly, and so we have disabled these tests in Travis CI for now
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index 24c8d5e15..b1cf6e59a 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -374,6 +374,17 @@ TEST_F(TestTable, FromRecordBatches) {
ASSERT_RAISES(Invalid, Table::FromRecordBatches({batch1, batch2}, &result));
}
+TEST_F(TestTable, FromRecordBatchesZeroLength) {
+ // ARROW-2307
+ MakeExample1(10);
+
+ std::shared_ptr<Table> result;
+ ASSERT_OK(Table::FromRecordBatches(schema_, {}, &result));
+
+ ASSERT_EQ(0, result->num_rows());
+ ASSERT_TRUE(result->schema()->Equals(*schema_));
+}
+
TEST_F(TestTable, ConcatenateTables) {
const int64_t length = 10;
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index ed5858624..f6ac6dd3b 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -297,14 +297,9 @@ std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema,
return std::make_shared<SimpleTable>(schema, arrays, num_rows);
}
-Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches,
+Status Table::FromRecordBatches(const std::shared_ptr<Schema>& schema,
+                                const std::vector<std::shared_ptr<RecordBatch>>& batches,
std::shared_ptr<Table>* table) {
- if (batches.size() == 0) {
- return Status::Invalid("Must pass at least one record batch");
- }
-
- std::shared_ptr<Schema> schema = batches[0]->schema();
-
const int nbatches = static_cast<int>(batches.size());
const int ncolumns = static_cast<int>(schema->num_fields());
@@ -332,6 +327,15 @@ Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>&
return Status::OK();
}
+Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches,
+ std::shared_ptr<Table>* table) {
+ if (batches.size() == 0) {
+ return Status::Invalid("Must pass at least one record batch");
+ }
+
+ return FromRecordBatches(batches[0]->schema(), batches, table);
+}
+
Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables,
std::shared_ptr<Table>* table) {
if (tables.size() == 0) {
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 7274fca4d..20d027d6a 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -169,9 +169,25 @@ class ARROW_EXPORT Table {
const std::vector<std::shared_ptr<Array>>& arrays,
int64_t num_rows = -1);
- // Construct table from RecordBatch, but only if all of the batch schemas are
- // equal. Returns Status::Invalid if there is some problem
+ /// \brief Construct table from RecordBatches, using schema supplied by the
+ /// first RecordBatch.
+ ///
+ /// \param[in] batches a std::vector of record batches
+ /// \param[out] table the returned table
+ /// \return Status Returns Status::Invalid if there is some problem
+ static Status FromRecordBatches(
+ const std::vector<std::shared_ptr<RecordBatch>>& batches,
+ std::shared_ptr<Table>* table);
+
+ /// Construct table from RecordBatches, using supplied schema. There may be
+ /// zero record batches
+ ///
+ /// \param[in] schema the arrow::Schema for each batch
+ /// \param[in] batches a std::vector of record batches
+ /// \param[out] table the returned table
+ /// \return Status
static Status FromRecordBatches(
+ const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<RecordBatch>>& batches,
std::shared_ptr<Table>* table);
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 3d0c02b89..01a641896 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -456,6 +456,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
@staticmethod
CStatus FromRecordBatches(
+ const shared_ptr[CSchema]& schema,
const vector[shared_ptr[CRecordBatch]]& batches,
shared_ptr[CTable]* table)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index a30a228ae..f2ba6b692 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -304,7 +304,8 @@ cdef class _RecordBatchReader:
break
batches.push_back(batch)
- check_status(CTable.FromRecordBatches(batches, &table))
+ check_status(CTable.FromRecordBatches(self.schema.sp_schema,
+ batches, &table))
return pyarrow_wrap_table(table)
@@ -386,7 +387,8 @@ cdef class _RecordBatchFileReader:
with nogil:
for i in range(nbatches):
check_status(self.reader.get().ReadRecordBatch(i, &batches[i]))
- check_status(CTable.FromRecordBatches(batches, &table))
+ check_status(CTable.FromRecordBatches(self.schema.sp_schema,
+ batches, &table))
return pyarrow_wrap_table(table)
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 94041e465..6cfa9873b 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -975,7 +975,7 @@ cdef class Table:
columns.reserve(K)
for i in range(K):
- if isinstance(arrays[i], Array):
+ if isinstance(arrays[i], (Array, list)):
columns.push_back(
make_shared[CColumn](
c_schema.get().field(i),
@@ -1003,26 +1003,41 @@ cdef class Table:
return pyarrow_wrap_table(CTable.Make(c_schema, columns))
@staticmethod
- def from_batches(batches):
+ def from_batches(batches, Schema schema=None):
"""
Construct a Table from a list of Arrow RecordBatches
Parameters
----------
-
batches: list of RecordBatch
- RecordBatch list to be converted, schemas must be equal
+ RecordBatch list to be converted, all schemas must be equal
+ schema : Schema, default None
+ If not passed, will be inferred from the first RecordBatch
+
+ Returns
+ -------
+ table : Table
"""
cdef:
vector[shared_ptr[CRecordBatch]] c_batches
shared_ptr[CTable] c_table
+ shared_ptr[CSchema] c_schema
RecordBatch batch
for batch in batches:
c_batches.push_back(batch.sp_batch)
+ if schema is None:
+ if len(batches) == 0:
+ raise ValueError('Must pass schema, or at least '
+ 'one RecordBatch')
+ c_schema = c_batches[0].get().schema()
+ else:
+ c_schema = schema.sp_schema
+
with nogil:
- check_status(CTable.FromRecordBatches(c_batches, &c_table))
+ check_status(CTable.FromRecordBatches(c_schema, c_batches,
+ &c_table))
return pyarrow_wrap_table(c_table)
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 9cd5f8076..c31c32251 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -379,6 +379,24 @@ def test_ipc_zero_copy_numpy():
assert_frame_equal(df, rdf)
+def test_ipc_stream_no_batches():
+ # ARROW-2307
+ table = pa.Table.from_arrays([pa.array([1, 2, 3, 4]),
+ pa.array(['foo', 'bar', 'baz', 'qux'])],
+ names=['a', 'b'])
+
+ sink = pa.BufferOutputStream()
+ writer = pa.RecordBatchStreamWriter(sink, table.schema)
+ writer.close()
+
+ source = sink.get_result()
+ reader = pa.open_stream(source)
+ result = reader.read_all()
+
+ assert result.schema.equals(table.schema)
+ assert len(result) == 0
+
+
def test_get_record_batch_size():
N = 10
itemsize = 8
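For reference, here is a minimal sketch of the Python usage this patch
enables (using the new schema argument to Table.from_batches shown in the
table.pxi hunk above; the field names are illustrative):
{code:python}
import pyarrow as pa

schema = pa.schema([pa.field('a', pa.int64()),
                    pa.field('b', pa.string())])

# With an explicit schema, an empty batch list is now accepted; previously
# this raised ArrowInvalid ("Must pass at least one record batch").
table = pa.Table.from_batches([], schema=schema)

assert table.num_rows == 0
assert table.schema.equals(schema)
{code}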
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> [Python] Unable to read arrow stream containing 0 record batches
> ----------------------------------------------------------------
>
> Key: ARROW-2307
> URL: https://issues.apache.org/jira/browse/ARROW-2307
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Affects Versions: 0.8.0
> Reporter: Benjamin Duffield
> Assignee: Wes McKinney
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.9.0
>
>
> Using Java Arrow, I'm creating an Arrow stream with the stream writer.
>
> Sometimes I don't have anything to serialize, and so I don't write any record
> batches. My Arrow stream thus consists of just a schema message.
> {code:java}
> <SCHEMA>
> <EOS [optional]: int32>
> {code}
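> An equivalent zero-batch stream can also be produced from Python by opening
> and immediately closing a stream writer (a sketch; the field name is
> arbitrary):
> {code:python}
> import pyarrow as pa
>
> schema = pa.schema([pa.field('a', pa.int64())])
>
> sink = pa.BufferOutputStream()
> writer = pa.RecordBatchStreamWriter(sink, schema)
> writer.close()  # the stream now holds <SCHEMA><EOS> and no record batches
>
> buf = sink.get_result()
> {code}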
> I am able to deserialize this Arrow stream correctly using the Java stream
> reader, but when reading it with Python I instead hit an error:
> {code}
> import pyarrow as pa
> # ...
> reader = pa.open_stream(stream)
> df = reader.read_all().to_pandas()
> {code}
> This produces:
> {code}
> File "ipc.pxi", line 307, in pyarrow.lib._RecordBatchReader.read_all
> File "error.pxi", line 77, in pyarrow.lib.check_status
> ArrowInvalid: Must pass at least one record batch
> {code}
> i.e. we're hitting the check in
> https://github.com/apache/arrow/blob/apache-arrow-0.8.0/cpp/src/arrow/table.cc#L284
> The workaround we're currently using is to always ensure we serialize at
> least one record batch, even if it's empty (sketched below). However, I think
> it would be nice to either support a stream without record batches, or to
> explicitly disallow this and then match the behaviour in Java.
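> A minimal sketch of that workaround from the Python side (we do the
> equivalent in Java; the zero-row batch gives the reader at least one batch
> to consume):
> {code:python}
> import pyarrow as pa
>
> schema = pa.schema([pa.field('a', pa.int64())])
>
> sink = pa.BufferOutputStream()
> writer = pa.RecordBatchStreamWriter(sink, schema)
>
> # Workaround: always write one zero-row batch so the stream is readable.
> empty = pa.RecordBatch.from_arrays([pa.array([], type=pa.int64())], ['a'])
> writer.write_batch(empty)
> writer.close()
>
> reader = pa.open_stream(sink.get_result())
> table = reader.read_all()  # len(table) == 0, schema preserved
> {code}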
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)