This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 97879eb  ARROW-9761: [C/C++] Add experimental C stream inferface
97879eb is described below

commit 97879eb970bac52d93d2247200b9ca7acf6f3f93
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Oct 1 12:00:12 2020 +0200

    ARROW-9761: [C/C++] Add experimental C stream inferface
    
    The goal is to have a standardized ABI to communicate streams of 
homogeneous arrays or record batches (for example for database result sets).
    
    The trickiest part is error reporting.  This proposal tries to strike a 
compromise between simplicity (an integer error code mapping to errno values) 
and expressivity (an optional description string for application-specific and 
context-specific details).
    
    Closes #8052 from pitrou/ARROW-9761-c-array-stream
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/c/abi.h                      |  38 +++++
 cpp/src/arrow/c/bridge.cc                  | 194 ++++++++++++++++++++++
 cpp/src/arrow/c/bridge.h                   |  34 ++++
 cpp/src/arrow/c/bridge_test.cc             | 252 ++++++++++++++++++++++++++++-
 cpp/src/arrow/c/helpers.h                  |  30 ++++
 cpp/src/arrow/c/util_internal.h            |   7 +
 cpp/src/arrow/python/CMakeLists.txt        |   1 +
 cpp/src/arrow/python/extension_type.h      |   1 +
 cpp/src/arrow/python/ipc.cc                |  67 ++++++++
 cpp/src/arrow/python/ipc.h                 |  52 ++++++
 docs/source/cpp/api.rst                    |   1 +
 docs/source/cpp/{api.rst => api/c_abi.rst} |  53 +++---
 docs/source/format/CDataInterface.rst      |   2 +
 docs/source/format/CStreamInterface.rst    | 218 +++++++++++++++++++++++++
 docs/source/index.rst                      |   1 +
 python/pyarrow/_csv.pyx                    |   4 +-
 python/pyarrow/_flight.pyx                 |  15 +-
 python/pyarrow/cffi.py                     |  12 ++
 python/pyarrow/includes/libarrow.pxd       |  18 ++-
 python/pyarrow/ipc.pxi                     | 121 ++++++++++++--
 python/pyarrow/ipc.py                      |  26 +--
 python/pyarrow/lib.pxd                     |   2 +-
 python/pyarrow/tests/test_cffi.py          | 110 +++++++++++--
 python/pyarrow/tests/test_ipc.py           |  35 ++++
 24 files changed, 1212 insertions(+), 82 deletions(-)

diff --git a/cpp/src/arrow/c/abi.h b/cpp/src/arrow/c/abi.h
index 821bc96..a78170d 100644
--- a/cpp/src/arrow/c/abi.h
+++ b/cpp/src/arrow/c/abi.h
@@ -60,6 +60,44 @@ struct ArrowArray {
   void* private_data;
 };
 
+// EXPERIMENTAL: C stream interface
+
+struct ArrowArrayStream {
+  // Callback to get the stream type
+  // (will be the same for all arrays in the stream).
+  //
+  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowSchema must be released independently from the 
stream.
+  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+
+  // Callback to get the next array
+  // (if no error and the array is released, the stream has ended)
+  //
+  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowArray must be released independently from the 
stream.
+  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+
+  // Callback to get optional detailed error information.
+  // This must only be called if the last stream operation failed
+  // with a non-0 return code.
+  //
+  // Return value: pointer to a null-terminated character array describing
+  // the last error, or NULL if no description is available.
+  //
+  // The returned pointer is only valid until the next operation on this stream
+  // (including release).
+  const char* (*get_last_error)(struct ArrowArrayStream*);
+
+  // Release callback: release the stream's own resources.
+  // Note that arrays returned by `get_next` must be individually released.
+  void (*release)(struct ArrowArrayStream*);
+
+  // Opaque producer-specific data
+  void* private_data;
+};
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc
index 1e602a6..5b360ab 100644
--- a/cpp/src/arrow/c/bridge.cc
+++ b/cpp/src/arrow/c/bridge.cc
@@ -18,6 +18,7 @@
 #include "arrow/c/bridge.h"
 
 #include <algorithm>
+#include <cerrno>
 #include <cstring>
 #include <string>
 #include <utility>
@@ -1501,4 +1502,197 @@ Result<std::shared_ptr<RecordBatch>> 
ImportRecordBatch(struct ArrowArray* array,
   return ImportRecordBatch(array, *maybe_schema);
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C stream export
+
+namespace {
+
+class ExportedArrayStream {
+ public:
+  struct PrivateData {
+    explicit PrivateData(std::shared_ptr<RecordBatchReader> reader)
+        : reader_(std::move(reader)) {}
+
+    std::shared_ptr<RecordBatchReader> reader_;
+    std::string last_error_;
+
+    PrivateData() = default;
+    ARROW_DISALLOW_COPY_AND_ASSIGN(PrivateData);
+  };
+
+  explicit ExportedArrayStream(struct ArrowArrayStream* stream) : 
stream_(stream) {}
+
+  Status GetSchema(struct ArrowSchema* out_schema) {
+    return ExportSchema(*reader()->schema(), out_schema);
+  }
+
+  Status GetNext(struct ArrowArray* out_array) {
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(reader()->ReadNext(&batch));
+    if (batch == nullptr) {
+      // End of stream
+      ArrowArrayMarkReleased(out_array);
+      return Status::OK();
+    } else {
+      return ExportRecordBatch(*batch, out_array);
+    }
+  }
+
+  const char* GetLastError() {
+    const auto& last_error = private_data()->last_error_;
+    return last_error.empty() ? nullptr : last_error.c_str();
+  }
+
+  void Release() {
+    if (ArrowArrayStreamIsReleased(stream_)) {
+      return;
+    }
+    DCHECK_NE(private_data(), nullptr);
+    delete private_data();
+
+    ArrowArrayStreamMarkReleased(stream_);
+  }
+
+  // C-compatible callbacks
+
+  static int StaticGetSchema(struct ArrowArrayStream* stream,
+                             struct ArrowSchema* out_schema) {
+    ExportedArrayStream self{stream};
+    return self.ToCError(self.GetSchema(out_schema));
+  }
+
+  static int StaticGetNext(struct ArrowArrayStream* stream,
+                           struct ArrowArray* out_array) {
+    ExportedArrayStream self{stream};
+    return self.ToCError(self.GetNext(out_array));
+  }
+
+  static void StaticRelease(struct ArrowArrayStream* stream) {
+    ExportedArrayStream{stream}.Release();
+  }
+
+  static const char* StaticGetLastError(struct ArrowArrayStream* stream) {
+    return ExportedArrayStream{stream}.GetLastError();
+  }
+
+ private:
+  int ToCError(const Status& status) {
+    if (ARROW_PREDICT_TRUE(status.ok())) {
+      private_data()->last_error_.clear();
+      return 0;
+    }
+    private_data()->last_error_ = status.ToString();
+    switch (status.code()) {
+      case StatusCode::IOError:
+        return EIO;
+      case StatusCode::NotImplemented:
+        return ENOSYS;
+      case StatusCode::OutOfMemory:
+        return ENOMEM;
+      default:
+        return EINVAL;  // Fallback for Invalid, TypeError, etc.
+    }
+  }
+
+  PrivateData* private_data() {
+    return reinterpret_cast<PrivateData*>(stream_->private_data);
+  }
+
+  const std::shared_ptr<RecordBatchReader>& reader() { return 
private_data()->reader_; }
+
+  struct ArrowArrayStream* stream_;
+};
+
+}  // namespace
+
+Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
+                               struct ArrowArrayStream* out) {
+  out->get_schema = ExportedArrayStream::StaticGetSchema;
+  out->get_next = ExportedArrayStream::StaticGetNext;
+  out->get_last_error = ExportedArrayStream::StaticGetLastError;
+  out->release = ExportedArrayStream::StaticRelease;
+  out->private_data = new ExportedArrayStream::PrivateData{std::move(reader)};
+  return Status::OK();
+}
+
+//////////////////////////////////////////////////////////////////////////
+// C stream import
+
+namespace {
+
+class ArrayStreamBatchReader : public RecordBatchReader {
+ public:
+  explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) {
+    ArrowArrayStreamMove(stream, &stream_);
+    DCHECK(!ArrowArrayStreamIsReleased(&stream_));
+  }
+
+  ~ArrayStreamBatchReader() {
+    ArrowArrayStreamRelease(&stream_);
+    DCHECK(ArrowArrayStreamIsReleased(&stream_));
+  }
+
+  std::shared_ptr<Schema> schema() const override { return CacheSchema(); }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    struct ArrowArray c_array;
+    RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array)));
+    if (ArrowArrayIsReleased(&c_array)) {
+      // End of stream
+      batch->reset();
+      return Status::OK();
+    } else {
+      return ImportRecordBatch(&c_array, CacheSchema()).Value(batch);
+    }
+  }
+
+ private:
+  std::shared_ptr<Schema> CacheSchema() const {
+    if (!schema_) {
+      struct ArrowSchema c_schema;
+      ARROW_CHECK_OK(StatusFromCError(stream_.get_schema(&stream_, 
&c_schema)));
+      schema_ = ImportSchema(&c_schema).ValueOrDie();
+    }
+    return schema_;
+  }
+
+  Status StatusFromCError(int errno_like) const {
+    if (ARROW_PREDICT_TRUE(errno_like == 0)) {
+      return Status::OK();
+    }
+    StatusCode code;
+    switch (errno_like) {
+      case EDOM:
+      case EINVAL:
+      case ERANGE:
+        code = StatusCode::Invalid;
+        break;
+      case ENOMEM:
+        code = StatusCode::OutOfMemory;
+        break;
+      case ENOSYS:
+        code = StatusCode::NotImplemented;
+      default:
+        code = StatusCode::IOError;
+        break;
+    }
+    const char* last_error = stream_.get_last_error(&stream_);
+    return Status(code, last_error ? std::string(last_error) : "");
+  }
+
+  mutable struct ArrowArrayStream stream_;
+  mutable std::shared_ptr<Schema> schema_;
+};
+
+}  // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
+    struct ArrowArrayStream* stream) {
+  if (ArrowArrayStreamIsReleased(stream)) {
+    return Status::Invalid("Cannot import released ArrowArrayStream");
+  }
+  // XXX should we call get_schema() here to avoid crashing on error?
+  return std::make_shared<ArrayStreamBatchReader>(stream);
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h
index 8efb5d9..294f53e 100644
--- a/cpp/src/arrow/c/bridge.h
+++ b/cpp/src/arrow/c/bridge.h
@@ -29,6 +29,10 @@
 
 namespace arrow {
 
+/// \defgroup c-data-interface Functions for working with the C data interface.
+///
+/// @{
+
 /// \brief Export C++ DataType using the C data interface format.
 ///
 /// The root type is considered to have empty name and metadata.
@@ -160,4 +164,34 @@ ARROW_EXPORT
 Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* 
array,
                                                        struct ArrowSchema* 
schema);
 
+/// @}
+
+/// \defgroup c-stream-interface Functions for working with the C data 
interface.
+///
+/// @{
+
+/// \brief EXPERIMENTAL: Export C++ RecordBatchReader using the C stream 
interface.
+///
+/// The resulting ArrowArrayStream struct keeps the record batch reader alive
+/// until its release callback is called by the consumer.
+///
+/// \param[in] reader RecordBatchReader object to export
+/// \param[out] out C struct where to export the stream
+ARROW_EXPORT
+Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
+                               struct ArrowArrayStream* out);
+
+/// \brief EXPERIMENTAL: Import C++ RecordBatchReader from the C stream 
interface.
+///
+/// The ArrowArrayStream struct has its contents moved to a private object
+/// held alive by the resulting record batch reader.
+///
+/// \param[in,out] stream C stream interface struct
+/// \return Imported RecordBatchReader object
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
+    struct ArrowArrayStream* stream);
+
+/// @}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc
index 6695d6e..3f84edf 100644
--- a/cpp/src/arrow/c/bridge_test.cc
+++ b/cpp/src/arrow/c/bridge_test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cerrno>
 #include <deque>
 #include <functional>
 #include <string>
@@ -22,6 +23,7 @@
 #include <utility>
 #include <vector>
 
+#include <gmock/gmock-matchers.h>
 #include <gtest/gtest.h>
 
 #include "arrow/c/bridge.h"
@@ -40,6 +42,8 @@ namespace arrow {
 
 using internal::ArrayExportGuard;
 using internal::ArrayExportTraits;
+using internal::ArrayStreamExportGuard;
+using internal::ArrayStreamExportTraits;
 using internal::SchemaExportGuard;
 using internal::SchemaExportTraits;
 
@@ -78,11 +82,11 @@ class ReleaseCallback {
   explicit ReleaseCallback(CType* c_struct) : called_(false) {
     orig_release_ = c_struct->release;
     orig_private_data_ = c_struct->private_data;
-    c_struct->release = ReleaseUnbound;
+    c_struct->release = StaticRelease;
     c_struct->private_data = this;
   }
 
-  static void ReleaseUnbound(CType* c_struct) {
+  static void StaticRelease(CType* c_struct) {
     
reinterpret_cast<ReleaseCallback*>(c_struct->private_data)->Release(c_struct);
   }
 
@@ -2678,4 +2682,248 @@ TEST_F(TestArrayRoundtrip, RecordBatch) {
 
 // TODO C -> C++ -> C roundtripping tests?
 
+////////////////////////////////////////////////////////////////////////////
+// Array stream export tests
+
+class FailingRecordBatchReader : public RecordBatchReader {
+ public:
+  explicit FailingRecordBatchReader(Status error) : error_(std::move(error)) {}
+
+  static std::shared_ptr<Schema> expected_schema() { return arrow::schema({}); 
}
+
+  std::shared_ptr<Schema> schema() const override { return expected_schema(); }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override { return 
error_; }
+
+ protected:
+  Status error_;
+};
+
+class BaseArrayStreamTest : public ::testing::Test {
+ public:
+  void SetUp() override {
+    pool_ = default_memory_pool();
+    orig_allocated_ = pool_->bytes_allocated();
+  }
+
+  void TearDown() override { ASSERT_EQ(pool_->bytes_allocated(), 
orig_allocated_); }
+
+  RecordBatchVector MakeBatches(std::shared_ptr<Schema> schema, ArrayVector 
arrays) {
+    DCHECK_EQ(schema->num_fields(), 1);
+    RecordBatchVector batches;
+    for (const auto& array : arrays) {
+      batches.push_back(RecordBatch::Make(schema, array->length(), {array}));
+    }
+    return batches;
+  }
+
+ protected:
+  MemoryPool* pool_;
+  int64_t orig_allocated_;
+};
+
+class TestArrayStreamExport : public BaseArrayStreamTest {
+ public:
+  void AssertStreamSchema(struct ArrowArrayStream* c_stream, const Schema& 
expected) {
+    struct ArrowSchema c_schema;
+    ASSERT_EQ(0, c_stream->get_schema(c_stream, &c_schema));
+
+    SchemaExportGuard schema_guard(&c_schema);
+    ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema));
+    ASSERT_OK_AND_ASSIGN(auto schema, ImportSchema(&c_schema));
+    AssertSchemaEqual(expected, *schema);
+  }
+
+  void AssertStreamEnd(struct ArrowArrayStream* c_stream) {
+    struct ArrowArray c_array;
+    ASSERT_EQ(0, c_stream->get_next(c_stream, &c_array));
+
+    ArrayExportGuard guard(&c_array);
+    ASSERT_TRUE(ArrowArrayIsReleased(&c_array));
+  }
+
+  void AssertStreamNext(struct ArrowArrayStream* c_stream, const RecordBatch& 
expected) {
+    struct ArrowArray c_array;
+    ASSERT_EQ(0, c_stream->get_next(c_stream, &c_array));
+
+    ArrayExportGuard guard(&c_array);
+    ASSERT_FALSE(ArrowArrayIsReleased(&c_array));
+
+    ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array, 
expected.schema()));
+    AssertBatchesEqual(expected, *batch);
+  }
+};
+
+TEST_F(TestArrayStreamExport, Empty) {
+  auto schema = arrow::schema({field("ints", int32())});
+  auto batches = MakeBatches(schema, {});
+  ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema));
+
+  struct ArrowArrayStream c_stream;
+
+  ASSERT_OK(ExportRecordBatchReader(reader, &c_stream));
+  ArrayStreamExportGuard guard(&c_stream);
+
+  ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream));
+  AssertStreamSchema(&c_stream, *schema);
+  AssertStreamEnd(&c_stream);
+  AssertStreamEnd(&c_stream);
+}
+
+TEST_F(TestArrayStreamExport, Simple) {
+  auto schema = arrow::schema({field("ints", int32())});
+  auto batches = MakeBatches(
+      schema, {ArrayFromJSON(int32(), "[1, 2]"), ArrayFromJSON(int32(), "[4, 
5, null]")});
+  ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema));
+
+  struct ArrowArrayStream c_stream;
+
+  ASSERT_OK(ExportRecordBatchReader(reader, &c_stream));
+  ArrayStreamExportGuard guard(&c_stream);
+
+  ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream));
+  AssertStreamSchema(&c_stream, *schema);
+  AssertStreamNext(&c_stream, *batches[0]);
+  AssertStreamNext(&c_stream, *batches[1]);
+  AssertStreamEnd(&c_stream);
+  AssertStreamEnd(&c_stream);
+}
+
+TEST_F(TestArrayStreamExport, ArrayLifetime) {
+  auto schema = arrow::schema({field("ints", int32())});
+  auto batches = MakeBatches(
+      schema, {ArrayFromJSON(int32(), "[1, 2]"), ArrayFromJSON(int32(), "[4, 
5, null]")});
+  ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema));
+
+  struct ArrowArrayStream c_stream;
+  struct ArrowSchema c_schema;
+  struct ArrowArray c_array0, c_array1;
+
+  ASSERT_OK(ExportRecordBatchReader(reader, &c_stream));
+  {
+    ArrayStreamExportGuard guard(&c_stream);
+    ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream));
+
+    ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema));
+    ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array0));
+    ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array1));
+    AssertStreamEnd(&c_stream);
+  }
+
+  ArrayExportGuard guard0(&c_array0), guard1(&c_array1);
+
+  {
+    SchemaExportGuard schema_guard(&c_schema);
+    ASSERT_OK_AND_ASSIGN(auto got_schema, ImportSchema(&c_schema));
+    AssertSchemaEqual(*schema, *got_schema);
+  }
+
+  ASSERT_GT(pool_->bytes_allocated(), orig_allocated_);
+  ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array1, schema));
+  AssertBatchesEqual(*batches[1], *batch);
+  ASSERT_OK_AND_ASSIGN(batch, ImportRecordBatch(&c_array0, schema));
+  AssertBatchesEqual(*batches[0], *batch);
+}
+
+TEST_F(TestArrayStreamExport, Errors) {
+  auto reader =
+      std::make_shared<FailingRecordBatchReader>(Status::Invalid("some example 
error"));
+
+  struct ArrowArrayStream c_stream;
+
+  ASSERT_OK(ExportRecordBatchReader(reader, &c_stream));
+  ArrayStreamExportGuard guard(&c_stream);
+
+  struct ArrowSchema c_schema;
+  ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema));
+  ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema));
+  {
+    SchemaExportGuard schema_guard(&c_schema);
+    ASSERT_OK_AND_ASSIGN(auto schema, ImportSchema(&c_schema));
+    AssertSchemaEqual(schema, arrow::schema({}));
+  }
+
+  struct ArrowArray c_array;
+  ASSERT_EQ(EINVAL, c_stream.get_next(&c_stream, &c_array));
+}
+
+////////////////////////////////////////////////////////////////////////////
+// Array stream roundtrip tests
+
+class TestArrayStreamRoundtrip : public BaseArrayStreamTest {
+ public:
+  void Roundtrip(std::shared_ptr<RecordBatchReader>* reader,
+                 struct ArrowArrayStream* c_stream) {
+    ASSERT_OK(ExportRecordBatchReader(*reader, c_stream));
+    ASSERT_FALSE(ArrowArrayStreamIsReleased(c_stream));
+
+    ASSERT_OK_AND_ASSIGN(auto got_reader, ImportRecordBatchReader(c_stream));
+    *reader = std::move(got_reader);
+  }
+
+  void Roundtrip(
+      std::shared_ptr<RecordBatchReader> reader,
+      std::function<void(const std::shared_ptr<RecordBatchReader>&)> 
check_func) {
+    ArrowArrayStream c_stream;
+
+    // NOTE: ReleaseCallback<> is not immediately usable with ArrowArrayStream,
+    // because get_next and get_schema need the original private_data.
+    std::weak_ptr<RecordBatchReader> weak_reader(reader);
+    ASSERT_EQ(weak_reader.use_count(), 1);  // Expiration check will fail 
otherwise
+
+    ASSERT_OK(ExportRecordBatchReader(std::move(reader), &c_stream));
+    ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream));
+
+    {
+      ASSERT_OK_AND_ASSIGN(auto new_reader, 
ImportRecordBatchReader(&c_stream));
+      // Stream was moved
+      ASSERT_TRUE(ArrowArrayStreamIsReleased(&c_stream));
+      ASSERT_FALSE(weak_reader.expired());
+
+      check_func(new_reader);
+    }
+    // Stream was released when `new_reader` was destroyed
+    ASSERT_TRUE(weak_reader.expired());
+  }
+
+  void AssertReaderNext(const std::shared_ptr<RecordBatchReader>& reader,
+                        const RecordBatch& expected) {
+    ASSERT_OK_AND_ASSIGN(auto batch, reader->Next());
+    ASSERT_NE(batch, nullptr);
+    AssertBatchesEqual(expected, *batch);
+  }
+
+  void AssertReaderEnd(const std::shared_ptr<RecordBatchReader>& reader) {
+    ASSERT_OK_AND_ASSIGN(auto batch, reader->Next());
+    ASSERT_EQ(batch, nullptr);
+  }
+};
+
+TEST_F(TestArrayStreamRoundtrip, Simple) {
+  auto orig_schema = arrow::schema({field("ints", int32())});
+  auto batches = MakeBatches(orig_schema, {ArrayFromJSON(int32(), "[1, 2]"),
+                                           ArrayFromJSON(int32(), "[4, 5, 
null]")});
+
+  ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, 
orig_schema));
+
+  Roundtrip(std::move(reader), [&](const std::shared_ptr<RecordBatchReader>& 
reader) {
+    AssertSchemaEqual(*orig_schema, *reader->schema());
+    AssertReaderNext(reader, *batches[0]);
+    AssertReaderNext(reader, *batches[1]);
+    AssertReaderEnd(reader);
+    AssertReaderEnd(reader);
+  });
+}
+
+TEST_F(TestArrayStreamRoundtrip, Errors) {
+  auto reader = std::make_shared<FailingRecordBatchReader>(
+      Status::Invalid("roundtrip error example"));
+
+  Roundtrip(std::move(reader), [&](const std::shared_ptr<RecordBatchReader>& 
reader) {
+    auto status = reader->Next().status();
+    ASSERT_RAISES(Invalid, status);
+    ASSERT_THAT(status.message(), ::testing::HasSubstr("roundtrip error 
example"));
+  });
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/c/helpers.h b/cpp/src/arrow/c/helpers.h
index a1a1240..a5c1f6f 100644
--- a/cpp/src/arrow/c/helpers.h
+++ b/cpp/src/arrow/c/helpers.h
@@ -82,6 +82,36 @@ inline void ArrowArrayRelease(struct ArrowArray* array) {
   }
 }
 
+/// Query whether the C array stream is released
+inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream* stream) {
+  return stream->release == NULL;
+}
+
+/// Mark the C array stream released (for use in release callbacks)
+inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* stream) {
+  stream->release = NULL;
+}
+
+/// Move the C array stream from `src` to `dest`
+///
+/// Note `dest` must *not* point to a valid stream already, otherwise there
+/// will be a memory leak.
+inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
+                                 struct ArrowArrayStream* dest) {
+  assert(dest != src);
+  assert(!ArrowArrayStreamIsReleased(src));
+  memcpy(dest, src, sizeof(struct ArrowArrayStream));
+  ArrowArrayStreamMarkReleased(src);
+}
+
+/// Release the C array stream, if necessary, by calling its release callback
+inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) {
+  if (!ArrowArrayStreamIsReleased(stream)) {
+    stream->release(stream);
+    assert(ArrowArrayStreamIsReleased(stream));
+  }
+}
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/cpp/src/arrow/c/util_internal.h b/cpp/src/arrow/c/util_internal.h
index 3ece524..6a33be9 100644
--- a/cpp/src/arrow/c/util_internal.h
+++ b/cpp/src/arrow/c/util_internal.h
@@ -34,6 +34,12 @@ struct ArrayExportTraits {
   static constexpr auto ReleaseFunc = &ArrowArrayRelease;
 };
 
+struct ArrayStreamExportTraits {
+  typedef struct ArrowArrayStream CType;
+  static constexpr auto IsReleasedFunc = &ArrowArrayStreamIsReleased;
+  static constexpr auto ReleaseFunc = &ArrowArrayStreamRelease;
+};
+
 // A RAII-style object to release a C Array / Schema struct at block scope 
exit.
 template <typename Traits>
 class ExportGuard {
@@ -73,6 +79,7 @@ class ExportGuard {
 
 using SchemaExportGuard = ExportGuard<SchemaExportTraits>;
 using ArrayExportGuard = ExportGuard<ArrayExportTraits>;
+using ArrayStreamExportGuard = ExportGuard<ArrayStreamExportTraits>;
 
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/python/CMakeLists.txt 
b/cpp/src/arrow/python/CMakeLists.txt
index b972b0d..9601557 100644
--- a/cpp/src/arrow/python/CMakeLists.txt
+++ b/cpp/src/arrow/python/CMakeLists.txt
@@ -38,6 +38,7 @@ set(ARROW_PYTHON_SRCS
     inference.cc
     init.cc
     io.cc
+    ipc.cc
     numpy_convert.cc
     numpy_to_arrow.cc
     python_to_arrow.cc
diff --git a/cpp/src/arrow/python/extension_type.h 
b/cpp/src/arrow/python/extension_type.h
index 0041c8a..f5b12e6 100644
--- a/cpp/src/arrow/python/extension_type.h
+++ b/cpp/src/arrow/python/extension_type.h
@@ -44,6 +44,7 @@ class ARROW_PYTHON_EXPORT PyExtensionType : public 
ExtensionType {
   std::string Serialize() const override;
 
   // For use from Cython
+  // Assumes that `typ` is borrowed
   static Status FromClass(const std::shared_ptr<DataType> storage_type,
                           const std::string extension_name, PyObject* typ,
                           std::shared_ptr<ExtensionType>* out);
diff --git a/cpp/src/arrow/python/ipc.cc b/cpp/src/arrow/python/ipc.cc
new file mode 100644
index 0000000..2e6c9d9
--- /dev/null
+++ b/cpp/src/arrow/python/ipc.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/ipc.h"
+
+#include <memory>
+
+#include "arrow/python/pyarrow.h"
+
+namespace arrow {
+namespace py {
+
+PyRecordBatchReader::PyRecordBatchReader() {}
+
+Status PyRecordBatchReader::Init(std::shared_ptr<Schema> schema, PyObject* 
iterable) {
+  schema_ = std::move(schema);
+
+  iterator_.reset(PyObject_GetIter(iterable));
+  return CheckPyError();
+}
+
+std::shared_ptr<Schema> PyRecordBatchReader::schema() const { return schema_; }
+
+Status PyRecordBatchReader::ReadNext(std::shared_ptr<RecordBatch>* batch) {
+  PyAcquireGIL lock;
+
+  if (!iterator_) {
+    // End of stream
+    batch->reset();
+    return Status::OK();
+  }
+
+  OwnedRef py_batch(PyIter_Next(iterator_.obj()));
+  if (!py_batch) {
+    RETURN_IF_PYERROR();
+    // End of stream
+    batch->reset();
+    iterator_.reset();
+    return Status::OK();
+  }
+
+  return unwrap_batch(py_batch.obj()).Value(batch);
+}
+
+Result<std::shared_ptr<RecordBatchReader>> PyRecordBatchReader::Make(
+    std::shared_ptr<Schema> schema, PyObject* iterable) {
+  auto reader = std::shared_ptr<PyRecordBatchReader>(new 
PyRecordBatchReader());
+  RETURN_NOT_OK(reader->Init(std::move(schema), iterable));
+  return reader;
+}
+
+}  // namespace py
+}  // namespace arrow
diff --git a/cpp/src/arrow/python/ipc.h b/cpp/src/arrow/python/ipc.h
new file mode 100644
index 0000000..92232ed
--- /dev/null
+++ b/cpp/src/arrow/python/ipc.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/python/common.h"
+#include "arrow/python/visibility.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace py {
+
+class ARROW_PYTHON_EXPORT PyRecordBatchReader : public RecordBatchReader {
+ public:
+  std::shared_ptr<Schema> schema() const override;
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override;
+
+  // For use from Cython
+  // Assumes that `iterable` is borrowed
+  static Result<std::shared_ptr<RecordBatchReader>> 
Make(std::shared_ptr<Schema>,
+                                                         PyObject* iterable);
+
+ protected:
+  PyRecordBatchReader();
+
+  Status Init(std::shared_ptr<Schema>, PyObject* iterable);
+
+  std::shared_ptr<Schema> schema_;
+  OwnedRefNoGIL iterator_;
+};
+
+}  // namespace py
+}  // namespace arrow
diff --git a/docs/source/cpp/api.rst b/docs/source/cpp/api.rst
index 59d2210..626b388 100644
--- a/docs/source/cpp/api.rst
+++ b/docs/source/cpp/api.rst
@@ -29,6 +29,7 @@ API Reference
    api/scalar
    api/builder
    api/table
+   api/c_abi
    api/compute
    api/tensor
    api/utilities
diff --git a/docs/source/cpp/api.rst b/docs/source/cpp/api/c_abi.rst
similarity index 59%
copy from docs/source/cpp/api.rst
copy to docs/source/cpp/api/c_abi.rst
index 59d2210..4e451c3 100644
--- a/docs/source/cpp/api.rst
+++ b/docs/source/cpp/api/c_abi.rst
@@ -15,25 +15,34 @@
 .. specific language governing permissions and limitations
 .. under the License.
 
-*************
-API Reference
-*************
-
-.. toctree::
-   :maxdepth: 3
-
-   api/support
-   api/memory
-   api/datatype
-   api/array
-   api/scalar
-   api/builder
-   api/table
-   api/compute
-   api/tensor
-   api/utilities
-   api/io
-   api/formats
-   api/cuda
-   api/flight
-   api/filesystem
+============
+C Interfaces
+============
+
+.. seealso::
+   The :ref:`C data interface <c-data-interface>` and
+   :ref:`C stream interface <c-stream-interface>` specifications.
+
+ABI Structures
+==============
+
+.. doxygenstruct:: ArrowSchema
+   :project: arrow_cpp
+
+.. doxygenstruct:: ArrowArray
+   :project: arrow_cpp
+
+.. doxygenstruct:: ArrowArrayStream
+   :project: arrow_cpp
+
+C Data Interface
+================
+
+.. doxygengroup:: c-data-interface
+   :content-only:
+
+C Stream Interface
+==================
+
+.. doxygengroup:: c-stream-interface
+   :content-only:
diff --git a/docs/source/format/CDataInterface.rst 
b/docs/source/format/CDataInterface.rst
index 768dc47..dbecf30 100644
--- a/docs/source/format/CDataInterface.rst
+++ b/docs/source/format/CDataInterface.rst
@@ -535,6 +535,8 @@ Therefore, the consumer MUST not try to interfere with the 
producer's
 handling of these members' lifetime.  The only way the consumer influences
 data lifetime is by calling the base structure's ``release`` callback.
 
+.. _c-data-interface-released:
+
 Released structure
 ''''''''''''''''''
 
diff --git a/docs/source/format/CStreamInterface.rst 
b/docs/source/format/CStreamInterface.rst
new file mode 100644
index 0000000..b8ccce3
--- /dev/null
+++ b/docs/source/format/CStreamInterface.rst
@@ -0,0 +1,218 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. highlight:: c
+
+.. _c-stream-interface:
+
+============================
+The Arrow C stream interface
+============================
+
+.. warning::
+   This interface is experimental and may evolve based on feedback from
+   early users.  ABI stability is not guaranteed yet.  Feel free to
+   `contact us <https://arrow.apache.org/community/>`__.
+
+The C stream interface builds on the structures defined in the
+:ref:`C data interface <c-data-interface>` and combines them into a 
higher-level
+specification so as to ease the communication of streaming data within a single
+process.
+
+Semantics
+=========
+
+An Arrow C stream exposes a streaming source of data chunks, each with the
+same schema.  Chunks are obtained by calling a blocking pull-style iteration
+function.
+
+Structure definition
+====================
+
+The C stream interface is defined by a single ``struct`` definition::
+
+   struct ArrowArrayStream {
+     // Callbacks providing stream functionality
+     int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+     int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+     const char* (*get_last_error)(struct ArrowArrayStream*);
+
+     // Release callback
+     void (*release)(struct ArrowArrayStream*);
+
+     // Opaque producer-specific data
+     void* private_data;
+   };
+
+The ArrowArrayStream structure
+------------------------------
+
+The ``ArrowArrayStream`` provides the required callbacks to interact with a
+streaming source of Arrow arrays.  It has the following fields:
+
+.. c:member:: int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*, 
struct ArrowSchema* out)
+
+   *Mandatory.*  This callback allows the consumer to query the schema of
+   the chunks of data in the stream.  The schema is the same for all
+   data chunks.
+
+   This callback must NOT be called on a released ``ArrowArrayStream``.
+
+   *Return value:* 0 on success, a non-zero
+   :ref:`error code <c-stream-interface-error-codes>` otherwise.
+
+.. c:member:: int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*, 
struct ArrowArray* out)
+
+   *Mandatory.*  This callback allows the consumer to get the next chunk
+   of data in the stream.
+
+   This callback must NOT be called on a released ``ArrowArrayStream``.
+
+   *Return value:* 0 on success, a non-zero
+   :ref:`error code <c-stream-interface-error-codes>` otherwise.
+
+   On success, the consumer must check whether the ``ArrowArray`` is
+   marked :ref:`released <c-data-interface-released>`.  If the
+   ``ArrowArray`` is released, then the end of stream has been reached.
+   Otherwise, the ``ArrowArray`` contains a valid data chunk.
+
+.. c:member:: const char* (*ArrowArrayStream.get_last_error)(struct 
ArrowArrayStream*)
+
+   *Mandatory.*  This callback allows the consumer to get a textual description
+   of the last error.
+
+   This callback must ONLY be called if the last operation on the
+   ``ArrowArrayStream`` returned an error.  It must NOT be called on a
+   released ``ArrowArrayStream``.
+
+   *Return value:* a pointer to a NULL-terminated character string 
(UTF8-encoded).
+   NULL can also be returned if no detailed description is available.
+
+   The returned pointer is only guaranteed to be valid until the next call of
+   one of the stream's callbacks.  The character string it points to should
+   be copied to consumer-managed storage if it is intended to survive longer.
+
+.. c:member:: void (*ArrowArrayStream.release)(struct ArrowArrayStream*)
+
+   *Mandatory.*  A pointer to a producer-provided release callback.
+
+.. c:member:: void* ArrowArrayStream.private_data
+
+   *Optional.*  An opaque pointer to producer-provided private data.
+
+   Consumers MUST not process this member.  Lifetime of this member
+   is handled by the producer, and especially by the release callback.
+
+
+.. _c-stream-interface-error-codes:
+
+Error codes
+-----------
+
+The ``get_schema`` and ``get_next`` callbacks may return an error under the 
form
+of a non-zero integer code.  Such error codes should be interpreted like
+``errno`` numbers (as defined by the local platform).  Note that the symbolic
+forms of these constants are stable from platform to platform, but their 
numeric
+values are platform-specific.
+
+In particular, it is recommended to recognize the following values:
+
+* ``EINVAL``: for a parameter or input validation error
+* ``ENOMEM``: for a memory allocation failure (out of memory)
+* ``EIO``: for a generic input/output error
+
+.. seealso::
+   `Standard POSIX error codes 
<https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/errno.h.html>`__.
+
+   `Error codes recognized by the Windows C runtime library
+   
<https://docs.microsoft.com/en-us/cpp/c-runtime-library/errno-doserrno-sys-errlist-and-sys-nerr>`__.
+
+Result lifetimes
+----------------
+
+The data returned by the ``get_schema`` and ``get_next`` callbacks must be
+released independently.  Their lifetimes are not tied to that of the
+``ArrowArrayStream``.
+
+Stream lifetime
+---------------
+
+Lifetime of the C stream is managed using a release callback with similar
+usage as in the :ref:`C data interface <c-data-interface-released>`.
+
+
+C consumer example
+==================
+
+Let's say a particular database provides the following C API to execute
+a SQL query and return the result set as a Arrow C stream::
+
+   void MyDB_Query(const char* query, struct ArrowArrayStream* result_set);
+
+Then a consumer could use the following code to iterate over the results::
+
+   static void handle_error(int errcode, struct ArrowArrayStream* stream) {
+      // Print stream error
+      const char* errdesc = stream->get_last_error(stream);
+      if (errdesc != NULL) {
+         fputs(errdesc, stderr);
+      } else {
+         fputs(strerror(errcode), stderr);
+      }
+      // Release stream and abort
+      stream->release(stream),
+      exit(1);
+   }
+
+   void run_query() {
+      struct ArrowArrayStream stream;
+      struct ArrowSchema schema;
+      struct ArrowArray chunk;
+      int errcode;
+
+      MyDB_Query("SELECT * FROM my_table", &stream);
+
+      // Query result set schema
+      errcode = stream.get_schema(&stream, &schema);
+      if (errcode != 0) {
+         handle_error(errcode, &stream);
+      }
+
+      int64_t num_rows = 0;
+
+      // Iterate over results: loop until error or end of stream
+      while ((errcode = stream.get_next(&stream, &chunk) == 0) &&
+             chunk.release != NULL) {
+         // Do something with chunk...
+         fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length);
+         num_rows += chunk.length;
+
+         // Release chunk
+         chunk.release(&chunk);
+      }
+
+      // Was it an error?
+      if (errcode != 0) {
+         handle_error(errcode, &stream);
+      }
+
+      fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows);
+
+      // Release schema and stream
+      schema.release(&schema);
+      stream.release(&stream);
+   }
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 2d95e22..cfcf865 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -43,6 +43,7 @@ such topics as:
    format/Flight
    format/Integration
    format/CDataInterface
+   format/CStreamInterface
    format/Other
 
 .. _toc.usage:
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 028ddc6..34c6693 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -28,7 +28,7 @@ from collections.abc import Mapping
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
-                          _CRecordBatchReader, ensure_type,
+                          RecordBatchReader, ensure_type,
                           maybe_unbox_memory_pool, get_input_stream,
                           native_transcoding_input_stream,
                           pyarrow_wrap_schema, pyarrow_wrap_table,
@@ -633,7 +633,7 @@ cdef _get_convert_options(ConvertOptions convert_options,
         out[0] = convert_options.options
 
 
-cdef class CSVStreamingReader(_CRecordBatchReader):
+cdef class CSVStreamingReader(RecordBatchReader):
     """An object that reads record batches incrementally from a CSV file.
 
     Should not be instantiated directly by user code.
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index 484fbb2..3950cee 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -35,7 +35,7 @@ from pyarrow.lib cimport *
 from pyarrow.lib import ArrowException, ArrowInvalid
 from pyarrow.lib import as_buffer, frombytes, tobytes
 from pyarrow.includes.libarrow_flight cimport *
-from pyarrow.ipc import _ReadPandasOption, _get_legacy_format_default
+from pyarrow.ipc import _get_legacy_format_default, _ReadPandasMixin
 import pyarrow.lib as lib
 
 
@@ -812,7 +812,7 @@ cdef class FlightStreamChunk(_Weakrefable):
             self.chunk.data != NULL, self.chunk.app_metadata != NULL)
 
 
-cdef class _MetadataRecordBatchReader(_Weakrefable):
+cdef class _MetadataRecordBatchReader(_Weakrefable, _ReadPandasMixin):
     """A reader for Flight streams."""
 
     # Needs to be separate class so the "real" class can subclass the
@@ -869,8 +869,7 @@ cdef class _MetadataRecordBatchReader(_Weakrefable):
         return chunk
 
 
-cdef class MetadataRecordBatchReader(_MetadataRecordBatchReader,
-                                     _ReadPandasOption):
+cdef class MetadataRecordBatchReader(_MetadataRecordBatchReader):
     """The virtual base class for readers for Flight streams."""
 
 
@@ -1365,7 +1364,7 @@ cdef class RecordBatchStream(FlightDataStream):
         data_source : RecordBatchReader or Table
         options : pyarrow.ipc.IpcWriteOptions, optional
         """
-        if (not isinstance(data_source, _CRecordBatchReader) and
+        if (not isinstance(data_source, RecordBatchReader) and
                 not isinstance(data_source, lib.Table)):
             raise TypeError("Expected RecordBatchReader or Table, "
                             "but got: {}".format(type(data_source)))
@@ -1375,8 +1374,8 @@ cdef class RecordBatchStream(FlightDataStream):
     cdef CFlightDataStream* to_stream(self) except *:
         cdef:
             shared_ptr[CRecordBatchReader] reader
-        if isinstance(self.data_source, _CRecordBatchReader):
-            reader = (<_CRecordBatchReader> self.data_source).reader
+        if isinstance(self.data_source, RecordBatchReader):
+            reader = (<RecordBatchReader> self.data_source).reader
         elif isinstance(self.data_source, lib.Table):
             table = (<Table> self.data_source).table
             reader.reset(new TableBatchReader(deref(table)))
@@ -1617,7 +1616,7 @@ cdef CStatus _data_stream_next(void* self, 
CFlightPayload* payload) except *:
     else:
         result, metadata = result, None
 
-    if isinstance(result, (Table, _CRecordBatchReader)):
+    if isinstance(result, (Table, RecordBatchReader)):
         if metadata:
             raise ValueError("Can only return metadata alongside a "
                              "RecordBatch.")
diff --git a/python/pyarrow/cffi.py b/python/pyarrow/cffi.py
index 8880c25..961b61d 100644
--- a/python/pyarrow/cffi.py
+++ b/python/pyarrow/cffi.py
@@ -52,6 +52,18 @@ c_source = """
       // Opaque producer-specific data
       void* private_data;
     };
+
+    struct ArrowArrayStream {
+      int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+      int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+
+      const char* (*get_last_error)(struct ArrowArrayStream*);
+
+      // Release callback
+      void (*release)(struct ArrowArrayStream*);
+      // Opaque producer-specific data
+      void* private_data;
+    };
     """
 
 # TODO use out-of-line mode for faster import and avoid C parsing
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index d1c4110..5d5800e 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1393,7 +1393,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" 
nogil:
             " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
         @staticmethod
         CResult[shared_ptr[CRecordBatchReader]] Open(
-            const CInputStream* stream, const CIpcReadOptions& options)
+            const shared_ptr[CInputStream], const CIpcReadOptions&)
 
         @staticmethod
         CResult[shared_ptr[CRecordBatchReader]] Open2" Open"(
@@ -1965,6 +1965,14 @@ cdef extern from 'arrow/python/inference.h' namespace 
'arrow::py':
     c_bool IsPyFloat(object o)
 
 
+cdef extern from 'arrow/python/ipc.h' namespace 'arrow::py':
+    cdef cppclass CPyRecordBatchReader" arrow::py::PyRecordBatchReader" \
+            (CRecordBatchReader):
+        @staticmethod
+        CResult[shared_ptr[CRecordBatchReader]] Make(shared_ptr[CSchema],
+                                                     object)
+
+
 cdef extern from 'arrow/extension_type.h' namespace 'arrow':
     cdef cppclass CExtensionTypeRegistry" arrow::ExtensionTypeRegistry":
         @staticmethod
@@ -2064,6 +2072,9 @@ cdef extern from 'arrow/c/abi.h':
     cdef struct ArrowArray:
         pass
 
+    cdef struct ArrowArrayStream:
+        pass
+
 cdef extern from 'arrow/c/bridge.h' namespace 'arrow' nogil:
     CStatus ExportType(CDataType&, ArrowSchema* out)
     CResult[shared_ptr[CDataType]] ImportType(ArrowSchema*)
@@ -2084,3 +2095,8 @@ cdef extern from 'arrow/c/bridge.h' namespace 'arrow' 
nogil:
                                                         shared_ptr[CSchema])
     CResult[shared_ptr[CRecordBatch]] ImportRecordBatch(ArrowArray*,
                                                         ArrowSchema*)
+
+    CStatus ExportRecordBatchReader(shared_ptr[CRecordBatchReader],
+                                    ArrowArrayStream*)
+    CResult[shared_ptr[CRecordBatchReader]] ImportRecordBatchReader(
+        ArrowArrayStream*)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index bcb7477..74a81c6 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -400,8 +400,29 @@ cdef _get_input_stream(object source, 
shared_ptr[CInputStream]* out):
     get_input_stream(source, True, out)
 
 
-cdef class _CRecordBatchReader(_Weakrefable):
-    """The base RecordBatchReader wrapper.
+class _ReadPandasMixin:
+
+    def read_pandas(self, **options):
+        """
+        Read contents of stream to a pandas.DataFrame.
+
+        Read all record batches as a pyarrow.Table then convert it to a
+        pandas.DataFrame using Table.to_pandas.
+
+        Parameters
+        ----------
+        **options : arguments to forward to Table.to_pandas
+
+        Returns
+        -------
+        df : pandas.DataFrame
+        """
+        table = self.read_all()
+        return table.to_pandas(**options)
+
+
+cdef class RecordBatchReader(_Weakrefable):
+    """Base class for reading stream of record batches.
 
     Provides common implementations of convenience methods. Should not
     be instantiated directly by user code.
@@ -413,6 +434,18 @@ cdef class _CRecordBatchReader(_Weakrefable):
         while True:
             yield self.read_next_batch()
 
+    @property
+    def schema(self):
+        """
+        Shared schema of the record batches in the stream.
+        """
+        cdef shared_ptr[CSchema] c_schema
+
+        with nogil:
+            c_schema = self.reader.get().schema()
+
+        return pyarrow_wrap_schema(c_schema)
+
     def get_next_batch(self):
         import warnings
         warnings.warn('Please use read_next_batch instead of '
@@ -447,21 +480,91 @@ cdef class _CRecordBatchReader(_Weakrefable):
             check_status(self.reader.get().ReadAll(&table))
         return pyarrow_wrap_table(table)
 
+    read_pandas = _ReadPandasMixin.read_pandas
+
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         pass
 
+    def _export_to_c(self, uintptr_t out_ptr):
+        """
+        Export to a C ArrowArrayStream struct, given its pointer.
+
+        Parameters
+        ----------
+        out_ptr: int
+            The raw pointer to a C ArrowArrayStream struct.
 
-cdef class _RecordBatchStreamReader(_CRecordBatchReader):
+        Be careful: if you don't pass the ArrowArrayStream struct to a
+        consumer, array memory will leak.  This is a low-level function
+        intended for expert users.
+        """
+        with nogil:
+            check_status(ExportRecordBatchReader(
+                self.reader, <ArrowArrayStream*> out_ptr))
+
+    @staticmethod
+    def _import_from_c(uintptr_t in_ptr):
+        """
+        Import RecordBatchReader from a C ArrowArrayStream struct,
+        given its pointer.
+
+        Parameters
+        ----------
+        in_ptr: int
+            The raw pointer to a C ArrowArrayStream struct.
+
+        This is a low-level function intended for expert users.
+        """
+        cdef:
+            shared_ptr[CRecordBatchReader] c_reader
+            RecordBatchReader self
+
+        with nogil:
+            c_reader = GetResultValue(ImportRecordBatchReader(
+                <ArrowArrayStream*> in_ptr))
+
+        self = RecordBatchReader.__new__(RecordBatchReader)
+        self.reader = c_reader
+        return self
+
+    @staticmethod
+    def from_batches(schema, batches):
+        """
+        Create RecordBatchReader from an iterable of batches.
+
+        Parameters
+        ----------
+        schema : Schema
+            The shared schema of the record batches
+        batches : Iterable[RecordBatch]
+            The batches that this reader will return.
+
+        Returns
+        -------
+        reader : RecordBatchReader
+        """
+        cdef:
+            shared_ptr[CSchema] c_schema
+            shared_ptr[CRecordBatchReader] c_reader
+            RecordBatchReader self
+
+        c_schema = pyarrow_unwrap_schema(schema)
+        c_reader = GetResultValue(CPyRecordBatchReader.Make(
+            c_schema, batches))
+
+        self = RecordBatchReader.__new__(RecordBatchReader)
+        self.reader = c_reader
+        return self
+
+
+cdef class _RecordBatchStreamReader(RecordBatchReader):
     cdef:
         shared_ptr[CInputStream] in_stream
         CIpcReadOptions options
 
-    cdef readonly:
-        Schema schema
-
     def __cinit__(self):
         pass
 
@@ -469,9 +572,7 @@ cdef class _RecordBatchStreamReader(_CRecordBatchReader):
         _get_input_stream(source, &self.in_stream)
         with nogil:
             self.reader = GetResultValue(CRecordBatchStreamReader.Open(
-                self.in_stream.get(), self.options))
-
-        self.schema = pyarrow_wrap_schema(self.reader.get().schema())
+                self.in_stream, self.options))
 
 
 cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
@@ -565,6 +666,8 @@ cdef class _RecordBatchFileReader(_Weakrefable):
 
         return pyarrow_wrap_table(table)
 
+    read_pandas = _ReadPandasMixin.read_pandas
+
     def __enter__(self):
         return self
 
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index 19e80ba..65325c4 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -22,6 +22,7 @@ import os
 import pyarrow as pa
 
 from pyarrow.lib import (IpcWriteOptions, Message, MessageReader,  # noqa
+                         RecordBatchReader, _ReadPandasMixin,
                          MetadataVersion,
                          read_message, read_record_batch, read_schema,
                          read_tensor, write_tensor,
@@ -29,28 +30,7 @@ from pyarrow.lib import (IpcWriteOptions, Message, 
MessageReader,  # noqa
 import pyarrow.lib as lib
 
 
-class _ReadPandasOption:
-
-    def read_pandas(self, **options):
-        """
-        Read contents of stream to a pandas.DataFrame.
-
-        Read all record batches as a pyarrow.Table then convert it to a
-        pandas.DataFrame using Table.to_pandas.
-
-        Parameters
-        ----------
-        **options : arguments to forward to Table.to_pandas
-
-        Returns
-        -------
-        df : pandas.DataFrame
-        """
-        table = self.read_all()
-        return table.to_pandas(**options)
-
-
-class RecordBatchStreamReader(lib._RecordBatchStreamReader, _ReadPandasOption):
+class RecordBatchStreamReader(lib._RecordBatchStreamReader):
     """
     Reader for the Arrow streaming binary format.
 
@@ -97,7 +77,7 @@ class RecordBatchStreamWriter(lib._RecordBatchStreamWriter):
         self._open(sink, schema, options=options)
 
 
-class RecordBatchFileReader(lib._RecordBatchFileReader, _ReadPandasOption):
+class RecordBatchFileReader(lib._RecordBatchFileReader):
     """
     Class for reading Arrow record batch data from the Arrow binary file format
 
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 8e06dcd..5b2958a 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -483,7 +483,7 @@ cdef class _CRecordBatchWriter(_Weakrefable):
         shared_ptr[CRecordBatchWriter] writer
 
 
-cdef class _CRecordBatchReader(_Weakrefable):
+cdef class RecordBatchReader(_Weakrefable):
     cdef:
         shared_ptr[CRecordBatchReader] reader
 
diff --git a/python/pyarrow/tests/test_cffi.py 
b/python/pyarrow/tests/test_cffi.py
index bcf3c72..5505e57 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -26,6 +26,13 @@ except ImportError:
 
 import pytest
 
+try:
+    import pandas as pd
+    import pandas.testing as tm
+except ImportError:
+    pd = tm = None
+
+
 needs_cffi = pytest.mark.skipif(ffi is None,
                                 reason="test needs cffi package installed")
 
@@ -36,6 +43,34 @@ assert_schema_released = pytest.raises(
 assert_array_released = pytest.raises(
     ValueError, match="Cannot import released ArrowArray")
 
+assert_stream_released = pytest.raises(
+    ValueError, match="Cannot import released ArrowArrayStream")
+
+
+def make_schema():
+    return pa.schema([('ints', pa.list_(pa.int32()))],
+                     metadata={b'key1': b'value1'})
+
+
+def make_batch():
+    return pa.record_batch([[[1], [2, 42]]], make_schema())
+
+
+def make_batches():
+    schema = make_schema()
+    return [
+        pa.record_batch([[[1], [2, 42]]], schema),
+        pa.record_batch([[None, [], [5, 6]]], schema),
+    ]
+
+
+def make_serialized(schema, batches):
+    with pa.BufferOutputStream() as sink:
+        with pa.ipc.new_stream(sink, schema) as out:
+            for batch in batches:
+                out.write(batch)
+        return sink.getvalue()
+
 
 @needs_cffi
 def test_export_import_type():
@@ -120,10 +155,6 @@ def test_export_import_schema():
     c_schema = ffi.new("struct ArrowSchema*")
     ptr_schema = int(ffi.cast("uintptr_t", c_schema))
 
-    def make_schema():
-        return pa.schema([('ints', pa.list_(pa.int32()))],
-                         metadata={b'key1': b'value1'})
-
     gc.collect()  # Make sure no Arrow data dangles in a ref cycle
     old_allocated = pa.total_allocated_bytes()
 
@@ -156,13 +187,6 @@ def test_export_import_batch():
     c_array = ffi.new("struct ArrowArray*")
     ptr_array = int(ffi.cast("uintptr_t", c_array))
 
-    def make_schema():
-        return pa.schema([('ints', pa.list_(pa.int32()))],
-                         metadata={b'key1': b'value1'})
-
-    def make_batch():
-        return pa.record_batch([[[1], [2, 42]]], make_schema())
-
     gc.collect()  # Make sure no Arrow data dangles in a ref cycle
     old_allocated = pa.total_allocated_bytes()
 
@@ -172,7 +196,7 @@ def test_export_import_batch():
     py_value = batch.to_pydict()
     batch._export_to_c(ptr_array)
     assert pa.total_allocated_bytes() > old_allocated
-    # Delete recreate C++ object from exported pointer
+    # Delete and recreate C++ object from exported pointer
     del batch
     batch_new = pa.RecordBatch._import_from_c(ptr_array, schema)
     assert batch_new.to_pydict() == py_value
@@ -192,8 +216,6 @@ def test_export_import_batch():
     del batch
     batch_new = pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
     assert batch_new.to_pydict() == py_value
-    print(batch_new.schema)
-    print(make_schema())
     assert batch_new.schema == make_schema()
     assert pa.total_allocated_bytes() > old_allocated
     del batch_new
@@ -211,3 +233,63 @@ def test_export_import_batch():
     # Now released
     with assert_schema_released:
         pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
+
+
+def _export_import_batch_reader(ptr_stream, reader_factory):
+    # Prepare input
+    batches = make_batches()
+    schema = batches[0].schema
+
+    reader = reader_factory(schema, batches)
+    reader._export_to_c(ptr_stream)
+    # Delete and recreate C++ object from exported pointer
+    del reader, batches
+
+    reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
+    assert reader_new.schema == schema
+    got_batches = list(reader_new)
+    del reader_new
+    assert got_batches == make_batches()
+
+    # Test read_pandas()
+    if pd is not None:
+        batches = make_batches()
+        schema = batches[0].schema
+        expected_df = pa.Table.from_batches(batches).to_pandas()
+
+        reader = reader_factory(schema, batches)
+        reader._export_to_c(ptr_stream)
+        del reader, batches
+
+        reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
+        got_df = reader_new.read_pandas()
+        del reader_new
+        tm.assert_frame_equal(expected_df, got_df)
+
+
+def make_ipc_stream_reader(schema, batches):
+    return pa.ipc.open_stream(make_serialized(schema, batches))
+
+
+def make_py_record_batch_reader(schema, batches):
+    return pa.ipc.RecordBatchReader.from_batches(schema, batches)
+
+
+@needs_cffi
[email protected]('reader_factory',
+                         [make_ipc_stream_reader,
+                          make_py_record_batch_reader])
+def test_export_import_batch_reader(reader_factory):
+    c_stream = ffi.new("struct ArrowArrayStream*")
+    ptr_stream = int(ffi.cast("uintptr_t", c_stream))
+
+    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
+    old_allocated = pa.total_allocated_bytes()
+
+    _export_import_batch_reader(ptr_stream, reader_factory)
+
+    assert pa.total_allocated_bytes() == old_allocated
+
+    # Now released
+    with assert_stream_released:
+        pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 44f8499..3d3e72e 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -15,11 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from collections import UserList
 import io
 import pytest
 import socket
 import sys
 import threading
+import weakref
 
 import numpy as np
 
@@ -858,3 +860,36 @@ def test_write_empty_ipc_file():
         table = reader.read_all()
     assert len(table) == 0
     assert table.schema.equals(schema)
+
+
+def test_py_record_batch_reader():
+    def make_schema():
+        return pa.schema([('field', pa.int64())])
+
+    def make_batches():
+        schema = make_schema()
+        batch1 = pa.record_batch([[1, 2, 3]], schema=schema)
+        batch2 = pa.record_batch([[4, 5]], schema=schema)
+        return [batch1, batch2]
+
+    # With iterable
+    batches = UserList(make_batches())  # weakrefable
+    wr = weakref.ref(batches)
+
+    with pa.ipc.RecordBatchReader.from_batches(make_schema(),
+                                               batches) as reader:
+        batches = None
+        assert wr() is not None
+        assert list(reader) == make_batches()
+        assert wr() is None
+
+    # With iterator
+    batches = iter(UserList(make_batches()))  # weakrefable
+    wr = weakref.ref(batches)
+
+    with pa.ipc.RecordBatchReader.from_batches(make_schema(),
+                                               batches) as reader:
+        batches = None
+        assert wr() is not None
+        assert list(reader) == make_batches()
+        assert wr() is None

Reply via email to