This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 97879eb ARROW-9761: [C/C++] Add experimental C stream interface
97879eb is described below
commit 97879eb970bac52d93d2247200b9ca7acf6f3f93
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Oct 1 12:00:12 2020 +0200
ARROW-9761: [C/C++] Add experimental C stream interface
The goal is to have a standardized ABI to communicate streams of
homogeneous arrays or record batches (for example for database result sets).
The trickiest part is error reporting. This proposal tries to strike a
compromise between simplicity (an integer error code mapping to errno values)
and expressivity (an optional description string for application-specific and
context-specific details).
Closes #8052 from pitrou/ARROW-9761-c-array-stream
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/c/abi.h | 38 +++++
cpp/src/arrow/c/bridge.cc | 194 ++++++++++++++++++++++
cpp/src/arrow/c/bridge.h | 34 ++++
cpp/src/arrow/c/bridge_test.cc | 252 ++++++++++++++++++++++++++++-
cpp/src/arrow/c/helpers.h | 30 ++++
cpp/src/arrow/c/util_internal.h | 7 +
cpp/src/arrow/python/CMakeLists.txt | 1 +
cpp/src/arrow/python/extension_type.h | 1 +
cpp/src/arrow/python/ipc.cc | 67 ++++++++
cpp/src/arrow/python/ipc.h | 52 ++++++
docs/source/cpp/api.rst | 1 +
docs/source/cpp/{api.rst => api/c_abi.rst} | 53 +++---
docs/source/format/CDataInterface.rst | 2 +
docs/source/format/CStreamInterface.rst | 218 +++++++++++++++++++++++++
docs/source/index.rst | 1 +
python/pyarrow/_csv.pyx | 4 +-
python/pyarrow/_flight.pyx | 15 +-
python/pyarrow/cffi.py | 12 ++
python/pyarrow/includes/libarrow.pxd | 18 ++-
python/pyarrow/ipc.pxi | 121 ++++++++++++--
python/pyarrow/ipc.py | 26 +--
python/pyarrow/lib.pxd | 2 +-
python/pyarrow/tests/test_cffi.py | 110 +++++++++++--
python/pyarrow/tests/test_ipc.py | 35 ++++
24 files changed, 1212 insertions(+), 82 deletions(-)
diff --git a/cpp/src/arrow/c/abi.h b/cpp/src/arrow/c/abi.h
index 821bc96..a78170d 100644
--- a/cpp/src/arrow/c/abi.h
+++ b/cpp/src/arrow/c/abi.h
@@ -60,6 +60,44 @@ struct ArrowArray {
void* private_data;
};
// EXPERIMENTAL: C stream interface
//
// A pull-style iterator over a sequence of ArrowArray chunks that all
// share the same ArrowSchema (e.g. a stream of record batches).

struct ArrowArrayStream {
  // Callback to get the stream schema
  // (will be the same for all arrays in the stream).
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If successful, the ArrowSchema must be released independently from the
  // stream.
  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);

  // Callback to get the next array
  // (if no error and the array is released, the stream has ended).
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If successful, the ArrowArray must be released independently from the
  // stream.
  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);

  // Callback to get optional detailed error information.
  // This must only be called if the last stream operation failed
  // with a non-0 return code.
  //
  // Return value: pointer to a null-terminated character array describing
  // the last error, or NULL if no description is available.
  //
  // The returned pointer is only valid until the next operation on this stream
  // (including release).
  const char* (*get_last_error)(struct ArrowArrayStream*);

  // Release callback: release the stream's own resources.
  // Note that arrays returned by `get_next` must be individually released.
  void (*release)(struct ArrowArrayStream*);

  // Opaque producer-specific data
  void* private_data;
};
+
#ifdef __cplusplus
}
#endif
diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc
index 1e602a6..5b360ab 100644
--- a/cpp/src/arrow/c/bridge.cc
+++ b/cpp/src/arrow/c/bridge.cc
@@ -18,6 +18,7 @@
#include "arrow/c/bridge.h"
#include <algorithm>
+#include <cerrno>
#include <cstring>
#include <string>
#include <utility>
@@ -1501,4 +1502,197 @@ Result<std::shared_ptr<RecordBatch>>
ImportRecordBatch(struct ArrowArray* array,
return ImportRecordBatch(array, *maybe_schema);
}
+//////////////////////////////////////////////////////////////////////////
+// C stream export
+
+namespace {
+
// Implements the producer side of the C stream interface on top of a
// RecordBatchReader.
//
// The exported ArrowArrayStream's private_data holds a heap-allocated
// PrivateData owning the reader and the last error message.  This class is a
// short-lived, stateless view constructed inside each C callback.
class ExportedArrayStream {
 public:
  struct PrivateData {
    explicit PrivateData(std::shared_ptr<RecordBatchReader> reader)
        : reader_(std::move(reader)) {}

    std::shared_ptr<RecordBatchReader> reader_;
    // Backing storage for the pointer returned by get_last_error.
    std::string last_error_;

    PrivateData() = default;
    ARROW_DISALLOW_COPY_AND_ASSIGN(PrivateData);
  };

  explicit ExportedArrayStream(struct ArrowArrayStream* stream) : stream_(stream) {}

  // Export the reader's schema into `out_schema`.
  Status GetSchema(struct ArrowSchema* out_schema) {
    return ExportSchema(*reader()->schema(), out_schema);
  }

  // Export the next batch into `out_array`; on end of stream, `out_array`
  // is marked released (per the C stream contract).
  Status GetNext(struct ArrowArray* out_array) {
    std::shared_ptr<RecordBatch> batch;
    RETURN_NOT_OK(reader()->ReadNext(&batch));
    if (batch == nullptr) {
      // End of stream
      ArrowArrayMarkReleased(out_array);
      return Status::OK();
    } else {
      return ExportRecordBatch(*batch, out_array);
    }
  }

  // Returns NULL when no description is available (per the C contract).
  const char* GetLastError() {
    const auto& last_error = private_data()->last_error_;
    return last_error.empty() ? nullptr : last_error.c_str();
  }

  void Release() {
    // Release must be a no-op on an already-released stream.
    if (ArrowArrayStreamIsReleased(stream_)) {
      return;
    }
    DCHECK_NE(private_data(), nullptr);
    delete private_data();

    ArrowArrayStreamMarkReleased(stream_);
  }

  // C-compatible callbacks

  static int StaticGetSchema(struct ArrowArrayStream* stream,
                             struct ArrowSchema* out_schema) {
    ExportedArrayStream self{stream};
    return self.ToCError(self.GetSchema(out_schema));
  }

  static int StaticGetNext(struct ArrowArrayStream* stream,
                           struct ArrowArray* out_array) {
    ExportedArrayStream self{stream};
    return self.ToCError(self.GetNext(out_array));
  }

  static void StaticRelease(struct ArrowArrayStream* stream) {
    ExportedArrayStream{stream}.Release();
  }

  static const char* StaticGetLastError(struct ArrowArrayStream* stream) {
    return ExportedArrayStream{stream}.GetLastError();
  }

 private:
  // Map a Status onto the errno-style return convention, stashing the
  // error message for a later get_last_error call.
  int ToCError(const Status& status) {
    if (ARROW_PREDICT_TRUE(status.ok())) {
      private_data()->last_error_.clear();
      return 0;
    }
    private_data()->last_error_ = status.ToString();
    switch (status.code()) {
      case StatusCode::IOError:
        return EIO;
      case StatusCode::NotImplemented:
        return ENOSYS;
      case StatusCode::OutOfMemory:
        return ENOMEM;
      default:
        return EINVAL;  // Fallback for Invalid, TypeError, etc.
    }
  }

  PrivateData* private_data() {
    return reinterpret_cast<PrivateData*>(stream_->private_data);
  }

  const std::shared_ptr<RecordBatchReader>& reader() { return private_data()->reader_; }

  struct ArrowArrayStream* stream_;
};
+
+} // namespace
+
+Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
+ struct ArrowArrayStream* out) {
+ out->get_schema = ExportedArrayStream::StaticGetSchema;
+ out->get_next = ExportedArrayStream::StaticGetNext;
+ out->get_last_error = ExportedArrayStream::StaticGetLastError;
+ out->release = ExportedArrayStream::StaticRelease;
+ out->private_data = new ExportedArrayStream::PrivateData{std::move(reader)};
+ return Status::OK();
+}
+
+//////////////////////////////////////////////////////////////////////////
+// C stream import
+
+namespace {
+
+class ArrayStreamBatchReader : public RecordBatchReader {
+ public:
+ explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) {
+ ArrowArrayStreamMove(stream, &stream_);
+ DCHECK(!ArrowArrayStreamIsReleased(&stream_));
+ }
+
+ ~ArrayStreamBatchReader() {
+ ArrowArrayStreamRelease(&stream_);
+ DCHECK(ArrowArrayStreamIsReleased(&stream_));
+ }
+
+ std::shared_ptr<Schema> schema() const override { return CacheSchema(); }
+
+ Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+ struct ArrowArray c_array;
+ RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array)));
+ if (ArrowArrayIsReleased(&c_array)) {
+ // End of stream
+ batch->reset();
+ return Status::OK();
+ } else {
+ return ImportRecordBatch(&c_array, CacheSchema()).Value(batch);
+ }
+ }
+
+ private:
+ std::shared_ptr<Schema> CacheSchema() const {
+ if (!schema_) {
+ struct ArrowSchema c_schema;
+ ARROW_CHECK_OK(StatusFromCError(stream_.get_schema(&stream_,
&c_schema)));
+ schema_ = ImportSchema(&c_schema).ValueOrDie();
+ }
+ return schema_;
+ }
+
+ Status StatusFromCError(int errno_like) const {
+ if (ARROW_PREDICT_TRUE(errno_like == 0)) {
+ return Status::OK();
+ }
+ StatusCode code;
+ switch (errno_like) {
+ case EDOM:
+ case EINVAL:
+ case ERANGE:
+ code = StatusCode::Invalid;
+ break;
+ case ENOMEM:
+ code = StatusCode::OutOfMemory;
+ break;
+ case ENOSYS:
+ code = StatusCode::NotImplemented;
+ default:
+ code = StatusCode::IOError;
+ break;
+ }
+ const char* last_error = stream_.get_last_error(&stream_);
+ return Status(code, last_error ? std::string(last_error) : "");
+ }
+
+ mutable struct ArrowArrayStream stream_;
+ mutable std::shared_ptr<Schema> schema_;
+};
+
+} // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
+ struct ArrowArrayStream* stream) {
+ if (ArrowArrayStreamIsReleased(stream)) {
+ return Status::Invalid("Cannot import released ArrowArrayStream");
+ }
+ // XXX should we call get_schema() here to avoid crashing on error?
+ return std::make_shared<ArrayStreamBatchReader>(stream);
+}
+
} // namespace arrow
diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h
index 8efb5d9..294f53e 100644
--- a/cpp/src/arrow/c/bridge.h
+++ b/cpp/src/arrow/c/bridge.h
@@ -29,6 +29,10 @@
namespace arrow {
+/// \defgroup c-data-interface Functions for working with the C data interface.
+///
+/// @{
+
/// \brief Export C++ DataType using the C data interface format.
///
/// The root type is considered to have empty name and metadata.
@@ -160,4 +164,34 @@ ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray*
array,
struct ArrowSchema*
schema);
+/// @}
+
+/// \defgroup c-stream-interface Functions for working with the C stream interface.
+///
+/// @{
+
+/// \brief EXPERIMENTAL: Export C++ RecordBatchReader using the C stream
interface.
+///
+/// The resulting ArrowArrayStream struct keeps the record batch reader alive
+/// until its release callback is called by the consumer.
+///
+/// \param[in] reader RecordBatchReader object to export
+/// \param[out] out C struct where to export the stream
+ARROW_EXPORT
+Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
+ struct ArrowArrayStream* out);
+
+/// \brief EXPERIMENTAL: Import C++ RecordBatchReader from the C stream
interface.
+///
+/// The ArrowArrayStream struct has its contents moved to a private object
+/// held alive by the resulting record batch reader.
+///
+/// \param[in,out] stream C stream interface struct
+/// \return Imported RecordBatchReader object
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
+ struct ArrowArrayStream* stream);
+
+/// @}
+
} // namespace arrow
diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc
index 6695d6e..3f84edf 100644
--- a/cpp/src/arrow/c/bridge_test.cc
+++ b/cpp/src/arrow/c/bridge_test.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <cerrno>
#include <deque>
#include <functional>
#include <string>
@@ -22,6 +23,7 @@
#include <utility>
#include <vector>
+#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>
#include "arrow/c/bridge.h"
@@ -40,6 +42,8 @@ namespace arrow {
using internal::ArrayExportGuard;
using internal::ArrayExportTraits;
+using internal::ArrayStreamExportGuard;
+using internal::ArrayStreamExportTraits;
using internal::SchemaExportGuard;
using internal::SchemaExportTraits;
@@ -78,11 +82,11 @@ class ReleaseCallback {
explicit ReleaseCallback(CType* c_struct) : called_(false) {
orig_release_ = c_struct->release;
orig_private_data_ = c_struct->private_data;
- c_struct->release = ReleaseUnbound;
+ c_struct->release = StaticRelease;
c_struct->private_data = this;
}
- static void ReleaseUnbound(CType* c_struct) {
+ static void StaticRelease(CType* c_struct) {
reinterpret_cast<ReleaseCallback*>(c_struct->private_data)->Release(c_struct);
}
@@ -2678,4 +2682,248 @@ TEST_F(TestArrayRoundtrip, RecordBatch) {
// TODO C -> C++ -> C roundtripping tests?
+////////////////////////////////////////////////////////////////////////////
+// Array stream export tests
+
// A reader whose ReadNext() always returns the given error and whose schema
// is empty; used to exercise error propagation through the C stream interface.
class FailingRecordBatchReader : public RecordBatchReader {
 public:
  explicit FailingRecordBatchReader(Status error) : error_(std::move(error)) {}

  // The schema a consumer should observe after export/import.
  static std::shared_ptr<Schema> expected_schema() { return arrow::schema({}); }

  std::shared_ptr<Schema> schema() const override { return expected_schema(); }

  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override { return error_; }

 protected:
  Status error_;
};
+
// Common fixture: detects memory leaks by comparing the default pool's
// allocated byte count before and after each test.
class BaseArrayStreamTest : public ::testing::Test {
 public:
  void SetUp() override {
    pool_ = default_memory_pool();
    orig_allocated_ = pool_->bytes_allocated();
  }

  void TearDown() override { ASSERT_EQ(pool_->bytes_allocated(), orig_allocated_); }

  // Wrap each single-column array into its own one-chunk RecordBatch.
  RecordBatchVector MakeBatches(std::shared_ptr<Schema> schema, ArrayVector arrays) {
    DCHECK_EQ(schema->num_fields(), 1);
    RecordBatchVector batches;
    for (const auto& array : arrays) {
      batches.push_back(RecordBatch::Make(schema, array->length(), {array}));
    }
    return batches;
  }

 protected:
  MemoryPool* pool_;
  int64_t orig_allocated_;
};
+
// Helpers exercising an exported ArrowArrayStream directly through its C
// callbacks (acting as the consumer side of the contract).
class TestArrayStreamExport : public BaseArrayStreamTest {
 public:
  // get_schema must succeed and yield a schema equal to `expected`.
  void AssertStreamSchema(struct ArrowArrayStream* c_stream, const Schema& expected) {
    struct ArrowSchema c_schema;
    ASSERT_EQ(0, c_stream->get_schema(c_stream, &c_schema));

    SchemaExportGuard schema_guard(&c_schema);
    ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema));
    ASSERT_OK_AND_ASSIGN(auto schema, ImportSchema(&c_schema));
    AssertSchemaEqual(expected, *schema);
  }

  // get_next must succeed and signal end of stream with a released array.
  void AssertStreamEnd(struct ArrowArrayStream* c_stream) {
    struct ArrowArray c_array;
    ASSERT_EQ(0, c_stream->get_next(c_stream, &c_array));

    ArrayExportGuard guard(&c_array);
    ASSERT_TRUE(ArrowArrayIsReleased(&c_array));
  }

  // get_next must succeed and yield a batch equal to `expected`.
  void AssertStreamNext(struct ArrowArrayStream* c_stream, const RecordBatch& expected) {
    struct ArrowArray c_array;
    ASSERT_EQ(0, c_stream->get_next(c_stream, &c_array));

    ArrayExportGuard guard(&c_array);
    ASSERT_FALSE(ArrowArrayIsReleased(&c_array));

    ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array, expected.schema()));
    AssertBatchesEqual(expected, *batch);
  }
};
+
// An empty stream still exposes its schema, and end-of-stream is sticky:
// repeated get_next calls keep reporting end of stream.
TEST_F(TestArrayStreamExport, Empty) {
  auto schema = arrow::schema({field("ints", int32())});
  auto batches = MakeBatches(schema, {});
  ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema));

  struct ArrowArrayStream c_stream;

  ASSERT_OK(ExportRecordBatchReader(reader, &c_stream));
  ArrayStreamExportGuard guard(&c_stream);

  ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream));
  AssertStreamSchema(&c_stream, *schema);
  AssertStreamEnd(&c_stream);
  AssertStreamEnd(&c_stream);
}
+
// Batches come out of the exported stream in order, followed by a sticky
// end-of-stream.
TEST_F(TestArrayStreamExport, Simple) {
  auto schema = arrow::schema({field("ints", int32())});
  auto batches = MakeBatches(
      schema, {ArrayFromJSON(int32(), "[1, 2]"), ArrayFromJSON(int32(), "[4, 5, null]")});
  ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema));

  struct ArrowArrayStream c_stream;

  ASSERT_OK(ExportRecordBatchReader(reader, &c_stream));
  ArrayStreamExportGuard guard(&c_stream);

  ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream));
  AssertStreamSchema(&c_stream, *schema);
  AssertStreamNext(&c_stream, *batches[0]);
  AssertStreamNext(&c_stream, *batches[1]);
  AssertStreamEnd(&c_stream);
  AssertStreamEnd(&c_stream);
}
+
// Schema and arrays obtained from the stream must remain valid and importable
// after the stream itself has been released.
TEST_F(TestArrayStreamExport, ArrayLifetime) {
  auto schema = arrow::schema({field("ints", int32())});
  auto batches = MakeBatches(
      schema, {ArrayFromJSON(int32(), "[1, 2]"), ArrayFromJSON(int32(), "[4, 5, null]")});
  ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema));

  struct ArrowArrayStream c_stream;
  struct ArrowSchema c_schema;
  struct ArrowArray c_array0, c_array1;

  ASSERT_OK(ExportRecordBatchReader(reader, &c_stream));
  {
    // Consume the entire stream, then release it before touching the results.
    ArrayStreamExportGuard guard(&c_stream);
    ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream));

    ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema));
    ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array0));
    ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array1));
    AssertStreamEnd(&c_stream);
  }

  ArrayExportGuard guard0(&c_array0), guard1(&c_array1);

  {
    SchemaExportGuard schema_guard(&c_schema);
    ASSERT_OK_AND_ASSIGN(auto got_schema, ImportSchema(&c_schema));
    AssertSchemaEqual(*schema, *got_schema);
  }

  // The exported arrays still hold data (not yet reclaimed by the pool).
  ASSERT_GT(pool_->bytes_allocated(), orig_allocated_);
  ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array1, schema));
  AssertBatchesEqual(*batches[1], *batch);
  ASSERT_OK_AND_ASSIGN(batch, ImportRecordBatch(&c_array0, schema));
  AssertBatchesEqual(*batches[0], *batch);
}
+
// A failing reader still reports its schema successfully, while get_next maps
// Status::Invalid onto the EINVAL error code.
TEST_F(TestArrayStreamExport, Errors) {
  auto reader =
      std::make_shared<FailingRecordBatchReader>(Status::Invalid("some example error"));

  struct ArrowArrayStream c_stream;

  ASSERT_OK(ExportRecordBatchReader(reader, &c_stream));
  ArrayStreamExportGuard guard(&c_stream);

  struct ArrowSchema c_schema;
  ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema));
  ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema));
  {
    SchemaExportGuard schema_guard(&c_schema);
    ASSERT_OK_AND_ASSIGN(auto schema, ImportSchema(&c_schema));
    AssertSchemaEqual(schema, arrow::schema({}));
  }

  struct ArrowArray c_array;
  ASSERT_EQ(EINVAL, c_stream.get_next(&c_stream, &c_array));
}
+
+////////////////////////////////////////////////////////////////////////////
+// Array stream roundtrip tests
+
// Round-trips a RecordBatchReader through export + import and checks the
// resulting reader, including producer-side lifetime management.
class TestArrayStreamRoundtrip : public BaseArrayStreamTest {
 public:
  // In-place variant: replaces *reader with the re-imported reader.
  void Roundtrip(std::shared_ptr<RecordBatchReader>* reader,
                 struct ArrowArrayStream* c_stream) {
    ASSERT_OK(ExportRecordBatchReader(*reader, c_stream));
    ASSERT_FALSE(ArrowArrayStreamIsReleased(c_stream));

    ASSERT_OK_AND_ASSIGN(auto got_reader, ImportRecordBatchReader(c_stream));
    *reader = std::move(got_reader);
  }

  // Ownership-transfer variant: additionally checks that the original reader
  // is kept alive by the stream and dropped when the imported reader dies.
  void Roundtrip(
      std::shared_ptr<RecordBatchReader> reader,
      std::function<void(const std::shared_ptr<RecordBatchReader>&)> check_func) {
    ArrowArrayStream c_stream;

    // NOTE: ReleaseCallback<> is not immediately usable with ArrowArrayStream,
    // because get_next and get_schema need the original private_data.
    std::weak_ptr<RecordBatchReader> weak_reader(reader);
    ASSERT_EQ(weak_reader.use_count(), 1);  // Expiration check will fail otherwise

    ASSERT_OK(ExportRecordBatchReader(std::move(reader), &c_stream));
    ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream));

    {
      ASSERT_OK_AND_ASSIGN(auto new_reader, ImportRecordBatchReader(&c_stream));
      // Stream was moved
      ASSERT_TRUE(ArrowArrayStreamIsReleased(&c_stream));
      ASSERT_FALSE(weak_reader.expired());

      check_func(new_reader);
    }
    // Stream was released when `new_reader` was destroyed
    ASSERT_TRUE(weak_reader.expired());
  }

  // The imported reader must yield a non-null batch equal to `expected`.
  void AssertReaderNext(const std::shared_ptr<RecordBatchReader>& reader,
                        const RecordBatch& expected) {
    ASSERT_OK_AND_ASSIGN(auto batch, reader->Next());
    ASSERT_NE(batch, nullptr);
    AssertBatchesEqual(expected, *batch);
  }

  // The imported reader must signal end of stream with a null batch.
  void AssertReaderEnd(const std::shared_ptr<RecordBatchReader>& reader) {
    ASSERT_OK_AND_ASSIGN(auto batch, reader->Next());
    ASSERT_EQ(batch, nullptr);
  }
};
+
// Round-trip preserves schema, batch contents and (sticky) end-of-stream.
TEST_F(TestArrayStreamRoundtrip, Simple) {
  auto orig_schema = arrow::schema({field("ints", int32())});
  auto batches = MakeBatches(orig_schema, {ArrayFromJSON(int32(), "[1, 2]"),
                                           ArrayFromJSON(int32(), "[4, 5, null]")});

  ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, orig_schema));

  Roundtrip(std::move(reader), [&](const std::shared_ptr<RecordBatchReader>& reader) {
    AssertSchemaEqual(*orig_schema, *reader->schema());
    AssertReaderNext(reader, *batches[0]);
    AssertReaderNext(reader, *batches[1]);
    AssertReaderEnd(reader);
    AssertReaderEnd(reader);
  });
}
+
// Round-trip preserves the error kind and the detailed error message
// (carried through get_last_error).
TEST_F(TestArrayStreamRoundtrip, Errors) {
  auto reader = std::make_shared<FailingRecordBatchReader>(
      Status::Invalid("roundtrip error example"));

  Roundtrip(std::move(reader), [&](const std::shared_ptr<RecordBatchReader>& reader) {
    auto status = reader->Next().status();
    ASSERT_RAISES(Invalid, status);
    ASSERT_THAT(status.message(), ::testing::HasSubstr("roundtrip error example"));
  });
}
+
} // namespace arrow
diff --git a/cpp/src/arrow/c/helpers.h b/cpp/src/arrow/c/helpers.h
index a1a1240..a5c1f6f 100644
--- a/cpp/src/arrow/c/helpers.h
+++ b/cpp/src/arrow/c/helpers.h
@@ -82,6 +82,36 @@ inline void ArrowArrayRelease(struct ArrowArray* array) {
}
}
+/// Query whether the C array stream is released
+inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream* stream) {
+ return stream->release == NULL;
+}
+
/// Mark the C array stream released (for use in release callbacks)
///
/// Per the stream interface, nulling out the `release` member is what marks
/// the structure as released.
inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* stream) {
  stream->release = NULL;
}
+
/// Move the C array stream from `src` to `dest`
///
/// Note `dest` must *not* point to a valid stream already, otherwise there
/// will be a memory leak.
///
/// After the move, `src` is marked released and must no longer be used.
inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
                                 struct ArrowArrayStream* dest) {
  assert(dest != src);
  assert(!ArrowArrayStreamIsReleased(src));
  // A shallow byte copy transfers the callbacks and private_data.
  memcpy(dest, src, sizeof(struct ArrowArrayStream));
  ArrowArrayStreamMarkReleased(src);
}
+
+/// Release the C array stream, if necessary, by calling its release callback
+inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) {
+ if (!ArrowArrayStreamIsReleased(stream)) {
+ stream->release(stream);
+ assert(ArrowArrayStreamIsReleased(stream));
+ }
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/cpp/src/arrow/c/util_internal.h b/cpp/src/arrow/c/util_internal.h
index 3ece524..6a33be9 100644
--- a/cpp/src/arrow/c/util_internal.h
+++ b/cpp/src/arrow/c/util_internal.h
@@ -34,6 +34,12 @@ struct ArrayExportTraits {
static constexpr auto ReleaseFunc = &ArrowArrayRelease;
};
// Traits for ExportGuard<>: hooks up the ArrowArrayStream helper functions
// so a C stream can be released automatically at block-scope exit.
struct ArrayStreamExportTraits {
  typedef struct ArrowArrayStream CType;
  static constexpr auto IsReleasedFunc = &ArrowArrayStreamIsReleased;
  static constexpr auto ReleaseFunc = &ArrowArrayStreamRelease;
};
+
// A RAII-style object to release a C Array / Schema struct at block scope
exit.
template <typename Traits>
class ExportGuard {
@@ -73,6 +79,7 @@ class ExportGuard {
using SchemaExportGuard = ExportGuard<SchemaExportTraits>;
using ArrayExportGuard = ExportGuard<ArrayExportTraits>;
+using ArrayStreamExportGuard = ExportGuard<ArrayStreamExportTraits>;
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/arrow/python/CMakeLists.txt
b/cpp/src/arrow/python/CMakeLists.txt
index b972b0d..9601557 100644
--- a/cpp/src/arrow/python/CMakeLists.txt
+++ b/cpp/src/arrow/python/CMakeLists.txt
@@ -38,6 +38,7 @@ set(ARROW_PYTHON_SRCS
inference.cc
init.cc
io.cc
+ ipc.cc
numpy_convert.cc
numpy_to_arrow.cc
python_to_arrow.cc
diff --git a/cpp/src/arrow/python/extension_type.h
b/cpp/src/arrow/python/extension_type.h
index 0041c8a..f5b12e6 100644
--- a/cpp/src/arrow/python/extension_type.h
+++ b/cpp/src/arrow/python/extension_type.h
@@ -44,6 +44,7 @@ class ARROW_PYTHON_EXPORT PyExtensionType : public
ExtensionType {
std::string Serialize() const override;
// For use from Cython
+ // Assumes that `typ` is borrowed
static Status FromClass(const std::shared_ptr<DataType> storage_type,
const std::string extension_name, PyObject* typ,
std::shared_ptr<ExtensionType>* out);
diff --git a/cpp/src/arrow/python/ipc.cc b/cpp/src/arrow/python/ipc.cc
new file mode 100644
index 0000000..2e6c9d9
--- /dev/null
+++ b/cpp/src/arrow/python/ipc.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/ipc.h"
+
+#include <memory>
+
+#include "arrow/python/pyarrow.h"
+
+namespace arrow {
+namespace py {
+
// Trivial constructor: all state is set up in Init() (called from Make()).
PyRecordBatchReader::PyRecordBatchReader() {}
+
// Store the schema and obtain an iterator over `iterable` (borrowed).
//
// PyObject_GetIter returns a new reference owned by `iterator_`, or NULL on
// error, which CheckPyError() converts to a Status.
// NOTE(review): appears to assume the GIL is held by the caller — confirm
// call sites.
Status PyRecordBatchReader::Init(std::shared_ptr<Schema> schema, PyObject* iterable) {
  schema_ = std::move(schema);

  iterator_.reset(PyObject_GetIter(iterable));
  return CheckPyError();
}
+
// Return the schema supplied at construction (not derived from the batches).
std::shared_ptr<Schema> PyRecordBatchReader::schema() const { return schema_; }
+
// Pull the next record batch from the Python iterator.
//
// Sets *batch to null at end of stream.  The iterator reference is dropped
// once exhausted, so later calls cheaply report end of stream again.
Status PyRecordBatchReader::ReadNext(std::shared_ptr<RecordBatch>* batch) {
  PyAcquireGIL lock;

  if (!iterator_) {
    // End of stream
    batch->reset();
    return Status::OK();
  }

  OwnedRef py_batch(PyIter_Next(iterator_.obj()));
  if (!py_batch) {
    // PyIter_Next returns NULL both on error and on normal exhaustion;
    // RETURN_IF_PYERROR distinguishes the two cases.
    RETURN_IF_PYERROR();
    // End of stream
    batch->reset();
    iterator_.reset();
    return Status::OK();
  }

  return unwrap_batch(py_batch.obj()).Value(batch);
}
+
// Create a PyRecordBatchReader over a Python iterable of record batches.
//
// `iterable` is borrowed.  Uses explicit `new` because the constructor is
// protected (see ipc.h), which makes std::make_shared unusable here.
Result<std::shared_ptr<RecordBatchReader>> PyRecordBatchReader::Make(
    std::shared_ptr<Schema> schema, PyObject* iterable) {
  auto reader = std::shared_ptr<PyRecordBatchReader>(new PyRecordBatchReader());
  RETURN_NOT_OK(reader->Init(std::move(schema), iterable));
  return reader;
}
+
+} // namespace py
+} // namespace arrow
diff --git a/cpp/src/arrow/python/ipc.h b/cpp/src/arrow/python/ipc.h
new file mode 100644
index 0000000..92232ed
--- /dev/null
+++ b/cpp/src/arrow/python/ipc.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/python/common.h"
+#include "arrow/python/visibility.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace py {
+
+class ARROW_PYTHON_EXPORT PyRecordBatchReader : public RecordBatchReader {
+ public:
+ std::shared_ptr<Schema> schema() const override;
+
+ Status ReadNext(std::shared_ptr<RecordBatch>* batch) override;
+
+ // For use from Cython
+ // Assumes that `iterable` is borrowed
+ static Result<std::shared_ptr<RecordBatchReader>>
Make(std::shared_ptr<Schema>,
+ PyObject* iterable);
+
+ protected:
+ PyRecordBatchReader();
+
+ Status Init(std::shared_ptr<Schema>, PyObject* iterable);
+
+ std::shared_ptr<Schema> schema_;
+ OwnedRefNoGIL iterator_;
+};
+
+} // namespace py
+} // namespace arrow
diff --git a/docs/source/cpp/api.rst b/docs/source/cpp/api.rst
index 59d2210..626b388 100644
--- a/docs/source/cpp/api.rst
+++ b/docs/source/cpp/api.rst
@@ -29,6 +29,7 @@ API Reference
api/scalar
api/builder
api/table
+ api/c_abi
api/compute
api/tensor
api/utilities
diff --git a/docs/source/cpp/api.rst b/docs/source/cpp/api/c_abi.rst
similarity index 59%
copy from docs/source/cpp/api.rst
copy to docs/source/cpp/api/c_abi.rst
index 59d2210..4e451c3 100644
--- a/docs/source/cpp/api.rst
+++ b/docs/source/cpp/api/c_abi.rst
@@ -15,25 +15,34 @@
.. specific language governing permissions and limitations
.. under the License.
-*************
-API Reference
-*************
-
-.. toctree::
- :maxdepth: 3
-
- api/support
- api/memory
- api/datatype
- api/array
- api/scalar
- api/builder
- api/table
- api/compute
- api/tensor
- api/utilities
- api/io
- api/formats
- api/cuda
- api/flight
- api/filesystem
+============
+C Interfaces
+============
+
+.. seealso::
+ The :ref:`C data interface <c-data-interface>` and
+ :ref:`C stream interface <c-stream-interface>` specifications.
+
+ABI Structures
+==============
+
+.. doxygenstruct:: ArrowSchema
+ :project: arrow_cpp
+
+.. doxygenstruct:: ArrowArray
+ :project: arrow_cpp
+
+.. doxygenstruct:: ArrowArrayStream
+ :project: arrow_cpp
+
+C Data Interface
+================
+
+.. doxygengroup:: c-data-interface
+ :content-only:
+
+C Stream Interface
+==================
+
+.. doxygengroup:: c-stream-interface
+ :content-only:
diff --git a/docs/source/format/CDataInterface.rst
b/docs/source/format/CDataInterface.rst
index 768dc47..dbecf30 100644
--- a/docs/source/format/CDataInterface.rst
+++ b/docs/source/format/CDataInterface.rst
@@ -535,6 +535,8 @@ Therefore, the consumer MUST not try to interfere with the
producer's
handling of these members' lifetime. The only way the consumer influences
data lifetime is by calling the base structure's ``release`` callback.
+.. _c-data-interface-released:
+
Released structure
''''''''''''''''''
diff --git a/docs/source/format/CStreamInterface.rst
b/docs/source/format/CStreamInterface.rst
new file mode 100644
index 0000000..b8ccce3
--- /dev/null
+++ b/docs/source/format/CStreamInterface.rst
@@ -0,0 +1,218 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. highlight:: c
+
+.. _c-stream-interface:
+
+============================
+The Arrow C stream interface
+============================
+
+.. warning::
+ This interface is experimental and may evolve based on feedback from
+ early users. ABI stability is not guaranteed yet. Feel free to
+ `contact us <https://arrow.apache.org/community/>`__.
+
+The C stream interface builds on the structures defined in the
+:ref:`C data interface <c-data-interface>` and combines them into a
higher-level
+specification so as to ease the communication of streaming data within a single
+process.
+
+Semantics
+=========
+
+An Arrow C stream exposes a streaming source of data chunks, each with the
+same schema. Chunks are obtained by calling a blocking pull-style iteration
+function.
+
+Structure definition
+====================
+
+The C stream interface is defined by a single ``struct`` definition::
+
+ struct ArrowArrayStream {
+ // Callbacks providing stream functionality
+ int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+ int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+ const char* (*get_last_error)(struct ArrowArrayStream*);
+
+ // Release callback
+ void (*release)(struct ArrowArrayStream*);
+
+ // Opaque producer-specific data
+ void* private_data;
+ };
+
+The ArrowArrayStream structure
+------------------------------
+
+The ``ArrowArrayStream`` provides the required callbacks to interact with a
+streaming source of Arrow arrays. It has the following fields:
+
+.. c:member:: int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*,
struct ArrowSchema* out)
+
+ *Mandatory.* This callback allows the consumer to query the schema of
+ the chunks of data in the stream. The schema is the same for all
+ data chunks.
+
+ This callback must NOT be called on a released ``ArrowArrayStream``.
+
+ *Return value:* 0 on success, a non-zero
+ :ref:`error code <c-stream-interface-error-codes>` otherwise.
+
+.. c:member:: int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*,
struct ArrowArray* out)
+
+ *Mandatory.* This callback allows the consumer to get the next chunk
+ of data in the stream.
+
+ This callback must NOT be called on a released ``ArrowArrayStream``.
+
+ *Return value:* 0 on success, a non-zero
+ :ref:`error code <c-stream-interface-error-codes>` otherwise.
+
+ On success, the consumer must check whether the ``ArrowArray`` is
+ marked :ref:`released <c-data-interface-released>`. If the
+ ``ArrowArray`` is released, then the end of stream has been reached.
+ Otherwise, the ``ArrowArray`` contains a valid data chunk.
+
+.. c:member:: const char* (*ArrowArrayStream.get_last_error)(struct
ArrowArrayStream*)
+
+ *Mandatory.* This callback allows the consumer to get a textual description
+ of the last error.
+
+ This callback must ONLY be called if the last operation on the
+ ``ArrowArrayStream`` returned an error. It must NOT be called on a
+ released ``ArrowArrayStream``.
+
+ *Return value:* a pointer to a NULL-terminated character string
(UTF8-encoded).
+ NULL can also be returned if no detailed description is available.
+
+ The returned pointer is only guaranteed to be valid until the next call of
+ one of the stream's callbacks. The character string it points to should
+ be copied to consumer-managed storage if it is intended to survive longer.
+
+.. c:member:: void (*ArrowArrayStream.release)(struct ArrowArrayStream*)
+
+ *Mandatory.* A pointer to a producer-provided release callback.
+
+.. c:member:: void* ArrowArrayStream.private_data
+
+ *Optional.* An opaque pointer to producer-provided private data.
+
+ Consumers MUST not process this member. Lifetime of this member
+ is handled by the producer, and especially by the release callback.
+
+
+.. _c-stream-interface-error-codes:
+
+Error codes
+-----------
+
+The ``get_schema`` and ``get_next`` callbacks may return an error in the
form
+of a non-zero integer code. Such error codes should be interpreted like
+``errno`` numbers (as defined by the local platform). Note that the symbolic
+forms of these constants are stable from platform to platform, but their
numeric
+values are platform-specific.
+
+In particular, it is recommended to recognize the following values:
+
+* ``EINVAL``: for a parameter or input validation error
+* ``ENOMEM``: for a memory allocation failure (out of memory)
+* ``EIO``: for a generic input/output error
+
+.. seealso::
+ `Standard POSIX error codes
<https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/errno.h.html>`__.
+
+ `Error codes recognized by the Windows C runtime library
+
<https://docs.microsoft.com/en-us/cpp/c-runtime-library/errno-doserrno-sys-errlist-and-sys-nerr>`__.
+
+Result lifetimes
+----------------
+
+The data returned by the ``get_schema`` and ``get_next`` callbacks must be
+released independently. Their lifetimes are not tied to that of the
+``ArrowArrayStream``.
+
+Stream lifetime
+---------------
+
+Lifetime of the C stream is managed using a release callback with similar
+usage as in the :ref:`C data interface <c-data-interface-released>`.
+
+
+C consumer example
+==================
+
+Let's say a particular database provides the following C API to execute
+a SQL query and return the result set as an Arrow C stream::
+
+ void MyDB_Query(const char* query, struct ArrowArrayStream* result_set);
+
+Then a consumer could use the following code to iterate over the results::
+
+ static void handle_error(int errcode, struct ArrowArrayStream* stream) {
+ // Print stream error
+ const char* errdesc = stream->get_last_error(stream);
+ if (errdesc != NULL) {
+ fputs(errdesc, stderr);
+ } else {
+ fputs(strerror(errcode), stderr);
+ }
+ // Release stream and abort
+ stream->release(stream);
+ exit(1);
+ }
+
+ void run_query() {
+ struct ArrowArrayStream stream;
+ struct ArrowSchema schema;
+ struct ArrowArray chunk;
+ int errcode;
+
+ MyDB_Query("SELECT * FROM my_table", &stream);
+
+ // Query result set schema
+ errcode = stream.get_schema(&stream, &schema);
+ if (errcode != 0) {
+ handle_error(errcode, &stream);
+ }
+
+ int64_t num_rows = 0;
+
+ // Iterate over results: loop until error or end of stream
+ while ((errcode = stream.get_next(&stream, &chunk)) == 0 &&
+ chunk.release != NULL) {
+ // Do something with chunk...
+ fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length);
+ num_rows += chunk.length;
+
+ // Release chunk
+ chunk.release(&chunk);
+ }
+
+ // Was it an error?
+ if (errcode != 0) {
+ handle_error(errcode, &stream);
+ }
+
+ fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows);
+
+ // Release schema and stream
+ schema.release(&schema);
+ stream.release(&stream);
+ }
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 2d95e22..cfcf865 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -43,6 +43,7 @@ such topics as:
format/Flight
format/Integration
format/CDataInterface
+ format/CStreamInterface
format/Other
.. _toc.usage:
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 028ddc6..34c6693 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -28,7 +28,7 @@ from collections.abc import Mapping
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
- _CRecordBatchReader, ensure_type,
+ RecordBatchReader, ensure_type,
maybe_unbox_memory_pool, get_input_stream,
native_transcoding_input_stream,
pyarrow_wrap_schema, pyarrow_wrap_table,
@@ -633,7 +633,7 @@ cdef _get_convert_options(ConvertOptions convert_options,
out[0] = convert_options.options
-cdef class CSVStreamingReader(_CRecordBatchReader):
+cdef class CSVStreamingReader(RecordBatchReader):
"""An object that reads record batches incrementally from a CSV file.
Should not be instantiated directly by user code.
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index 484fbb2..3950cee 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -35,7 +35,7 @@ from pyarrow.lib cimport *
from pyarrow.lib import ArrowException, ArrowInvalid
from pyarrow.lib import as_buffer, frombytes, tobytes
from pyarrow.includes.libarrow_flight cimport *
-from pyarrow.ipc import _ReadPandasOption, _get_legacy_format_default
+from pyarrow.ipc import _get_legacy_format_default, _ReadPandasMixin
import pyarrow.lib as lib
@@ -812,7 +812,7 @@ cdef class FlightStreamChunk(_Weakrefable):
self.chunk.data != NULL, self.chunk.app_metadata != NULL)
-cdef class _MetadataRecordBatchReader(_Weakrefable):
+cdef class _MetadataRecordBatchReader(_Weakrefable, _ReadPandasMixin):
"""A reader for Flight streams."""
# Needs to be separate class so the "real" class can subclass the
@@ -869,8 +869,7 @@ cdef class _MetadataRecordBatchReader(_Weakrefable):
return chunk
-cdef class MetadataRecordBatchReader(_MetadataRecordBatchReader,
- _ReadPandasOption):
+cdef class MetadataRecordBatchReader(_MetadataRecordBatchReader):
"""The virtual base class for readers for Flight streams."""
@@ -1365,7 +1364,7 @@ cdef class RecordBatchStream(FlightDataStream):
data_source : RecordBatchReader or Table
options : pyarrow.ipc.IpcWriteOptions, optional
"""
- if (not isinstance(data_source, _CRecordBatchReader) and
+ if (not isinstance(data_source, RecordBatchReader) and
not isinstance(data_source, lib.Table)):
raise TypeError("Expected RecordBatchReader or Table, "
"but got: {}".format(type(data_source)))
@@ -1375,8 +1374,8 @@ cdef class RecordBatchStream(FlightDataStream):
cdef CFlightDataStream* to_stream(self) except *:
cdef:
shared_ptr[CRecordBatchReader] reader
- if isinstance(self.data_source, _CRecordBatchReader):
- reader = (<_CRecordBatchReader> self.data_source).reader
+ if isinstance(self.data_source, RecordBatchReader):
+ reader = (<RecordBatchReader> self.data_source).reader
elif isinstance(self.data_source, lib.Table):
table = (<Table> self.data_source).table
reader.reset(new TableBatchReader(deref(table)))
@@ -1617,7 +1616,7 @@ cdef CStatus _data_stream_next(void* self,
CFlightPayload* payload) except *:
else:
result, metadata = result, None
- if isinstance(result, (Table, _CRecordBatchReader)):
+ if isinstance(result, (Table, RecordBatchReader)):
if metadata:
raise ValueError("Can only return metadata alongside a "
"RecordBatch.")
diff --git a/python/pyarrow/cffi.py b/python/pyarrow/cffi.py
index 8880c25..961b61d 100644
--- a/python/pyarrow/cffi.py
+++ b/python/pyarrow/cffi.py
@@ -52,6 +52,18 @@ c_source = """
// Opaque producer-specific data
void* private_data;
};
+
+ struct ArrowArrayStream {
+ int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+ int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+
+ const char* (*get_last_error)(struct ArrowArrayStream*);
+
+ // Release callback
+ void (*release)(struct ArrowArrayStream*);
+ // Opaque producer-specific data
+ void* private_data;
+ };
"""
# TODO use out-of-line mode for faster import and avoid C parsing
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index d1c4110..5d5800e 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1393,7 +1393,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc"
nogil:
" arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
@staticmethod
CResult[shared_ptr[CRecordBatchReader]] Open(
- const CInputStream* stream, const CIpcReadOptions& options)
+ const shared_ptr[CInputStream], const CIpcReadOptions&)
@staticmethod
CResult[shared_ptr[CRecordBatchReader]] Open2" Open"(
@@ -1965,6 +1965,14 @@ cdef extern from 'arrow/python/inference.h' namespace
'arrow::py':
c_bool IsPyFloat(object o)
+cdef extern from 'arrow/python/ipc.h' namespace 'arrow::py':
+ cdef cppclass CPyRecordBatchReader" arrow::py::PyRecordBatchReader" \
+ (CRecordBatchReader):
+ @staticmethod
+ CResult[shared_ptr[CRecordBatchReader]] Make(shared_ptr[CSchema],
+ object)
+
+
cdef extern from 'arrow/extension_type.h' namespace 'arrow':
cdef cppclass CExtensionTypeRegistry" arrow::ExtensionTypeRegistry":
@staticmethod
@@ -2064,6 +2072,9 @@ cdef extern from 'arrow/c/abi.h':
cdef struct ArrowArray:
pass
+ cdef struct ArrowArrayStream:
+ pass
+
cdef extern from 'arrow/c/bridge.h' namespace 'arrow' nogil:
CStatus ExportType(CDataType&, ArrowSchema* out)
CResult[shared_ptr[CDataType]] ImportType(ArrowSchema*)
@@ -2084,3 +2095,8 @@ cdef extern from 'arrow/c/bridge.h' namespace 'arrow'
nogil:
shared_ptr[CSchema])
CResult[shared_ptr[CRecordBatch]] ImportRecordBatch(ArrowArray*,
ArrowSchema*)
+
+ CStatus ExportRecordBatchReader(shared_ptr[CRecordBatchReader],
+ ArrowArrayStream*)
+ CResult[shared_ptr[CRecordBatchReader]] ImportRecordBatchReader(
+ ArrowArrayStream*)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index bcb7477..74a81c6 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -400,8 +400,29 @@ cdef _get_input_stream(object source,
shared_ptr[CInputStream]* out):
get_input_stream(source, True, out)
-cdef class _CRecordBatchReader(_Weakrefable):
- """The base RecordBatchReader wrapper.
+class _ReadPandasMixin:
+
+ def read_pandas(self, **options):
+ """
+ Read contents of stream to a pandas.DataFrame.
+
+ Read all record batches as a pyarrow.Table then convert it to a
+ pandas.DataFrame using Table.to_pandas.
+
+ Parameters
+ ----------
+ **options : arguments to forward to Table.to_pandas
+
+ Returns
+ -------
+ df : pandas.DataFrame
+ """
+ table = self.read_all()
+ return table.to_pandas(**options)
+
+
+cdef class RecordBatchReader(_Weakrefable):
+ """Base class for reading stream of record batches.
Provides common implementations of convenience methods. Should not
be instantiated directly by user code.
@@ -413,6 +434,18 @@ cdef class _CRecordBatchReader(_Weakrefable):
while True:
yield self.read_next_batch()
+ @property
+ def schema(self):
+ """
+ Shared schema of the record batches in the stream.
+ """
+ cdef shared_ptr[CSchema] c_schema
+
+ with nogil:
+ c_schema = self.reader.get().schema()
+
+ return pyarrow_wrap_schema(c_schema)
+
def get_next_batch(self):
import warnings
warnings.warn('Please use read_next_batch instead of '
@@ -447,21 +480,91 @@ cdef class _CRecordBatchReader(_Weakrefable):
check_status(self.reader.get().ReadAll(&table))
return pyarrow_wrap_table(table)
+ read_pandas = _ReadPandasMixin.read_pandas
+
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
+ def _export_to_c(self, uintptr_t out_ptr):
+ """
+ Export to a C ArrowArrayStream struct, given its pointer.
+
+ Parameters
+ ----------
+ out_ptr: int
+ The raw pointer to a C ArrowArrayStream struct.
-cdef class _RecordBatchStreamReader(_CRecordBatchReader):
+ Be careful: if you don't pass the ArrowArrayStream struct to a
+ consumer, array memory will leak. This is a low-level function
+ intended for expert users.
+ """
+ with nogil:
+ check_status(ExportRecordBatchReader(
+ self.reader, <ArrowArrayStream*> out_ptr))
+
+ @staticmethod
+ def _import_from_c(uintptr_t in_ptr):
+ """
+ Import RecordBatchReader from a C ArrowArrayStream struct,
+ given its pointer.
+
+ Parameters
+ ----------
+ in_ptr: int
+ The raw pointer to a C ArrowArrayStream struct.
+
+ This is a low-level function intended for expert users.
+ """
+ cdef:
+ shared_ptr[CRecordBatchReader] c_reader
+ RecordBatchReader self
+
+ with nogil:
+ c_reader = GetResultValue(ImportRecordBatchReader(
+ <ArrowArrayStream*> in_ptr))
+
+ self = RecordBatchReader.__new__(RecordBatchReader)
+ self.reader = c_reader
+ return self
+
+ @staticmethod
+ def from_batches(schema, batches):
+ """
+ Create RecordBatchReader from an iterable of batches.
+
+ Parameters
+ ----------
+ schema : Schema
+ The shared schema of the record batches
+ batches : Iterable[RecordBatch]
+ The batches that this reader will return.
+
+ Returns
+ -------
+ reader : RecordBatchReader
+ """
+ cdef:
+ shared_ptr[CSchema] c_schema
+ shared_ptr[CRecordBatchReader] c_reader
+ RecordBatchReader self
+
+ c_schema = pyarrow_unwrap_schema(schema)
+ c_reader = GetResultValue(CPyRecordBatchReader.Make(
+ c_schema, batches))
+
+ self = RecordBatchReader.__new__(RecordBatchReader)
+ self.reader = c_reader
+ return self
+
+
+cdef class _RecordBatchStreamReader(RecordBatchReader):
cdef:
shared_ptr[CInputStream] in_stream
CIpcReadOptions options
- cdef readonly:
- Schema schema
-
def __cinit__(self):
pass
@@ -469,9 +572,7 @@ cdef class _RecordBatchStreamReader(_CRecordBatchReader):
_get_input_stream(source, &self.in_stream)
with nogil:
self.reader = GetResultValue(CRecordBatchStreamReader.Open(
- self.in_stream.get(), self.options))
-
- self.schema = pyarrow_wrap_schema(self.reader.get().schema())
+ self.in_stream, self.options))
cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
@@ -565,6 +666,8 @@ cdef class _RecordBatchFileReader(_Weakrefable):
return pyarrow_wrap_table(table)
+ read_pandas = _ReadPandasMixin.read_pandas
+
def __enter__(self):
return self
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index 19e80ba..65325c4 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -22,6 +22,7 @@ import os
import pyarrow as pa
from pyarrow.lib import (IpcWriteOptions, Message, MessageReader, # noqa
+ RecordBatchReader, _ReadPandasMixin,
MetadataVersion,
read_message, read_record_batch, read_schema,
read_tensor, write_tensor,
@@ -29,28 +30,7 @@ from pyarrow.lib import (IpcWriteOptions, Message,
MessageReader, # noqa
import pyarrow.lib as lib
-class _ReadPandasOption:
-
- def read_pandas(self, **options):
- """
- Read contents of stream to a pandas.DataFrame.
-
- Read all record batches as a pyarrow.Table then convert it to a
- pandas.DataFrame using Table.to_pandas.
-
- Parameters
- ----------
- **options : arguments to forward to Table.to_pandas
-
- Returns
- -------
- df : pandas.DataFrame
- """
- table = self.read_all()
- return table.to_pandas(**options)
-
-
-class RecordBatchStreamReader(lib._RecordBatchStreamReader, _ReadPandasOption):
+class RecordBatchStreamReader(lib._RecordBatchStreamReader):
"""
Reader for the Arrow streaming binary format.
@@ -97,7 +77,7 @@ class RecordBatchStreamWriter(lib._RecordBatchStreamWriter):
self._open(sink, schema, options=options)
-class RecordBatchFileReader(lib._RecordBatchFileReader, _ReadPandasOption):
+class RecordBatchFileReader(lib._RecordBatchFileReader):
"""
Class for reading Arrow record batch data from the Arrow binary file format
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 8e06dcd..5b2958a 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -483,7 +483,7 @@ cdef class _CRecordBatchWriter(_Weakrefable):
shared_ptr[CRecordBatchWriter] writer
-cdef class _CRecordBatchReader(_Weakrefable):
+cdef class RecordBatchReader(_Weakrefable):
cdef:
shared_ptr[CRecordBatchReader] reader
diff --git a/python/pyarrow/tests/test_cffi.py
b/python/pyarrow/tests/test_cffi.py
index bcf3c72..5505e57 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -26,6 +26,13 @@ except ImportError:
import pytest
+try:
+ import pandas as pd
+ import pandas.testing as tm
+except ImportError:
+ pd = tm = None
+
+
needs_cffi = pytest.mark.skipif(ffi is None,
reason="test needs cffi package installed")
@@ -36,6 +43,34 @@ assert_schema_released = pytest.raises(
assert_array_released = pytest.raises(
ValueError, match="Cannot import released ArrowArray")
+assert_stream_released = pytest.raises(
+ ValueError, match="Cannot import released ArrowArrayStream")
+
+
+def make_schema():
+ return pa.schema([('ints', pa.list_(pa.int32()))],
+ metadata={b'key1': b'value1'})
+
+
+def make_batch():
+ return pa.record_batch([[[1], [2, 42]]], make_schema())
+
+
+def make_batches():
+ schema = make_schema()
+ return [
+ pa.record_batch([[[1], [2, 42]]], schema),
+ pa.record_batch([[None, [], [5, 6]]], schema),
+ ]
+
+
+def make_serialized(schema, batches):
+ with pa.BufferOutputStream() as sink:
+ with pa.ipc.new_stream(sink, schema) as out:
+ for batch in batches:
+ out.write(batch)
+ return sink.getvalue()
+
@needs_cffi
def test_export_import_type():
@@ -120,10 +155,6 @@ def test_export_import_schema():
c_schema = ffi.new("struct ArrowSchema*")
ptr_schema = int(ffi.cast("uintptr_t", c_schema))
- def make_schema():
- return pa.schema([('ints', pa.list_(pa.int32()))],
- metadata={b'key1': b'value1'})
-
gc.collect() # Make sure no Arrow data dangles in a ref cycle
old_allocated = pa.total_allocated_bytes()
@@ -156,13 +187,6 @@ def test_export_import_batch():
c_array = ffi.new("struct ArrowArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))
- def make_schema():
- return pa.schema([('ints', pa.list_(pa.int32()))],
- metadata={b'key1': b'value1'})
-
- def make_batch():
- return pa.record_batch([[[1], [2, 42]]], make_schema())
-
gc.collect() # Make sure no Arrow data dangles in a ref cycle
old_allocated = pa.total_allocated_bytes()
@@ -172,7 +196,7 @@ def test_export_import_batch():
py_value = batch.to_pydict()
batch._export_to_c(ptr_array)
assert pa.total_allocated_bytes() > old_allocated
- # Delete recreate C++ object from exported pointer
+ # Delete and recreate C++ object from exported pointer
del batch
batch_new = pa.RecordBatch._import_from_c(ptr_array, schema)
assert batch_new.to_pydict() == py_value
@@ -192,8 +216,6 @@ def test_export_import_batch():
del batch
batch_new = pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
assert batch_new.to_pydict() == py_value
- print(batch_new.schema)
- print(make_schema())
assert batch_new.schema == make_schema()
assert pa.total_allocated_bytes() > old_allocated
del batch_new
@@ -211,3 +233,63 @@ def test_export_import_batch():
# Now released
with assert_schema_released:
pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
+
+
+def _export_import_batch_reader(ptr_stream, reader_factory):
+ # Prepare input
+ batches = make_batches()
+ schema = batches[0].schema
+
+ reader = reader_factory(schema, batches)
+ reader._export_to_c(ptr_stream)
+ # Delete and recreate C++ object from exported pointer
+ del reader, batches
+
+ reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
+ assert reader_new.schema == schema
+ got_batches = list(reader_new)
+ del reader_new
+ assert got_batches == make_batches()
+
+ # Test read_pandas()
+ if pd is not None:
+ batches = make_batches()
+ schema = batches[0].schema
+ expected_df = pa.Table.from_batches(batches).to_pandas()
+
+ reader = reader_factory(schema, batches)
+ reader._export_to_c(ptr_stream)
+ del reader, batches
+
+ reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
+ got_df = reader_new.read_pandas()
+ del reader_new
+ tm.assert_frame_equal(expected_df, got_df)
+
+
+def make_ipc_stream_reader(schema, batches):
+ return pa.ipc.open_stream(make_serialized(schema, batches))
+
+
+def make_py_record_batch_reader(schema, batches):
+ return pa.ipc.RecordBatchReader.from_batches(schema, batches)
+
+
+@needs_cffi
[email protected]('reader_factory',
+ [make_ipc_stream_reader,
+ make_py_record_batch_reader])
+def test_export_import_batch_reader(reader_factory):
+ c_stream = ffi.new("struct ArrowArrayStream*")
+ ptr_stream = int(ffi.cast("uintptr_t", c_stream))
+
+ gc.collect() # Make sure no Arrow data dangles in a ref cycle
+ old_allocated = pa.total_allocated_bytes()
+
+ _export_import_batch_reader(ptr_stream, reader_factory)
+
+ assert pa.total_allocated_bytes() == old_allocated
+
+ # Now released
+ with assert_stream_released:
+ pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 44f8499..3d3e72e 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -15,11 +15,13 @@
# specific language governing permissions and limitations
# under the License.
+from collections import UserList
import io
import pytest
import socket
import sys
import threading
+import weakref
import numpy as np
@@ -858,3 +860,36 @@ def test_write_empty_ipc_file():
table = reader.read_all()
assert len(table) == 0
assert table.schema.equals(schema)
+
+
+def test_py_record_batch_reader():
+ def make_schema():
+ return pa.schema([('field', pa.int64())])
+
+ def make_batches():
+ schema = make_schema()
+ batch1 = pa.record_batch([[1, 2, 3]], schema=schema)
+ batch2 = pa.record_batch([[4, 5]], schema=schema)
+ return [batch1, batch2]
+
+ # With iterable
+ batches = UserList(make_batches()) # weakrefable
+ wr = weakref.ref(batches)
+
+ with pa.ipc.RecordBatchReader.from_batches(make_schema(),
+ batches) as reader:
+ batches = None
+ assert wr() is not None
+ assert list(reader) == make_batches()
+ assert wr() is None
+
+ # With iterator
+ batches = iter(UserList(make_batches())) # weakrefable
+ wr = weakref.ref(batches)
+
+ with pa.ipc.RecordBatchReader.from_batches(make_schema(),
+ batches) as reader:
+ batches = None
+ assert wr() is not None
+ assert list(reader) == make_batches()
+ assert wr() is None