This is an automated email from the ASF dual-hosted git repository.

amolina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new cd3c6ead97 ARROW-16913: [Java] Implement ArrowArrayStream (#13465)
cd3c6ead97 is described below

commit cd3c6ead97d584366aafd2f14d99a1cb8ace9ca2
Author: David Li <[email protected]>
AuthorDate: Tue Jul 5 07:48:22 2022 -0400

    ARROW-16913: [Java] Implement ArrowArrayStream (#13465)
    
    Implements ArrowArrayStream for Java. The equivalent Java-side interface 
chosen is ArrowReader.
    
    Also:
    - Fixes a couple of JDK9 compatibility issues I ran into. I _think_ these 
will not normally affect people except during development (I think because I 
was mixing IntelliJ and Maven).
    - Manually clang-format the C++ code. Clean up some things to match Arrow 
convention and remove some unused declarations.
    - Extends the DictionaryProvider interface. This is a potentially breaking 
change; we could make the method default (and raise an exception) instead.
    
    Authored-by: David Li <[email protected]>
    Signed-off-by: Alessandro Molina <[email protected]>
---
 cpp/src/arrow/c/bridge.cc                          |  11 +-
 docs/source/python/integration/python_java.rst     | 163 ++++++++--
 java/c/CMakeLists.txt                              |   1 +
 java/c/pom.xml                                     |   5 +
 java/c/src/main/cpp/jni_wrapper.cc                 | 330 +++++++++++++++++---
 .../org/apache/arrow/c/ArrayStreamExporter.java    | 115 +++++++
 .../java/org/apache/arrow/c/ArrowArrayStream.java  | 194 ++++++++++++
 .../org/apache/arrow/c/ArrowArrayStreamReader.java |  95 ++++++
 .../apache/arrow/c/CDataDictionaryProvider.java    |   1 +
 java/c/src/main/java/org/apache/arrow/c/Data.java  |  21 ++
 .../main/java/org/apache/arrow/c/NativeUtil.java   |   4 +-
 .../{JniWrapper.java => CDataJniException.java}    |  38 ++-
 .../java/org/apache/arrow/c/jni/JniWrapper.java    |   8 +
 .../java/org/apache/arrow/c/RoundtripTest.java     |   3 -
 .../test/java/org/apache/arrow/c/StreamTest.java   | 332 +++++++++++++++++++++
 java/c/src/test/python/integration_tests.py        |  51 ++++
 java/pom.xml                                       |   6 +
 .../vector/dictionary/DictionaryProvider.java      |   4 +
 .../org/apache/arrow/vector/ipc/ArrowReader.java   |   6 +
 .../apache/arrow/vector/ipc/JsonFileReader.java    |   6 +
 .../vector/ipc/message/MessageSerializer.java      |   3 +-
 python/pyarrow/includes/libarrow.pxd               |   1 +
 python/pyarrow/ipc.pxi                             |  13 +-
 23 files changed, 1329 insertions(+), 82 deletions(-)

diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc
index f2671b5016..de531dbc60 100644
--- a/cpp/src/arrow/c/bridge.cc
+++ b/cpp/src/arrow/c/bridge.cc
@@ -1748,7 +1748,9 @@ class ArrayStreamBatchReader : public RecordBatchReader {
   }
 
   ~ArrayStreamBatchReader() {
-    ArrowArrayStreamRelease(&stream_);
+    if (!ArrowArrayStreamIsReleased(&stream_)) {
+      ArrowArrayStreamRelease(&stream_);
+    }
     DCHECK(ArrowArrayStreamIsReleased(&stream_));
   }
 
@@ -1766,6 +1768,13 @@ class ArrayStreamBatchReader : public RecordBatchReader {
     }
   }
 
+  Status Close() override {
+    if (!ArrowArrayStreamIsReleased(&stream_)) {
+      ArrowArrayStreamRelease(&stream_);
+    }
+    return Status::OK();
+  }
+
  private:
   std::shared_ptr<Schema> CacheSchema() const {
     if (!schema_) {
diff --git a/docs/source/python/integration/python_java.rst 
b/docs/source/python/integration/python_java.rst
index c191682721..a524fe9b48 100644
--- a/docs/source/python/integration/python_java.rst
+++ b/docs/source/python/integration/python_java.rst
@@ -29,7 +29,7 @@ marshaling and unmarshaling data.
 
     The article takes for granted that you have a ``Python`` environment
     with ``pyarrow`` correctly installed and a ``Java`` environment with
-    ``arrow`` library correctly installed. 
+    ``arrow`` library correctly installed.
     The ``Arrow Java`` version must have been compiled with ``mvn 
-Parrow-c-data`` to
     ensure CData exchange support is enabled.
     See `Python Install Instructions 
<https://arrow.apache.org/docs/python/install.html>`_
@@ -53,7 +53,7 @@ We would save such class in the ``Simple.java`` file and 
proceed with
 compiling it to ``Simple.class`` using ``javac Simple.java``.
 
 Once the ``Simple.class`` file is created we can use the class
-from Python using the 
+from Python using the
 `JPype <https://jpype.readthedocs.io/>`_ library which
 enables a Java runtime within the Python interpreter.
 
@@ -64,11 +64,11 @@ enables a Java runtime within the Python interpreter.
     $ pip install jpype1
 
 The most basic thing we can do with our ``Simple`` class is to
-use the ``Simple.getNumber`` method from Python and see 
+use the ``Simple.getNumber`` method from Python and see
 if it will return the result.
 
 To do so, we can create a ``simple.py`` file which uses ``jpype`` to
-import the ``Simple`` class from ``Simple.class`` file and invoke 
+import the ``Simple`` class from ``Simple.class`` file and invoke
 the ``Simple.getNumber`` method:
 
 .. code-block:: python
@@ -87,7 +87,7 @@ to access the ``Java`` method and print the expected result:
 
 .. code-block:: console
 
-    $ python simple.py 
+    $ python simple.py
     4
 
 Java to Python using pyarrow.jvm
@@ -132,7 +132,7 @@ class, named ``FillTen.java``
     }
 
 This class provides a public ``createArray`` method that anyone can invoke
-to get back an array containing numbers from 1 to 10. 
+to get back an array containing numbers from 1 to 10.
 
 Given that this class now has a dependency on a bunch of packages,
 compiling it with ``javac`` is not enough anymore. We need to create
@@ -142,7 +142,7 @@ a dedicated ``pom.xml`` file where we can collect the 
dependencies:
 
     <project>
         <modelVersion>4.0.0</modelVersion>
-        
+
         <groupId>org.apache.arrow.py2java</groupId>
         <artifactId>FillTen</artifactId>
         <version>1</version>
@@ -150,7 +150,7 @@ a dedicated ``pom.xml`` file where we can collect the 
dependencies:
         <properties>
             <maven.compiler.source>8</maven.compiler.source>
             <maven.compiler.target>8</maven.compiler.target>
-        </properties> 
+        </properties>
 
         <dependencies>
             <dependency>
@@ -170,7 +170,7 @@ a dedicated ``pom.xml`` file where we can collect the 
dependencies:
             <artifactId>arrow-vector</artifactId>
             <version>8.0.0</version>
             <type>pom</type>
-            </dependency> 
+            </dependency>
             <dependency>
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-c-data</artifactId>
@@ -182,22 +182,22 @@ a dedicated ``pom.xml`` file where we can collect the 
dependencies:
 
 Once the ``FillTen.java`` file with the class is created
 as ``src/main/java/FillTen.java`` we can use ``maven`` to
-compile the project with ``mvn package`` and get it 
+compile the project with ``mvn package`` and get it
 available in the ``target`` directory.
 
 .. code-block:: console
 
     $ mvn package
     [INFO] Scanning for projects...
-    [INFO] 
+    [INFO]
     [INFO] ------------------< org.apache.arrow.py2java:FillTen 
>------------------
     [INFO] Building FillTen 1
     [INFO] --------------------------------[ jar 
]---------------------------------
-    [INFO] 
+    [INFO]
     [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ FillTen 
---
     [INFO] Changes detected - recompiling the module!
     [INFO] Compiling 1 source file to /experiments/java2py/target/classes
-    [INFO] 
+    [INFO]
     [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ FillTen ---
     [INFO] Building jar: /experiments/java2py/target/FillTen-1.jar
     [INFO] 
------------------------------------------------------------------------
@@ -215,11 +215,11 @@ We can use ``maven`` to collect all dependencies and make 
them available in a si
 
     $ mvn 
org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies 
-DoutputDirectory=dependencies
     [INFO] Scanning for projects...
-    [INFO] 
+    [INFO]
     [INFO] ------------------< org.apache.arrow.py2java:FillTen 
>------------------
     [INFO] Building FillTen 1
     [INFO] --------------------------------[ jar 
]---------------------------------
-    [INFO] 
+    [INFO]
     [INFO] --- maven-dependency-plugin:2.7:copy-dependencies (default-cli) @ 
FillTen ---
     [INFO] Copying jsr305-3.0.2.jar to 
/experiments/java2py/dependencies/jsr305-3.0.2.jar
     [INFO] Copying netty-common-4.1.72.Final.jar to 
/experiments/java2py/dependencies/netty-common-4.1.72.Final.jar
@@ -246,9 +246,9 @@ We can use ``maven`` to collect all dependencies and make 
them available in a si
     Instead of manually collecting dependencies, you could also rely on the
     ``maven-assembly-plugin`` to build a single ``jar`` with all dependencies.
 
-Once our package and all its depdendencies are available, 
+Once our package and all its depdendencies are available,
 we can invoke it from ``fillten_pyarrowjvm.py`` script that will
-import the ``FillTen`` class and print out the result of invoking 
``FillTen.createArray`` 
+import the ``FillTen`` class and print out the result of invoking 
``FillTen.createArray``
 
 .. code-block:: python
 
@@ -291,7 +291,7 @@ Running the python script will lead to two lines getting 
printed:
 
 The first line is the raw result of invoking the ``FillTen.createArray`` 
method.
 The resulting object is a proxy to the actual Java object, so it's not really 
a pyarrow
-Array, it will lack most of its capabilities and methods. 
+Array, it will lack most of its capabilities and methods.
 That's why we subsequently use ``pyarrow.jvm.array`` to convert it to an actual
 ``pyarrow`` array. That allows us to treat it like any other ``pyarrow`` array.
 The result is the second line in the output where the array is correctly 
reported
@@ -441,3 +441,130 @@ values printed by the Python script have been properly 
changed by the Java code:
         9,
         10
     ]
+
+We can also use the C Stream Interface to exchange
+:py:class:`pyarrow.RecordBatchReader`s between Java and Python.  We'll
+use this Java class as a demo, which lets you read an Arrow IPC file
+via Java's implementation, or write data to a JSON file:
+
+.. code-block:: java
+
+   import java.io.File;
+   import java.nio.file.Files;
+   import java.nio.file.Paths;
+
+   import org.apache.arrow.c.ArrowArrayStream;
+   import org.apache.arrow.c.Data;
+   import org.apache.arrow.memory.BufferAllocator;
+   import org.apache.arrow.memory.RootAllocator;
+   import org.apache.arrow.vector.ipc.ArrowFileReader;
+   import org.apache.arrow.vector.ipc.ArrowReader;
+   import org.apache.arrow.vector.ipc.JsonFileWriter;
+
+   public class PythonInteropDemo implements AutoCloseable {
+     private final BufferAllocator allocator;
+
+     public PythonInteropDemo() {
+       this.allocator = new RootAllocator();
+     }
+
+     public void exportStream(String path, long cStreamPointer) throws 
Exception {
+       try (final ArrowArrayStream stream = 
ArrowArrayStream.wrap(cStreamPointer)) {
+         ArrowFileReader reader = new 
ArrowFileReader(Files.newByteChannel(Paths.get(path)), allocator);
+         Data.exportArrayStream(allocator, reader, stream);
+       }
+     }
+
+     public void importStream(String path, long cStreamPointer) throws 
Exception {
+       try (final ArrowArrayStream stream = 
ArrowArrayStream.wrap(cStreamPointer);
+            final ArrowReader input = Data.importArrayStream(allocator, 
stream);
+            JsonFileWriter writer = new JsonFileWriter(new File(path))) {
+         writer.start(input.getVectorSchemaRoot().getSchema(), input);
+         while (input.loadNextBatch()) {
+           writer.write(input.getVectorSchemaRoot());
+         }
+       }
+     }
+
+     @Override
+     public void close() throws Exception {
+       allocator.close();
+     }
+   }
+
+On the Python side, we'll use JPype as before, except this time we'll
+send RecordBatchReaders back and forth:
+
+.. code-block:: python
+
+   import tempfile
+
+   import jpype
+   import jpype.imports
+   from jpype.types import *
+
+   # Init the JVM and make demo class available to Python.
+   jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
+   PythonInteropDemo = JClass("PythonInteropDemo")
+   demo = PythonInteropDemo()
+
+   # Create a Python record batch reader
+   import pyarrow as pa
+   schema = pa.schema([
+       ("ints", pa.int64()),
+       ("strs", pa.string())
+   ])
+   batches = [
+       pa.record_batch([
+           [0, 2, 4, 8],
+           ["a", "b", "c", None],
+       ], schema=schema),
+       pa.record_batch([
+           [None, 32, 64, None],
+           ["e", None, None, "h"],
+       ], schema=schema),
+   ]
+   reader = pa.RecordBatchReader.from_batches(schema, batches)
+
+   from pyarrow.cffi import ffi as arrow_c
+
+   # Export the Python reader through C Data
+   c_stream = arrow_c.new("struct ArrowArrayStream*")
+   c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
+   reader._export_to_c(c_stream_ptr)
+
+   # Send reader to the Java function that writes a JSON file
+   with tempfile.NamedTemporaryFile() as temp:
+       demo.importStream(temp.name, c_stream_ptr)
+
+       # Read the JSON file back
+       with open(temp.name) as source:
+           print("JSON file written by Java:")
+           print(source.read())
+
+
+   # Write an Arrow IPC file for Java to read
+   with tempfile.NamedTemporaryFile() as temp:
+       with pa.ipc.new_file(temp.name, schema) as sink:
+           for batch in batches:
+               sink.write_batch(batch)
+
+       demo.exportStream(temp.name, c_stream_ptr)
+       with pa.RecordBatchReader._import_from_c(c_stream_ptr) as source:
+           print("IPC file read by Java:")
+           print(source.read_all())
+
+.. code-block:: console
+
+   $ mvn package
+   $ mvn 
org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies 
-DoutputDirectory=dependencies
+   $ python demo.py
+   JSON file written by Java:
+   
{"schema":{"fields":[{"name":"ints","nullable":true,"type":{"name":"int","bitWidth":64,"isSigned":true},"children":[]},{"name":"strs","nullable":true,"type":{"name":"utf8"},"children":[]}]},"batches":[{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[1,1,1,1],"DATA":["0","2","4","8"]},{"name":"strs","count":4,"VALIDITY":[1,1,1,0],"OFFSET":[0,1,2,3,3],"DATA":["a","b","c",""]}]},{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]},{"na
 [...]
+   IPC file read by Java:
+   pyarrow.Table
+   ints: int64
+   strs: string
+   ----
+   ints: [[0,2,4,8],[null,32,64,null]]
+   strs: [["a","b","c",null],["e",null,null,"h"]]
diff --git a/java/c/CMakeLists.txt b/java/c/CMakeLists.txt
index 1025f87afb..05938508de 100644
--- a/java/c/CMakeLists.txt
+++ b/java/c/CMakeLists.txt
@@ -35,6 +35,7 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR} 
${CMAKE_CURRENT_SOURCE_DIR}
                     ${JNI_INCLUDE_DIRS} ${JNI_HEADERS_DIR})
 
 add_jar(${PROJECT_NAME}
+        src/main/java/org/apache/arrow/c/jni/CDataJniException.java
         src/main/java/org/apache/arrow/c/jni/JniLoader.java
         src/main/java/org/apache/arrow/c/jni/JniWrapper.java
         src/main/java/org/apache/arrow/c/jni/PrivateData.java
diff --git a/java/c/pom.xml b/java/c/pom.xml
index 930c5b22d6..6d0632ea16 100644
--- a/java/c/pom.xml
+++ b/java/c/pom.xml
@@ -62,6 +62,11 @@
             <version>${dep.guava.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <resources>
diff --git a/java/c/src/main/cpp/jni_wrapper.cc 
b/java/c/src/main/cpp/jni_wrapper.cc
index cfb0af9bcb..fea53aff49 100644
--- a/java/c/src/main/cpp/jni_wrapper.cc
+++ b/java/c/src/main/cpp/jni_wrapper.cc
@@ -18,6 +18,8 @@
 #include <jni.h>
 
 #include <cassert>
+#include <cstring>
+#include <iostream>
 #include <memory>
 #include <stdexcept>
 #include <string>
@@ -27,19 +29,19 @@
 
 namespace {
 
-jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
-  jclass local_class = env->FindClass(class_name);
-  jclass global_class = (jclass)env->NewGlobalRef(local_class);
-  env->DeleteLocalRef(local_class);
-  return global_class;
-}
+jclass kObjectClass;
+jclass kRuntimeExceptionClass;
+jclass kPrivateDataClass;
+jclass kCDataExceptionClass;
+jclass kStreamPrivateDataClass;
 
-jclass illegal_access_exception_class;
-jclass illegal_argument_exception_class;
-jclass runtime_exception_class;
-jclass private_data_class;
+jfieldID kPrivateDataLastErrorField;
 
-jmethodID private_data_close_method;
+jmethodID kObjectToStringMethod;
+jmethodID kPrivateDataCloseMethod;
+jmethodID kPrivateDataGetNextMethod;
+jmethodID kPrivateDataGetSchemaMethod;
+jmethodID kCDataExceptionConstructor;
 
 jint JNI_VERSION = JNI_VERSION_1_6;
 
@@ -54,16 +56,43 @@ void ThrowPendingException(const std::string& message) {
 
 void JniThrow(std::string message) { ThrowPendingException(message); }
 
+jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
+  jclass local_class = env->FindClass(class_name);
+  if (!local_class) {
+    std::string message = "Could not find class ";
+    message += class_name;
+    ThrowPendingException(message);
+  }
+  jclass global_class = (jclass)env->NewGlobalRef(local_class);
+  if (!global_class) {
+    std::string message = "Could not create global reference to class ";
+    message += class_name;
+    ThrowPendingException(message);
+  }
+  env->DeleteLocalRef(local_class);
+  return global_class;
+}
+
 jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const 
char* sig) {
   jmethodID ret = env->GetMethodID(this_class, name, sig);
   if (ret == nullptr) {
     std::string error_message = "Unable to find method " + std::string(name) +
-                                " within signature " + std::string(sig);
+                                " with signature " + std::string(sig);
     ThrowPendingException(error_message);
   }
   return ret;
 }
 
+jfieldID GetFieldID(JNIEnv* env, jclass this_class, const char* name, const 
char* sig) {
+  jfieldID fieldId = env->GetFieldID(this_class, name, sig);
+  if (fieldId == nullptr) {
+    std::string error_message = "Unable to find field " + std::string(name) +
+                                " with signature " + std::string(sig);
+    ThrowPendingException(error_message);
+  }
+  return fieldId;
+}
+
 class InnerPrivateData {
  public:
   InnerPrivateData(JavaVM* vm, jobject private_data)
@@ -71,6 +100,8 @@ class InnerPrivateData {
 
   JavaVM* vm_;
   jobject j_private_data_;
+  // Only for ArrowArrayStream
+  std::string last_error_;
 };
 
 class JNIEnvGuard {
@@ -132,32 +163,164 @@ void release_exported(T* base) {
   InnerPrivateData* private_data =
       reinterpret_cast<InnerPrivateData*>(base->private_data);
 
+  // It is possible for the JVM to be shut down when this is called;
+  // guard against that.  Example: Python code using JPype may shut
+  // down the JVM before releasing the stream.
+  try {
+    JNIEnvGuard guard(private_data->vm_);
+    JNIEnv* env = guard.env();
+
+    env->CallObjectMethod(private_data->j_private_data_, 
kPrivateDataCloseMethod);
+    if (env->ExceptionCheck()) {
+      // Can't signal this to caller, so log and then try to free things
+      // as best we can
+      env->ExceptionDescribe();
+      env->ExceptionClear();
+    }
+    env->DeleteGlobalRef(private_data->j_private_data_);
+  } catch (const JniPendingException& e) {
+    std::cerr << "WARNING: Failed to release Java C Data resource: " << 
e.what()
+              << std::endl;
+  }
+  delete private_data;
+  base->private_data = nullptr;
+
+  // Mark released
+  base->release = nullptr;
+}
+
+// Attempt to copy the JVM-side lastError to the C++ side
+void TryCopyLastError(JNIEnv* env, InnerPrivateData* private_data) {
+  jobject error_data =
+      env->GetObjectField(private_data->j_private_data_, 
kPrivateDataLastErrorField);
+  if (!error_data) {
+    private_data->last_error_.clear();
+    return;
+  }
+
+  auto arr = reinterpret_cast<jbyteArray>(error_data);
+  jbyte* error_bytes = env->GetByteArrayElements(arr, nullptr);
+  if (!error_bytes) {
+    private_data->last_error_.clear();
+    return;
+  }
+
+  char* error_str = reinterpret_cast<char*>(error_bytes);
+  private_data->last_error_ = std::string(error_str, std::strlen(error_str));
+
+  env->ReleaseByteArrayElements(arr, error_bytes, JNI_ABORT);
+}
+
+// Normally the Java side catches all exceptions and populates
+// lastError. If that fails we check for an exception and try to
+// populate last_error_ ourselves.
+void TryHandleUncaughtException(JNIEnv* env, InnerPrivateData* private_data,
+                                jthrowable exc) {
+  jstring message =
+      reinterpret_cast<jstring>(env->CallObjectMethod(exc, 
kObjectToStringMethod));
+  if (!message) {
+    private_data->last_error_.clear();
+    return;
+  }
+  const char* str = env->GetStringUTFChars(message, 0);
+  if (!str) {
+    private_data->last_error_.clear();
+    return;
+  }
+  private_data->last_error_ = str;
+  env->ReleaseStringUTFChars(message, 0);
+}
+
+int ArrowArrayStreamGetSchema(ArrowArrayStream* stream, ArrowSchema* out) {
+  assert(stream->private_data != nullptr);
+  InnerPrivateData* private_data =
+      reinterpret_cast<InnerPrivateData*>(stream->private_data);
   JNIEnvGuard guard(private_data->vm_);
   JNIEnv* env = guard.env();
 
-  env->CallObjectMethod(private_data->j_private_data_, 
private_data_close_method);
-  if (env->ExceptionCheck()) {
-    env->ExceptionDescribe();
+  const jlong out_addr = static_cast<jlong>(reinterpret_cast<uintptr_t>(out));
+  const int err_code = env->CallIntMethod(private_data->j_private_data_,
+                                          kPrivateDataGetSchemaMethod, 
out_addr);
+  if (jthrowable exc = env->ExceptionOccurred()) {
+    TryHandleUncaughtException(env, private_data, exc);
     env->ExceptionClear();
-    ThrowPendingException("Error calling close of private data");
+    return EIO;
+  } else if (err_code != 0) {
+    TryCopyLastError(env, private_data);
+  }
+  return err_code;
+}
+
+int ArrowArrayStreamGetNext(ArrowArrayStream* stream, ArrowArray* out) {
+  assert(stream->private_data != nullptr);
+  InnerPrivateData* private_data =
+      reinterpret_cast<InnerPrivateData*>(stream->private_data);
+  JNIEnvGuard guard(private_data->vm_);
+  JNIEnv* env = guard.env();
+
+  const jlong out_addr = static_cast<jlong>(reinterpret_cast<uintptr_t>(out));
+  const int err_code = env->CallIntMethod(private_data->j_private_data_,
+                                          kPrivateDataGetNextMethod, out_addr);
+  if (jthrowable exc = env->ExceptionOccurred()) {
+    TryHandleUncaughtException(env, private_data, exc);
+    env->ExceptionClear();
+    return EIO;
+  } else if (err_code != 0) {
+    TryCopyLastError(env, private_data);
+  }
+  return err_code;
+}
+
+const char* ArrowArrayStreamGetLastError(ArrowArrayStream* stream) {
+  assert(stream->private_data != nullptr);
+  InnerPrivateData* private_data =
+      reinterpret_cast<InnerPrivateData*>(stream->private_data);
+  JNIEnvGuard guard(private_data->vm_);
+  JNIEnv* env = guard.env();
+
+  if (private_data->last_error_.empty()) return nullptr;
+  return private_data->last_error_.c_str();
+}
+
+void ArrowArrayStreamRelease(ArrowArrayStream* stream) {
+  // This should not be called on already released structure
+  assert(stream->release != nullptr);
+  // Release all data directly owned by the struct
+  InnerPrivateData* private_data =
+      reinterpret_cast<InnerPrivateData*>(stream->private_data);
+
+  // It is possible for the JVM to be shut down (see above)
+  try {
+    JNIEnvGuard guard(private_data->vm_);
+    JNIEnv* env = guard.env();
+
+    env->CallObjectMethod(private_data->j_private_data_, 
kPrivateDataCloseMethod);
+    if (env->ExceptionCheck()) {
+      env->ExceptionDescribe();
+      env->ExceptionClear();
+    }
+    env->DeleteGlobalRef(private_data->j_private_data_);
+  } catch (const JniPendingException& e) {
+    std::cerr << "WARNING: Failed to release Java ArrowArrayStream: " << 
e.what()
+              << std::endl;
   }
-  env->DeleteGlobalRef(private_data->j_private_data_);
   delete private_data;
-  base->private_data = nullptr;
+  stream->private_data = nullptr;
 
   // Mark released
-  base->release = nullptr;
+  stream->release = nullptr;
 }
+
 }  // namespace
 
 #define JNI_METHOD_START try {
 // macro ended
 
-#define JNI_METHOD_END(fallback_expr)                 \
-  }                                                   \
-  catch (JniPendingException & e) {                   \
-    env->ThrowNew(runtime_exception_class, e.what()); \
-    return fallback_expr;                             \
+#define JNI_METHOD_END(fallback_expr)                \
+  }                                                  \
+  catch (JniPendingException & e) {                  \
+    env->ThrowNew(kRuntimeExceptionClass, e.what()); \
+    return fallback_expr;                            \
   }
 // macro ended
 
@@ -167,16 +330,28 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
     return JNI_ERR;
   }
   JNI_METHOD_START
-  illegal_access_exception_class =
-      CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;");
-  illegal_argument_exception_class =
-      CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;");
-  runtime_exception_class =
+  kObjectClass = CreateGlobalClassReference(env, "Ljava/lang/Object;");
+  kRuntimeExceptionClass =
       CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;");
-  private_data_class =
+  kPrivateDataClass =
       CreateGlobalClassReference(env, "Lorg/apache/arrow/c/jni/PrivateData;");
-
-  private_data_close_method = GetMethodID(env, private_data_class, "close", 
"()V");
+  kCDataExceptionClass =
+      CreateGlobalClassReference(env, 
"Lorg/apache/arrow/c/jni/CDataJniException;");
+  kStreamPrivateDataClass = CreateGlobalClassReference(
+      env, 
"Lorg/apache/arrow/c/ArrayStreamExporter$ExportedArrayStreamPrivateData;");
+
+  kPrivateDataLastErrorField =
+      GetFieldID(env, kStreamPrivateDataClass, "lastError", "[B");
+
+  kObjectToStringMethod =
+      GetMethodID(env, kObjectClass, "toString", "()Ljava/lang/String;");
+  kPrivateDataCloseMethod = GetMethodID(env, kPrivateDataClass, "close", 
"()V");
+  kPrivateDataGetNextMethod =
+      GetMethodID(env, kStreamPrivateDataClass, "getNext", "(J)I");
+  kPrivateDataGetSchemaMethod =
+      GetMethodID(env, kStreamPrivateDataClass, "getSchema", "(J)I");
+  kCDataExceptionConstructor =
+      GetMethodID(env, kCDataExceptionClass, "<init>", 
"(ILjava/lang/String;)V");
 
   return JNI_VERSION;
   JNI_METHOD_END(JNI_ERR)
@@ -185,9 +360,11 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
 void JNI_OnUnload(JavaVM* vm, void* reserved) {
   JNIEnv* env;
   vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION);
-  env->DeleteGlobalRef(illegal_access_exception_class);
-  env->DeleteGlobalRef(illegal_argument_exception_class);
-  env->DeleteGlobalRef(runtime_exception_class);
+  env->DeleteGlobalRef(kObjectClass);
+  env->DeleteGlobalRef(kRuntimeExceptionClass);
+  env->DeleteGlobalRef(kPrivateDataClass);
+  env->DeleteGlobalRef(kCDataExceptionClass);
+  env->DeleteGlobalRef(kStreamPrivateDataClass);
 }
 
 /*
@@ -220,6 +397,65 @@ 
Java_org_apache_arrow_c_jni_JniWrapper_releaseArray(JNIEnv* env, jobject, jlong
   JNI_METHOD_END()
 }
 
+/*
+ * Class:     org_apache_arrow_c_jni_JniWrapper
+ * Method:    getNextArrayStream
+ * Signature: (JJ)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_c_jni_JniWrapper_getNextArrayStream(
+    JNIEnv* env, jobject, jlong address, jlong out_address) {
+  JNI_METHOD_START
+  auto* stream = reinterpret_cast<ArrowArrayStream*>(address);
+  auto* out = reinterpret_cast<ArrowArray*>(out_address);
+  const int err_code = stream->get_next(stream, out);
+  if (err_code != 0) {
+    const char* message = stream->get_last_error(stream);
+    if (!message) message = std::strerror(err_code);
+    jstring java_message = env->NewStringUTF(message);
+    jthrowable exception = static_cast<jthrowable>(env->NewObject(
+        kCDataExceptionClass, kCDataExceptionConstructor, err_code, 
java_message));
+    env->Throw(exception);
+  }
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_c_jni_JniWrapper
+ * Method:    getSchemaArrayStream
+ * Signature: (JJ)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_c_jni_JniWrapper_getSchemaArrayStream(
+    JNIEnv* env, jobject, jlong address, jlong out_address) {
+  JNI_METHOD_START
+  auto* stream = reinterpret_cast<ArrowArrayStream*>(address);
+  auto* out = reinterpret_cast<ArrowSchema*>(out_address);
+  const int err_code = stream->get_schema(stream, out);
+  if (err_code != 0) {
+    const char* message = stream->get_last_error(stream);
+    if (!message) message = std::strerror(err_code);
+    jstring java_message = env->NewStringUTF(message);
+    jthrowable exception = static_cast<jthrowable>(env->NewObject(
+        kCDataExceptionClass, kCDataExceptionConstructor, err_code, 
java_message));
+    env->Throw(exception);
+  }
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_c_jni_JniWrapper
+ * Method:    releaseArrayStream
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_c_jni_JniWrapper_releaseArrayStream(
+    JNIEnv* env, jobject, jlong address) {
+  JNI_METHOD_START
+  auto* stream = reinterpret_cast<ArrowArrayStream*>(address);
+  if (stream->release != nullptr) {
+    stream->release(stream);
+  }
+  JNI_METHOD_END()
+}
+
 /*
  * Class:     org_apache_arrow_c_jni_JniWrapper
  * Method:    exportSchema
@@ -261,3 +497,27 @@ JNIEXPORT void JNICALL 
Java_org_apache_arrow_c_jni_JniWrapper_exportArray(
   array->release = &release_exported<ArrowArray>;
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_c_jni_JniWrapper
+ * Method:    exportArrayStream
+ * Signature: (JLorg/apache/arrow/c/jni/PrivateData;)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_c_jni_JniWrapper_exportArrayStream(
+    JNIEnv* env, jobject, jlong address, jobject private_data) {
+  JNI_METHOD_START
+  auto* stream = reinterpret_cast<ArrowArrayStream*>(address);
+
+  JavaVM* vm;
+  if (env->GetJavaVM(&vm) != JNI_OK) {
+    JniThrow("Unable to get JavaVM instance");
+  }
+  jobject private_data_ref = env->NewGlobalRef(private_data);
+
+  stream->get_schema = &ArrowArrayStreamGetSchema;
+  stream->get_next = &ArrowArrayStreamGetNext;
+  stream->get_last_error = &ArrowArrayStreamGetLastError;
+  stream->release = &ArrowArrayStreamRelease;
+  stream->private_data = new InnerPrivateData(vm, private_data_ref);
+  JNI_METHOD_END()
+}
diff --git a/java/c/src/main/java/org/apache/arrow/c/ArrayStreamExporter.java 
b/java/c/src/main/java/org/apache/arrow/c/ArrayStreamExporter.java
new file mode 100644
index 0000000000..2c5ca08e71
--- /dev/null
+++ b/java/c/src/main/java/org/apache/arrow/c/ArrayStreamExporter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.arrow.c.jni.JniWrapper;
+import org.apache.arrow.c.jni.PrivateData;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * Utility to export an {@link ArrowReader} as an ArrowArrayStream.
+ */
+final class ArrayStreamExporter {
+  private final BufferAllocator allocator;
+
+  ArrayStreamExporter(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Java-side state for the exported stream.
+   */
+  static class ExportedArrayStreamPrivateData implements PrivateData {
+    final BufferAllocator allocator;
+    final ArrowReader reader;
+    // Read by the JNI side for get_last_error
+    byte[] lastError;
+
+    ExportedArrayStreamPrivateData(BufferAllocator allocator, ArrowReader 
reader) {
+      this.allocator = allocator;
+      this.reader = reader;
+    }
+
+    private int setLastError(Throwable err) {
+      // Do not let exceptions propagate up to JNI
+      try {
+        StringWriter buf = new StringWriter();
+        PrintWriter writer = new PrintWriter(buf);
+        err.printStackTrace(writer);
+        lastError = buf.toString().getBytes(StandardCharsets.UTF_8);
+      } catch (Throwable e) {
+        // Bail out of setting the error message - we'll still return an error 
code
+        lastError = null;
+      }
+      return 5; // = EIO
+    }
+
+    @SuppressWarnings("unused") // Used by JNI
+    int getNext(long arrayAddress) {
+      try (ArrowArray out = ArrowArray.wrap(arrayAddress)) {
+        if (reader.loadNextBatch()) {
+          Data.exportVectorSchemaRoot(allocator, reader.getVectorSchemaRoot(), 
reader, out);
+        } else {
+          out.markReleased();
+        }
+        return 0;
+      } catch (Throwable e) {
+        return setLastError(e);
+      }
+    }
+
+    @SuppressWarnings("unused") // Used by JNI
+    int getSchema(long schemaAddress) {
+      try (ArrowSchema out = ArrowSchema.wrap(schemaAddress)) {
+        final Schema schema = reader.getVectorSchemaRoot().getSchema();
+        Data.exportSchema(allocator, schema, reader, out);
+        return 0;
+      } catch (Throwable e) {
+        return setLastError(e);
+      }
+    }
+
+    @Override
+    public void close() {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        // XXX: C Data Interface gives us no way to signal errors to the 
caller,
+        // but the JNI side will catch this and log an error.
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  void export(ArrowArrayStream stream, ArrowReader reader) {
+    ExportedArrayStreamPrivateData data = new 
ExportedArrayStreamPrivateData(allocator, reader);
+    try {
+      JniWrapper.get().exportArrayStream(stream.memoryAddress(), data);
+    } catch (Exception e) {
+      data.close();
+      throw e;
+    }
+  }
+}
diff --git a/java/c/src/main/java/org/apache/arrow/c/ArrowArrayStream.java 
b/java/c/src/main/java/org/apache/arrow/c/ArrowArrayStream.java
new file mode 100644
index 0000000000..caf1f2fe96
--- /dev/null
+++ b/java/c/src/main/java/org/apache/arrow/c/ArrowArrayStream.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import static org.apache.arrow.c.NativeUtil.NULL;
+import static org.apache.arrow.util.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.arrow.c.jni.CDataJniException;
+import org.apache.arrow.c.jni.JniWrapper;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.ReferenceManager;
+import org.apache.arrow.memory.util.MemoryUtil;
+
+/**
+ * C Stream Interface ArrowArrayStream.
+ * <p>
+ * Represents a wrapper for the following C structure:
+ *
+ * <pre>
+ * struct ArrowArrayStream {
+ *   int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+ *   int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+ *   const char* (*get_last_error)(struct ArrowArrayStream*);
+ *   void (*release)(struct ArrowArrayStream*);
+ *   void* private_data;
+ * };
+ * </pre>
+ */
+public class ArrowArrayStream implements BaseStruct {
+  private static final int SIZE_OF = 40;
+  private static final int INDEX_RELEASE_CALLBACK = 24;
+
+  private ArrowBuf data;
+
+  /**
+   * Snapshot of the ArrowArrayStream raw data.
+   */
+  public static class Snapshot {
+    public long get_schema;
+    public long get_next;
+    public long get_last_error;
+    public long release;
+    public long private_data;
+
+    /**
+     * Initialize empty ArrowArray snapshot.
+     */
+    public Snapshot() {
+      get_schema = NULL;
+      get_next = NULL;
+      get_last_error = NULL;
+      release = NULL;
+      private_data = NULL;
+    }
+  }
+
+  /**
+   * Create ArrowArrayStream from an existing memory address.
+   * <p>
+   * The resulting ArrowArrayStream does not own the memory.
+   *
+   * @param memoryAddress Memory address to wrap
+   * @return A new ArrowArrayStream instance
+   */
+  public static ArrowArrayStream wrap(long memoryAddress) {
+    return new ArrowArrayStream(new ArrowBuf(ReferenceManager.NO_OP, null, 
ArrowArrayStream.SIZE_OF, memoryAddress));
+  }
+
+  /**
+   * Create ArrowArrayStream by allocating memory.
+   * <p>
+   * The resulting ArrowArrayStream owns the memory.
+   *
+   * @param allocator Allocator for memory allocations
+   * @return A new ArrowArrayStream instance
+   */
+  public static ArrowArrayStream allocateNew(BufferAllocator allocator) {
+    ArrowArrayStream array = new 
ArrowArrayStream(allocator.buffer(ArrowArrayStream.SIZE_OF));
+    array.markReleased();
+    return array;
+  }
+
+  ArrowArrayStream(ArrowBuf data) {
+    checkNotNull(data, "ArrowArrayStream initialized with a null buffer");
+    this.data = data;
+  }
+
+  /**
+   * Mark the array as released.
+   */
+  public void markReleased() {
+    directBuffer().putLong(INDEX_RELEASE_CALLBACK, NULL);
+  }
+
+  @Override
+  public long memoryAddress() {
+    checkNotNull(data, "ArrowArrayStream is already closed");
+    return data.memoryAddress();
+  }
+
+  @Override
+  public void release() {
+    long address = memoryAddress();
+    JniWrapper.get().releaseArrayStream(address);
+  }
+
+  /**
+   * Get the schema of the stream.
+   * @param schema The ArrowSchema struct to output to
+   * @throws IOException if the stream returns an error
+   */
+  public void getSchema(ArrowSchema schema) throws IOException {
+    long address = memoryAddress();
+    try {
+      JniWrapper.get().getSchemaArrayStream(address, schema.memoryAddress());
+    } catch (CDataJniException e) {
+      throw new IOException("[errno " + e.getErrno() + "] " + e.getMessage());
+    }
+  }
+
+  /**
+   * Get the next batch in the stream.
+   * @param array The ArrowArray struct to output to
+   * @throws IOException if the stream returns an error
+   */
+  public void getNext(ArrowArray array) throws IOException {
+    long address = memoryAddress();
+    try {
+      JniWrapper.get().getNextArrayStream(address, array.memoryAddress());
+    } catch (CDataJniException e) {
+      throw new IOException("[errno " + e.getErrno() + "] " + e.getMessage());
+    }
+  }
+
+  @Override
+  public void close() {
+    if (data != null) {
+      data.close();
+      data = null;
+    }
+  }
+
+  private ByteBuffer directBuffer() {
+    return MemoryUtil.directBuffer(memoryAddress(), 
ArrowArrayStream.SIZE_OF).order(ByteOrder.nativeOrder());
+  }
+
+  /**
+   * Take a snapshot of the ArrowArrayStream raw values.
+   *
+   * @return snapshot
+   */
+  public ArrowArrayStream.Snapshot snapshot() {
+    ByteBuffer data = directBuffer();
+    ArrowArrayStream.Snapshot snapshot = new ArrowArrayStream.Snapshot();
+    snapshot.get_schema = data.getLong();
+    snapshot.get_next = data.getLong();
+    snapshot.get_last_error = data.getLong();
+    snapshot.release = data.getLong();
+    snapshot.private_data = data.getLong();
+    return snapshot;
+  }
+
+  /**
+   * Write values from Snapshot to the underlying ArrowArrayStream memory 
buffer.
+   */
+  public void save(ArrowArrayStream.Snapshot snapshot) {
+    directBuffer()
+        .putLong(snapshot.get_schema)
+        .putLong(snapshot.get_next)
+        .putLong(snapshot.get_last_error)
+        .putLong(snapshot.release)
+        .putLong(snapshot.private_data);
+  }
+}
diff --git 
a/java/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java 
b/java/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java
new file mode 100644
index 0000000000..b39a3be9b8
--- /dev/null
+++ b/java/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import static org.apache.arrow.c.NativeUtil.NULL;
+import static org.apache.arrow.util.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * An implementation of an {@link ArrowReader} backed by an ArrowArrayStream.
+ */
+final class ArrowArrayStreamReader extends ArrowReader {
+  private final ArrowArrayStream ownedStream;
+  private final CDataDictionaryProvider provider;
+
+  ArrowArrayStreamReader(BufferAllocator allocator, ArrowArrayStream stream) {
+    super(allocator);
+    this.provider = new CDataDictionaryProvider();
+
+    ArrowArrayStream.Snapshot snapshot = stream.snapshot();
+    checkState(snapshot.release != NULL, "Cannot import released 
ArrowArrayStream");
+
+    // Move imported stream
+    this.ownedStream = ArrowArrayStream.allocateNew(allocator);
+    this.ownedStream.save(snapshot);
+    stream.markReleased();
+    stream.close();
+  }
+
+  @Override
+  public Map<Long, Dictionary> getDictionaryVectors() {
+    return 
provider.getDictionaryIds().stream().collect(Collectors.toMap(Function.identity(),
 provider::lookup));
+  }
+
+  @Override
+  public Dictionary lookup(long id) {
+    return provider.lookup(id);
+  }
+
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    try (ArrowArray array = ArrowArray.allocateNew(allocator)) {
+      ownedStream.getNext(array);
+      if (array.snapshot().release == NULL) {
+        return false;
+      }
+      Data.importIntoVectorSchemaRoot(allocator, array, getVectorSchemaRoot(), 
provider);
+      return true;
+    }
+  }
+
+  @Override
+  public long bytesRead() {
+    return 0;
+  }
+
+  @Override
+  protected void closeReadSource() {
+    ownedStream.release();
+    ownedStream.close();
+    provider.close();
+  }
+
+  @Override
+  protected Schema readSchema() throws IOException {
+    try (ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
+      ownedStream.getSchema(schema);
+      return Data.importSchema(allocator, schema, provider);
+    }
+  }
+}
diff --git 
a/java/c/src/main/java/org/apache/arrow/c/CDataDictionaryProvider.java 
b/java/c/src/main/java/org/apache/arrow/c/CDataDictionaryProvider.java
index 43bcda276e..4a84f11704 100644
--- a/java/c/src/main/java/org/apache/arrow/c/CDataDictionaryProvider.java
+++ b/java/c/src/main/java/org/apache/arrow/c/CDataDictionaryProvider.java
@@ -52,6 +52,7 @@ public class CDataDictionaryProvider implements 
DictionaryProvider, AutoCloseabl
     }
   }
 
+  @Override
   public final Set<Long> getDictionaryIds() {
     return map.keySet();
   }
diff --git a/java/c/src/main/java/org/apache/arrow/c/Data.java 
b/java/c/src/main/java/org/apache/arrow/c/Data.java
index 7151bff94b..9ee5a6c757 100644
--- a/java/c/src/main/java/org/apache/arrow/c/Data.java
+++ b/java/c/src/main/java/org/apache/arrow/c/Data.java
@@ -26,6 +26,7 @@ import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID;
@@ -162,6 +163,16 @@ public final class Data {
     }
   }
 
+  /**
+   * Export a reader as an ArrowArrayStream using the C Stream Interface.
+   * @param allocator Buffer allocator for allocating C data inteface fields
+   * @param reader Reader to export
+   * @param out C struct to export the stream
+   */
+  public static void exportArrayStream(BufferAllocator allocator, ArrowReader 
reader, ArrowArrayStream out) {
+    new ArrayStreamExporter(allocator).export(out, reader);
+  }
+
   /**
    * Import Java Field from the C data interface.
    * <p>
@@ -314,4 +325,14 @@ public final class Data {
     }
     return vsr;
   }
+
+  /**
+   * Import an ArrowArrayStream as an {@link ArrowReader}.
+   * @param allocator Buffer allocator for allocating the output data.
+   * @param stream C stream interface struct to import.
+   * @return Imported reader
+   */
+  public static ArrowReader importArrayStream(BufferAllocator allocator, 
ArrowArrayStream stream) {
+    return new ArrowArrayStreamReader(allocator, stream);
+  }
 }
diff --git a/java/c/src/main/java/org/apache/arrow/c/NativeUtil.java 
b/java/c/src/main/java/org/apache/arrow/c/NativeUtil.java
index e2feda1e5d..b152ea4e7c 100644
--- a/java/c/src/main/java/org/apache/arrow/c/NativeUtil.java
+++ b/java/c/src/main/java/org/apache/arrow/c/NativeUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.arrow.c;
 
+import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
@@ -52,7 +53,8 @@ public final class NativeUtil {
       length++;
     }
     byte[] bytes = new byte[length];
-    ((ByteBuffer) reader.rewind()).get(bytes);
+    // Force use of base class rewind() to avoid breaking change of 
ByteBuffer.rewind in JDK9+
+    ((ByteBuffer) ((Buffer) reader).rewind()).get(bytes);
     return new String(bytes, 0, length, StandardCharsets.UTF_8);
   }
 
diff --git a/java/c/src/main/java/org/apache/arrow/c/jni/JniWrapper.java 
b/java/c/src/main/java/org/apache/arrow/c/jni/CDataJniException.java
similarity index 50%
copy from java/c/src/main/java/org/apache/arrow/c/jni/JniWrapper.java
copy to java/c/src/main/java/org/apache/arrow/c/jni/CDataJniException.java
index 9e1c19b100..bebd434f3d 100644
--- a/java/c/src/main/java/org/apache/arrow/c/jni/JniWrapper.java
+++ b/java/c/src/main/java/org/apache/arrow/c/jni/CDataJniException.java
@@ -18,30 +18,28 @@
 package org.apache.arrow.c.jni;
 
 /**
- * JniWrapper for C Data Interface API implementation.
+ * An exception raised by the JNI side of the C Data bridge.
  */
-public class JniWrapper {
-  private static final JniWrapper INSTANCE = new JniWrapper();
+public final class CDataJniException extends Exception {
+  private final int errno;
 
-  public static JniWrapper get() {
-    JniLoader.get().ensureLoaded();
-    return INSTANCE;
+  public CDataJniException(int errno, String message) {
+    super(message);
+    this.errno = errno;
   }
 
-  private JniWrapper() {
-    // A best effort to error on 32-bit systems
-    String dataModel = System.getProperty("sun.arch.data.model");
-    if (dataModel != null && dataModel.equals("32")) {
-      throw new UnsupportedOperationException(
-          "The Java C Data Interface implementation is currently only 
supported on 64-bit systems");
-    }
+  /**
+   * The original error code returned from C.
+   */
+  public int getErrno() {
+    return errno;
   }
 
-  public native void releaseSchema(long memoryAddress);
-
-  public native void releaseArray(long memoryAddress);
-
-  public native void exportSchema(long memoryAddress, PrivateData privateData);
-
-  public native void exportArray(long memoryAddress, PrivateData data);
+  @Override
+  public String toString() {
+    return "CDataJniException{" +
+        "errno=" + errno +
+        ", message=" + getMessage() +
+        '}';
+  }
 }
diff --git a/java/c/src/main/java/org/apache/arrow/c/jni/JniWrapper.java 
b/java/c/src/main/java/org/apache/arrow/c/jni/JniWrapper.java
index 9e1c19b100..eb299b65f0 100644
--- a/java/c/src/main/java/org/apache/arrow/c/jni/JniWrapper.java
+++ b/java/c/src/main/java/org/apache/arrow/c/jni/JniWrapper.java
@@ -41,7 +41,15 @@ public class JniWrapper {
 
   public native void releaseArray(long memoryAddress);
 
+  public native void getNextArrayStream(long streamAddress, long arrayAddress) 
throws CDataJniException;
+
+  public native void getSchemaArrayStream(long streamAddress, long 
arrayAddress) throws CDataJniException;
+
+  public native void releaseArrayStream(long memoryAddress);
+
   public native void exportSchema(long memoryAddress, PrivateData privateData);
 
   public native void exportArray(long memoryAddress, PrivateData data);
+
+  public native void exportArrayStream(long memoryAddress, PrivateData data);
 }
diff --git a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java 
b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
index 6aa6e889ba..6a2b476b0c 100644
--- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
+++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
@@ -34,9 +34,6 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.arrow.c.ArrowArray;
-import org.apache.arrow.c.ArrowSchema;
-import org.apache.arrow.c.Data;
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
diff --git a/java/c/src/test/java/org/apache/arrow/c/StreamTest.java 
b/java/c/src/test/java/org/apache/arrow/c/StreamTest.java
new file mode 100644
index 0000000000..06401687a5
--- /dev/null
+++ b/java/c/src/test/java/org/apache/arrow/c/StreamTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.c;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.compare.Range;
+import org.apache.arrow.vector.compare.RangeEqualsVisitor;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+final class StreamTest {
+  private RootAllocator allocator = null;
+
+  @BeforeEach
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @AfterEach
+  public void tearDown() {
+    allocator.close();
+  }
+
+  @Test
+  public void testRoundtripInts() throws Exception {
+    final Schema schema = new 
Schema(Collections.singletonList(Field.nullable("ints", new ArrowType.Int(32, 
true))));
+    final List<ArrowRecordBatch> batches = new ArrayList<>();
+    try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+      final IntVector ints = (IntVector) root.getVector(0);
+      VectorUnloader unloader = new VectorUnloader(root);
+
+      root.allocateNew();
+      ints.setSafe(0, 1);
+      ints.setSafe(1, 2);
+      ints.setSafe(2, 4);
+      ints.setSafe(3, 8);
+      root.setRowCount(4);
+      batches.add(unloader.getRecordBatch());
+
+      root.allocateNew();
+      ints.setSafe(0, 1);
+      ints.setNull(1);
+      ints.setSafe(2, 4);
+      ints.setNull(3);
+      root.setRowCount(4);
+      batches.add(unloader.getRecordBatch());
+      roundtrip(schema, batches);
+    }
+  }
+
+  @Test
+  public void roundtripStrings() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(Field.nullable("ints", new 
ArrowType.Int(32, true)),
+        Field.nullable("strs", new ArrowType.Utf8())));
+    final List<ArrowRecordBatch> batches = new ArrayList<>();
+    try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+      final IntVector ints = (IntVector) root.getVector(0);
+      final VarCharVector strs = (VarCharVector) root.getVector(1);
+      VectorUnloader unloader = new VectorUnloader(root);
+
+      root.allocateNew();
+      ints.setSafe(0, 1);
+      ints.setSafe(1, 2);
+      ints.setSafe(2, 4);
+      ints.setSafe(3, 8);
+      strs.setSafe(0, "".getBytes(StandardCharsets.UTF_8));
+      strs.setSafe(1, "a".getBytes(StandardCharsets.UTF_8));
+      strs.setSafe(2, "bc".getBytes(StandardCharsets.UTF_8));
+      strs.setSafe(3, "defg".getBytes(StandardCharsets.UTF_8));
+      root.setRowCount(4);
+      batches.add(unloader.getRecordBatch());
+
+      root.allocateNew();
+      ints.setSafe(0, 1);
+      ints.setNull(1);
+      ints.setSafe(2, 4);
+      ints.setNull(3);
+      strs.setSafe(0, "".getBytes(StandardCharsets.UTF_8));
+      strs.setNull(1);
+      strs.setSafe(2, "bc".getBytes(StandardCharsets.UTF_8));
+      strs.setNull(3);
+      root.setRowCount(4);
+      batches.add(unloader.getRecordBatch());
+      roundtrip(schema, batches);
+    }
+  }
+
+  @Test
+  public void roundtripDictionary() throws Exception {
+    final ArrowType.Int indexType = new ArrowType.Int(32, true);
+    final DictionaryEncoding encoding = new DictionaryEncoding(1L, false, 
indexType);
+    final Schema schema = new Schema(Collections.singletonList(
+        new Field("dict", new FieldType(/*nullable=*/true, indexType, 
encoding), Collections.emptyList())));
+    final List<ArrowRecordBatch> batches = new ArrayList<>();
+    try (final CDataDictionaryProvider provider = new 
CDataDictionaryProvider();
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+      final VarCharVector dictionary = new VarCharVector("values", allocator);
+      dictionary.allocateNew();
+      dictionary.setSafe(0, "foo".getBytes(StandardCharsets.UTF_8));
+      dictionary.setSafe(1, "bar".getBytes(StandardCharsets.UTF_8));
+      dictionary.setNull(2);
+      dictionary.setValueCount(3);
+      provider.put(new Dictionary(dictionary, encoding));
+      final IntVector encoded = (IntVector) root.getVector(0);
+      VectorUnloader unloader = new VectorUnloader(root);
+
+      root.allocateNew();
+      encoded.setSafe(0, 0);
+      encoded.setSafe(1, 1);
+      encoded.setSafe(2, 0);
+      encoded.setSafe(3, 2);
+      root.setRowCount(4);
+      batches.add(unloader.getRecordBatch());
+
+      root.allocateNew();
+      encoded.setSafe(0, 0);
+      encoded.setNull(1);
+      encoded.setSafe(2, 1);
+      encoded.setNull(3);
+      root.setRowCount(4);
+      batches.add(unloader.getRecordBatch());
+      roundtrip(schema, batches, provider);
+    }
+  }
+
+  @Test
+  public void importReleasedStream() {
+    try (final ArrowArrayStream stream = 
ArrowArrayStream.allocateNew(allocator)) {
+      Exception e = assertThrows(IllegalStateException.class, () -> 
Data.importArrayStream(allocator, stream));
+      assertThat(e).hasMessageContaining("Cannot import released 
ArrowArrayStream");
+    }
+  }
+
+  @Test
+  public void getNextError() throws Exception {
+    final Schema schema = new 
Schema(Collections.singletonList(Field.nullable("ints", new ArrowType.Int(32, 
true))));
+    final List<ArrowRecordBatch> batches = new ArrayList<>();
+    try (final ArrowReader source = new InMemoryArrowReader(allocator, schema, 
batches,
+        new DictionaryProvider.MapDictionaryProvider()) {
+      @Override
+      public boolean loadNextBatch() throws IOException {
+        throw new IOException("Failed to load batch!");
+      }
+    }; final ArrowArrayStream stream = 
ArrowArrayStream.allocateNew(allocator)) {
+      Data.exportArrayStream(allocator, source, stream);
+      try (final ArrowReader reader = Data.importArrayStream(allocator, 
stream)) {
+        assertThat(reader.getVectorSchemaRoot().getSchema()).isEqualTo(schema);
+        final IOException e = assertThrows(IOException.class, 
reader::loadNextBatch);
+        assertThat(e).hasMessageContaining("Failed to load batch!");
+        assertThat(e).hasMessageContaining("[errno ");
+      }
+    }
+  }
+
+  @Test
+  public void getSchemaError() throws Exception {
+    final Schema schema = new 
Schema(Collections.singletonList(Field.nullable("ints", new ArrowType.Int(32, 
true))));
+    final List<ArrowRecordBatch> batches = new ArrayList<>();
+    try (final ArrowReader source = new InMemoryArrowReader(allocator, schema, 
batches,
+        new DictionaryProvider.MapDictionaryProvider()) {
+      @Override
+      protected Schema readSchema() {
+        throw new IllegalArgumentException("Failed to read schema!");
+      }
+    }; final ArrowArrayStream stream = 
ArrowArrayStream.allocateNew(allocator)) {
+      Data.exportArrayStream(allocator, source, stream);
+      try (final ArrowReader reader = Data.importArrayStream(allocator, 
stream)) {
+        final IOException e = assertThrows(IOException.class, 
reader::getVectorSchemaRoot);
+        assertThat(e).hasMessageContaining("Failed to read schema!");
+        assertThat(e).hasMessageContaining("[errno ");
+      }
+    }
+  }
+
+  void roundtrip(Schema schema, List<ArrowRecordBatch> batches, 
DictionaryProvider provider) throws Exception {
+    ArrowReader source = new InMemoryArrowReader(allocator, schema, batches, 
provider);
+
+    try (final ArrowArrayStream stream = 
ArrowArrayStream.allocateNew(allocator);
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+      final VectorLoader loader = new VectorLoader(root);
+      Data.exportArrayStream(allocator, source, stream);
+
+      try (final ArrowReader reader = Data.importArrayStream(allocator, 
stream)) {
+        assertThat(reader.getVectorSchemaRoot().getSchema()).isEqualTo(schema);
+
+        for (ArrowRecordBatch batch : batches) {
+          assertThat(reader.loadNextBatch()).isTrue();
+          loader.load(batch);
+
+          
assertThat(reader.getVectorSchemaRoot().getRowCount()).isEqualTo(root.getRowCount());
+
+          for (int i = 0; i < root.getFieldVectors().size(); i++) {
+            final FieldVector expected = root.getVector(i);
+            final FieldVector actual = 
reader.getVectorSchemaRoot().getVector(i);
+            assertVectorsEqual(expected, actual);
+          }
+        }
+        assertThat(reader.loadNextBatch()).isFalse();
+        
assertThat(reader.getDictionaryIds()).isEqualTo(provider.getDictionaryIds());
+        for (Map.Entry<Long, Dictionary> entry : 
reader.getDictionaryVectors().entrySet()) {
+          final FieldVector expected = 
provider.lookup(entry.getKey()).getVector();
+          final FieldVector actual = entry.getValue().getVector();
+          assertVectorsEqual(expected, actual);
+        }
+      }
+    }
+  }
+
+  void roundtrip(Schema schema, List<ArrowRecordBatch> batches) throws 
Exception {
+    roundtrip(schema, batches, new CDataDictionaryProvider());
+  }
+
+  private static void assertVectorsEqual(FieldVector expected, FieldVector 
actual) {
+    
assertThat(actual.getField().getType()).isEqualTo(expected.getField().getType());
+    assertThat(actual.getValueCount()).isEqualTo(expected.getValueCount());
+    final Range range = new Range(/*leftStart=*/0, /*rightStart=*/0, 
expected.getValueCount());
+    assertThat(new RangeEqualsVisitor(expected, actual)
+        .rangeEquals(range))
+        .as("Vectors were not equal.\nExpected: %s\nGot: %s", expected, actual)
+        .isTrue();
+  }
+
+  /**
+   * An ArrowReader backed by a fixed list of batches.
+   */
+  static class InMemoryArrowReader extends ArrowReader {
+    private final Schema schema;
+    private final List<ArrowRecordBatch> batches;
+    private final DictionaryProvider provider;
+    private int nextBatch;
+
+    InMemoryArrowReader(BufferAllocator allocator, Schema schema, 
List<ArrowRecordBatch> batches,
+                        DictionaryProvider provider) {
+      super(allocator);
+      this.schema = schema;
+      this.batches = batches;
+      this.provider = provider;
+      this.nextBatch = 0;
+    }
+
+    @Override
+    public Dictionary lookup(long id) {
+      return provider.lookup(id);
+    }
+
+    @Override
+    public Set<Long> getDictionaryIds() {
+      return provider.getDictionaryIds();
+    }
+
+    @Override
+    public Map<Long, Dictionary> getDictionaryVectors() {
+      return 
getDictionaryIds().stream().collect(Collectors.toMap(Function.identity(), 
this::lookup));
+    }
+
+    @Override
+    public boolean loadNextBatch() throws IOException {
+      if (nextBatch < batches.size()) {
+        VectorLoader loader = new VectorLoader(getVectorSchemaRoot());
+        loader.load(batches.get(nextBatch++));
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public long bytesRead() {
+      return 0;
+    }
+
+    @Override
+    protected void closeReadSource() throws IOException {
+      try {
+        AutoCloseables.close(batches);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    protected Schema readSchema() {
+      return schema;
+    }
+  }
+}
diff --git a/java/c/src/test/python/integration_tests.py 
b/java/c/src/test/python/integration_tests.py
index c1f130f21d..33ff1cf4a9 100644
--- a/java/c/src/test/python/integration_tests.py
+++ b/java/c/src/test/python/integration_tests.py
@@ -84,6 +84,13 @@ class Bridge:
             ptr_array), self.java_c.ArrowSchema.wrap(ptr_schema))
         return pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
 
+    def java_to_python_reader(self, reader):
+        c_stream = ffi.new("struct ArrowArrayStream*")
+        ptr_stream = int(ffi.cast("uintptr_t", c_stream))
+        self.java_c.Data.exportArrayStream(self.java_allocator, reader,
+                                           
self.java_c.ArrowArrayStream.wrap(ptr_stream))
+        return pa.RecordBatchReader._import_from_c(ptr_stream)
+
     def python_to_java_field(self, field):
         c_schema = self.java_c.ArrowSchema.allocateNew(self.java_allocator)
         field._export_to_c(c_schema.memoryAddress())
@@ -102,6 +109,11 @@ class Bridge:
             c_array.memoryAddress(), c_schema.memoryAddress())
         return self.java_c.Data.importVectorSchemaRoot(self.java_allocator, 
c_array, c_schema, None)
 
+    def python_to_java_reader(self, reader):
+        c_stream = 
self.java_c.ArrowArrayStream.allocateNew(self.java_allocator)
+        reader._export_to_c(c_stream.memoryAddress())
+        return self.java_c.Data.importArrayStream(self.java_allocator, 
c_stream)
+
     def close(self):
         self.java_allocator.close()
 
@@ -151,6 +163,17 @@ class TestPythonIntegration(unittest.TestCase):
         expected = rb_generator()
         self.assertEqual(expected, new_rb)
 
+    def round_trip_reader(self, schema, batches):
+        reader = pa.RecordBatchReader.from_batches(schema, batches)
+
+        java_reader = self.bridge.python_to_java_reader(reader)
+        del reader
+        py_reader = self.bridge.java_to_python_reader(java_reader)
+        del java_reader
+
+        actual = list(py_reader)
+        self.assertEqual(batches, actual)
+
     def test_string_array(self):
         self.round_trip_array(lambda: pa.array([None, "a", "bb", "ccc"]))
 
@@ -217,6 +240,34 @@ class TestPythonIntegration(unittest.TestCase):
         self.round_trip_record_batch(
             lambda: pa.RecordBatch.from_arrays(data, ['f0', 'f1', 'f2', 'f3']))
 
+    def test_reader_roundtrip(self):
+        schema = pa.schema([("ints", pa.int64()), ("strs", pa.string())])
+        data = [
+            pa.record_batch([[1, 2, 3, None],
+                             ["a", "bc", None, ""]],
+                            schema=schema),
+            pa.record_batch([[None, 4, 5, 6],
+                             [None, "", "def", "g"]],
+                            schema=schema),
+        ]
+        self.round_trip_reader(schema, data)
+
+    def test_reader_complex_roundtrip(self):
+        schema = pa.schema([
+            ("str_dict", pa.dictionary(pa.int8(), pa.string())),
+            ("int_list", pa.list_(pa.int64())),
+        ])
+        dictionary = pa.array(["a", "bc", None])
+        data = [
+            pa.record_batch([pa.DictionaryArray.from_arrays([0, 2], 
dictionary),
+                             [[1, 2, 3], None]],
+                            schema=schema),
+            pa.record_batch([pa.DictionaryArray.from_arrays([None, 1], 
dictionary),
+                             [[], [4]]],
+                            schema=schema),
+        ]
+        self.round_trip_reader(schema, data)
+
 
 if __name__ == '__main__':
     setup_jvm()
diff --git a/java/pom.xml b/java/pom.xml
index 28afabc344..6f2ed823cf 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -554,6 +554,12 @@
         <artifactId>javax.annotation-api</artifactId>
         <version>1.3.2</version>
       </dependency>
+      <dependency>
+        <groupId>org.assertj</groupId>
+        <artifactId>assertj-core</artifactId>
+        <version>3.23.1</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.immutables</groupId>
         <artifactId>value</artifactId>
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
index 21165c07d9..76e1eb9f66 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
@@ -29,6 +29,9 @@ public interface DictionaryProvider {
   /** Return the dictionary for the given ID. */
   Dictionary lookup(long id);
 
+  /** Get all dictionary IDs. */
+  Set<Long> getDictionaryIds();
+
   /**
    * Implementation of {@link DictionaryProvider} that is backed by a hash-map.
    */
@@ -50,6 +53,7 @@ public interface DictionaryProvider {
       map.put(dictionary.getEncoding().getId(), dictionary);
     }
 
+    @Override
     public final Set<Long> getDictionaryIds() {
       return map.keySet();
     }
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
index 9d940deecf..04c57d7e82 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
@@ -99,6 +100,11 @@ public abstract class ArrowReader implements 
DictionaryProvider, AutoCloseable {
     return dictionaries.get(id);
   }
 
+  @Override
+  public Set<Long> getDictionaryIds() {
+    return dictionaries.keySet();
+  }
+
   /**
    * Load the next ArrowRecordBatch to the vector schema root if available.
    *
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
index d093e840ab..6455857c27 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
@@ -115,6 +116,11 @@ public class JsonFileReader implements AutoCloseable, 
DictionaryProvider {
     return dictionaries.get(id);
   }
 
+  @Override
+  public Set<Long> getDictionaryIds() {
+    return dictionaries.keySet();
+  }
+
   /** Reads the beginning (schema section) of the json file and returns it. */
   public Schema start() throws JsonParseException, IOException {
     readToken(START_OBJECT);
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index 6597e0302c..9deb42c498 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -684,7 +684,8 @@ public class MessageSerializer {
 
       int messageLength = MessageSerializer.bytesToInt(buffer.array());
       if (messageLength == IPC_CONTINUATION_TOKEN) {
-        buffer.clear();
+        // Avoid breaking change in signature of ByteBuffer.clear() in JDK9+
+        ((java.nio.Buffer) buffer).clear();
         // ARROW-6313, if the first 4 bytes are continuation message, read the 
next 4 for the length
         if (in.readFully(buffer) == 4) {
           messageLength = MessageSerializer.bytesToInt(buffer.array());
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 9e43eb4eb9..ee5446fd57 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -869,6 +869,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
     cdef cppclass CRecordBatchReader" arrow::RecordBatchReader":
         shared_ptr[CSchema] schema()
+        CStatus Close()
         CStatus ReadNext(shared_ptr[CRecordBatch]* batch)
         CResult[shared_ptr[CTable]] ToTable()
 
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index f0297ff004..b5cbbfb62c 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -598,7 +598,7 @@ class _ReadPandasMixin:
 cdef class RecordBatchReader(_Weakrefable):
     """Base class for reading stream of record batches.
 
-    Record batch readers function as iterators of record batches that also 
+    Record batch readers function as iterators of record batches that also
     provide the schema (without the need to get any batches).
 
     Warnings
@@ -608,7 +608,7 @@ cdef class RecordBatchReader(_Weakrefable):
 
     Notes
     -----
-    To import and export using the Arrow C stream interface, use the 
+    To import and export using the Arrow C stream interface, use the
     ``_import_from_c`` and ``_export_from_c`` methods. However, keep in mind 
this
     interface is intended for expert users.
 
@@ -702,11 +702,18 @@ cdef class RecordBatchReader(_Weakrefable):
 
     read_pandas = _ReadPandasMixin.read_pandas
 
+    def close(self):
+        """
+        Release any resources associated with the reader.
+        """
+        with nogil:
+            check_status(self.reader.get().Close())
+
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        pass
+        self.close()
 
     def _export_to_c(self, out_ptr):
         """

Reply via email to