lidavidm commented on code in PR #34227:
URL: https://github.com/apache/arrow/pull/34227#discussion_r1138551688


##########
java/dataset/src/main/cpp/jni_util.cc:
##########
@@ -331,6 +331,18 @@ std::vector<std::string> ToStringVector(JNIEnv* env, 
jobjectArray& str_array) {
   return vector;
 }
 
+std::map<std::string, long> ToMap(JNIEnv* env, jobjectArray& str_array) {

Review Comment:
   Use std::unordered_map, not std::map



##########
java/dataset/pom.xml:
##########
@@ -156,27 +195,29 @@
                 </includes>
             </resource>
         </resources>
-
-        <plugins>
-            <plugin>
-                <groupId>org.xolstice.maven.plugins</groupId>
-                <artifactId>protobuf-maven-plugin</artifactId>
-                <version>0.6.1</version>
-                <configuration>
-                    
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
-                    </protocArtifact>
-                    
<protoSourceRoot>../../cpp/src/jni/dataset/proto</protoSourceRoot>
-                </configuration>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>compile</goal>
-                            <goal>test-compile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
     </build>
-
+    <profiles>
+        <profile>
+            <id>default</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <version>${maven-compiler-plugin.version}</version>
+                        <configuration>
+                            <source>1.8</source>
+                            <target>1.8</target>
+                            <!--To be able to consume Isthmus Library to 
create binary Substrait Plan-->
+                            <testSource>11</testSource>
+                            <testTarget>11</testTarget>

Review Comment:
   Can we check in or embed pre-generated plans instead of mixing up Java 
versions like this?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/Substrait.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.util.Map;
+
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Java binding of the C++ Substrait API.
+ */
+public interface Substrait {

Review Comment:
   Originally I thought the separation was to allow multiple consumer 
implementations, so I suggested an interface. If that isn't the intent, then 
remove the interface and only provide the concrete implementation.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/Substrait.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.util.Map;
+
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Java binding of the C++ Substrait API.
+ */
+public interface Substrait {
+  /**
+   * Execute Substrait Plan to read Schema and ArrowRecordBatches for Local 
Files.
+   *
+   * @param plan the JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  ArrowReader runQueryLocalFiles(String plan);
+
+  /**
+   * Execute JSON Substrait Plan to read Schema and ArrowRecordBatches for 
Named Tables.
+   *
+   * @param plan                  the JSON Substrait plan.
+   * @param mapTableToArrowReader the mapping name of Tables Name and theirs 
memory addres representation.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  ArrowReader runQueryNamedTables(String plan, Map<String, ArrowReader> 
mapTableToArrowReader);
+
+
+  /**
+   * Execute binary Substrait Plan to read Schema and ArrowRecordBatches for 
Named Tables.
+   *
+   * @param plan                  the binary Substrait plan.
+   * @param mapTableToArrowReader the mapping name of Tables Name and theirs 
memory addres representation.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  ArrowReader runQueryNamedTables(Object plan, Map<String, ArrowReader> 
mapTableToArrowReader);

Review Comment:
   Make this `byte[]` or `ByteBuffer`, not `Object`.



##########
java/dataset/src/test/resources/substrait/parquet/binary.parquet:
##########


Review Comment:
   Move all testing files into arrow-testing. Use the data that's already there 
if possible.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import org.apache.arrow.dataset.jni.JniLoader;
+
+/**
+ * JniWrapper to Consume Substrait Plans.
+ */
+public class JniWrapper {
+  private static final JniWrapper INSTANCE = new JniWrapper();
+
+  private JniWrapper() {
+  }
+
+  public static JniWrapper get() {
+    JniLoader.get().ensureLoaded();
+    return INSTANCE;
+  }
+
+  /**
+   * Consume the JSON Substrait Plan that contains Local Files and export the 
RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the JSON Substrait plan.
+   * @param memoryAddressOutput the memory address where RecordBatchReader is 
exported.
+   */
+  public native void executeSerializedPlanLocalFiles(String planInput, long 
memoryAddressOutput);
+
+  /**
+   * Consume the JSON Substrait Plan that contains Named Tables and export the 
RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the JSON Substrait plan.
+   * @param mapTableToMemoryAddressInput the mapping name of Tables Name and 
theirs memory addres representation.
+   * @param memoryAddressOutput the memory address where RecordBatchReader is 
exported.
+   */
+  public native void executeSerializedPlanNamedTables(String planInput, 
String[] mapTableToMemoryAddressInput,
+                                                      long 
memoryAddressOutput);
+
+  /**
+   * Consume the binary Substrait Plan that contains Named Tables and export 
the RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the binary Substrait plan.
+   * @param mapTableToMemoryAddressInput the mapping name of Tables Name and 
theirs memory addres representation.
+   * @param memoryAddressOutput the memory address where RecordBatchReader is 
exported.
+   */
+  public native void executeSerializedPlanNamedTables(Object planInput, 
String[] mapTableToMemoryAddressInput,

Review Comment:
   Be more specific than Object.



##########
java/dataset/src/main/cpp/jni_util.h:
##########
@@ -25,6 +25,7 @@
 #include "arrow/memory_pool.h"
 #include "arrow/result.h"
 #include "arrow/type.h"
+#include "map"

Review Comment:
   Use bracket includes for standard library headers, i.e. `#include <map>` 
rather than `#include "map"`.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/Substrait.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.util.Map;
+
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Java binding of the C++ Substrait API.
+ */
+public interface Substrait {
+  /**
+   * Execute Substrait Plan to read Schema and ArrowRecordBatches for Local 
Files.
+   *
+   * @param plan the JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  ArrowReader runQueryLocalFiles(String plan);

Review Comment:
   Add specific `throws` clauses



##########
java/dataset/pom.xml:
##########
@@ -156,27 +195,29 @@
                 </includes>
             </resource>
         </resources>
-
-        <plugins>
-            <plugin>
-                <groupId>org.xolstice.maven.plugins</groupId>
-                <artifactId>protobuf-maven-plugin</artifactId>
-                <version>0.6.1</version>
-                <configuration>
-                    
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
-                    </protocArtifact>
-                    
<protoSourceRoot>../../cpp/src/jni/dataset/proto</protoSourceRoot>

Review Comment:
   What happened to Protobuf generation?



##########
java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java:
##########
@@ -38,9 +40,13 @@
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.calcite.sql.parser.SqlParseException;
 import org.junit.After;
 import org.junit.Before;
 
+import io.substrait.isthmus.SqlToSubstrait;

Review Comment:
   I don't think it really matters to use Isthmus here. Just check in the 
generated plan and drop the dependency.



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +581,100 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles
 (
+    JNIEnv* env, jobject, jstring substrait_plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, 
substrait_plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = 
reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }
+    std::shared_ptr<arrow::compute::ExecNodeOptions> options =
+      
std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::compute::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> readerOut = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, NULLPTR, NULLPTR, 
conversion_options));
+  auto* arrow_stream_out = 
reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, 
arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/Object;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_Object_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jobject plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = 
reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }
+    std::shared_ptr<arrow::compute::ExecNodeOptions> options =
+      
std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::compute::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // mapping arrow::Buffer
+  jbyte *buff = (jbyte *) env->GetDirectBufferAddress(plan);
+  int length = env->GetDirectBufferCapacity(plan);
+  std::shared_ptr<arrow::Buffer> buffer = 
arrow::AllocateBuffer(length).ValueOrDie();

Review Comment:
   Do not use `ValueOrDie`; propagate failures with `JniGetOrThrow` like the 
other calls in this file.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import org.apache.arrow.dataset.jni.JniLoader;
+
+/**
+ * JniWrapper to Consume Substrait Plans.
+ */
+public class JniWrapper {
+  private static final JniWrapper INSTANCE = new JniWrapper();
+
+  private JniWrapper() {
+  }
+
+  public static JniWrapper get() {
+    JniLoader.get().ensureLoaded();
+    return INSTANCE;
+  }
+
+  /**
+   * Consume the JSON Substrait Plan that contains Local Files and export the 
RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the JSON Substrait plan.
+   * @param memoryAddressOutput the memory address where RecordBatchReader is 
exported.
+   */
+  public native void executeSerializedPlanLocalFiles(String planInput, long 
memoryAddressOutput);
+
+  /**
+   * Consume the JSON Substrait Plan that contains Named Tables and export the 
RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the JSON Substrait plan.
+   * @param mapTableToMemoryAddressInput the mapping name of Tables Name and 
theirs memory addres representation.
+   * @param memoryAddressOutput the memory address where RecordBatchReader is 
exported.
+   */
+  public native void executeSerializedPlanNamedTables(String planInput, 
String[] mapTableToMemoryAddressInput,

Review Comment:
   You should document the encoding of the array parameter here and below.



##########
java/dataset/src/main/cpp/jni_util.cc:
##########
@@ -331,6 +331,18 @@ std::vector<std::string> ToStringVector(JNIEnv* env, 
jobjectArray& str_array) {
   return vector;
 }
 
+std::map<std::string, long> ToMap(JNIEnv* env, jobjectArray& str_array) {
+  std::map<std::string, long> map_table_to_memory_address;
+  int length = env->GetArrayLength(str_array);
+  for (int pos = 0; pos < length; pos++) {
+    jstring j_string_key = (jstring)(env->GetObjectArrayElement(str_array, 
pos));

Review Comment:
   No C-style casts — use `static_cast<jstring>(...)` here.



##########
docs/source/java/substrait.rst:
##########
@@ -0,0 +1,543 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+Cross-Language Serialization for Relational Algebra.
+
+`Substrait`_ is a well-defined, cross-language specification for data compute 
operations.
+
+.. contents::
+
+Getting Started
+===============
+
+There are two perspectives among substrait implementors:
+
+- Substrait Consumer
+- Substrait Producer
+
+Substrait is calling Acero thru JNI wrappers to Consume Substrait plans.
+
+.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero.

Review Comment:
   If the latter, just state that the Java JNI modules provide a Substrait 
consumer based on Acero, and link to any supporting documentation as needed. We 
are not the Substrait project.



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestSubstraitConsumer.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.util.JsonFormat;
+
+import io.substrait.proto.Plan;
+
+public class TestSubstraitConsumer extends TestDataset {
+  private RootAllocator allocator = null;
+
+  public static String planReplaceLocalFileURI(String plan, String uri) throws 
IOException {
+    StringBuilder builder = new StringBuilder(plan);
+    builder.replace(builder.indexOf("FILENAME_PLACEHOLDER"),
+        builder.indexOf("FILENAME_PLACEHOLDER") + 
"FILENAME_PLACEHOLDER".length(), uri);
+    return builder.toString();
+  }
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testCreateSubstraitPlan() throws SqlParseException, IOException {
+    String sql = "SELECT * from nation";
+    String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME 
CHAR(25), " +
+        "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+    Plan plan = getPlan(sql, ImmutableList.of(nation));
+    String jsonPlan = 
JsonFormat.printer().includingDefaultValueFields().print(plan);
+    assertEquals(getSubstraitPlan("named_table_nation.json"), jsonPlan);
+  }
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    try (ArrowReader arrowReader = new SubstraitConsumer(rootAllocator())
+        .runQueryLocalFiles(
+            planReplaceLocalFileURI(
+                getSubstraitPlan("local_files_binary.json"),
+                getNamedTableUri("binary.parquet")
+            )
+        )
+    ) {
+      while (arrowReader.loadNextBatch()) {
+        assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 12);
+      }
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNation() throws Exception {
+    // Query: SELECT * from nation
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, getNamedTableUri("nation.parquet"));
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", reader);
+      try (ArrowReader arrowReader = new 
SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          getSubstraitPlan("named_table_nation.json"),
+          mapTableToArrowReader
+      )) {
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 25);
+          
assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("MOROCCO"));
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNationAndCustomer() throws Exception {
+    // Query:
+    // SELECT n.n_name, c.c_name, c.c_phone, c.c_address FROM nation n JOIN 
customer c ON n.n_nationkey = c.c_nationkey
+    ScanOptions optionsNations = new ScanOptions(/*batchSize*/ 32768);
+    ScanOptions optionsCustomer = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, getNamedTableUri("nation.parquet"));
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(optionsNations);
+        ArrowReader readerNation = scanner.scanBatches();
+        DatasetFactory datasetFactoryCustomer = new 
FileSystemDatasetFactory(rootAllocator(),
+            NativeMemoryPool.getDefault(), FileFormat.PARQUET, 
getNamedTableUri("customer.parquet"));
+        Dataset datasetCustomer = datasetFactoryCustomer.finish();
+        Scanner scannerCustomer = datasetCustomer.newScan(optionsCustomer);
+        ArrowReader readerCustomer = scannerCustomer.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", readerNation);
+      mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+      try (ArrowReader arrowReader = new 
SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          getSubstraitPlan("named_table_nation_customer.json"),
+          mapTableToArrowReader
+      )) {
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 15000);
+          
assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("Customer#000014924"));
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testRunBinaryQueryNamedTableNation() throws Exception {
+    // Query: SELECT * from nation
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, getNamedTableUri("nation.parquet"));
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      // map table to reader
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", reader);
+      // get binary plan
+      String sql = "SELECT * from nation";
+      String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, 
N_NAME CHAR(25), " +
+          "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+      Plan plan = getPlan(sql, ImmutableList.of(nation));
+      ByteBuffer substraitPlan = 
ByteBuffer.allocateDirect(plan.toByteArray().length);
+      substraitPlan.put(plan.toByteArray());
+      // run query
+      try (ArrowReader arrowReader = new 
SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          substraitPlan,
+          mapTableToArrowReader
+      )) {
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 25);
+          
assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("MOROCCO"));
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();

Review Comment:
   Don't do this in tests. Allow the exception to propagate and fail the test.



##########
java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java:
##########
@@ -38,9 +40,13 @@
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.calcite.sql.parser.SqlParseException;
 import org.junit.After;
 import org.junit.Before;
 
+import io.substrait.isthmus.SqlToSubstrait;

Review Comment:
   If testing with Isthmus specifically is a big deal, we should consider how 
to set up specific integration tests with important third party 
dependencies/dependents.



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +581,100 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles
 (
+    JNIEnv* env, jobject, jstring substrait_plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, 
substrait_plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = 
reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }
+    std::shared_ptr<arrow::compute::ExecNodeOptions> options =
+      
std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::compute::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> readerOut = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, NULLPTR, NULLPTR, 
conversion_options));
+  auto* arrow_stream_out = 
reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, 
arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/Object;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_Object_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jobject plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = 
reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }
+    std::shared_ptr<arrow::compute::ExecNodeOptions> options =
+      
std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::compute::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // mapping arrow::Buffer
+  jbyte *buff = (jbyte *) env->GetDirectBufferAddress(plan);

Review Comment:
   No C-style casts



##########
java/dataset/src/main/cpp/jni_util.h:
##########
@@ -46,6 +47,8 @@ std::string JStringToCString(JNIEnv* env, jstring string);
 
 std::vector<std::string> ToStringVector(JNIEnv* env, jobjectArray& str_array);
 
+std::map<std::string, long> ToMap(JNIEnv* env, jobjectArray& str_array);

Review Comment:
   This needs to be more descriptively named and needs a docstring because it 
has a very specific purpose.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import org.apache.arrow.dataset.jni.JniLoader;
+
+/**
+ * JniWrapper to Consume Substrait Plans.
+ */
+public class JniWrapper {

Review Comment:
   Make the class final, and does this need to be public?



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +581,100 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles
 (
+    JNIEnv* env, jobject, jstring substrait_plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, 
substrait_plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = 
reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }

Review Comment:
   Also this seems wrong, why is there an unused input parameter, and what 
happens if a plan references the same table twice?



##########
docs/source/java/substrait.rst:
##########
@@ -0,0 +1,543 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+Cross-Language Serialization for Relational Algebra.
+
+`Substrait`_ is a well-defined, cross-language specification for data compute 
operations.
+
+.. contents::
+
+Getting Started
+===============
+
+There are two perspectives among substrait implementors:
+
+- Substrait Consumer
+- Substrait Producer
+
+Substrait plans are consumed by calling Acero through JNI wrappers.
+
+.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero.

Review Comment:
   I think the docs need to be rewritten. Acero is introduced out of nowhere, 
and it's unclear whether this is trying to be a Substrait reference or an Arrow 
reference.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitConsumer.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Java binding of the C++ ExecuteSerializedPlan.
+ */
+public class SubstraitConsumer implements Substrait {

Review Comment:
   Also needs to be final?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/Substrait.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.util.Map;
+
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Java binding of the C++ Substrait API.

Review Comment:
   Please provide descriptive docstrings for new classes



##########
docs/source/java/substrait.rst:
##########
@@ -0,0 +1,543 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+Cross-Language Serialization for Relational Algebra.
+
+`Substrait`_ is a well-defined, cross-language specification for data compute 
operations.
+
+.. contents::
+
+Getting Started
+===============
+
+There are two perspectives among substrait implementors:
+
+- Substrait Consumer
+- Substrait Producer
+
+Substrait is calling Acero thru JNI wrappers to Consume Substrait plans.
+
+.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero.
+
+
+Substrait Consumer
+==================
+
+A Substrait plan offers two ways to define the URI for a query:
+
+- Local Files: A fixed URI value on the plan
+- Named Table: An external configuration to define URI value
+
+Local Files:
+
+.. code-block:: json
+
+    "local_files": {
+      "items": [
+        {
+          "uri_file": "file:///tmp/opt/lineitem.parquet",
+          "parquet": {}
+        }
+      ]
+    }
+
+Named Table:
+
+.. code-block:: json
+
+    "namedTable": {
+        "names": ["LINEITEM"]
+    }
+
+Below is a simple example of using Substrait to query a Parquet file in Java:
+
+
+Substrait Consumer For Binary Plan
+----------------------------------
+
+The next examples use the `Isthmus`_ library to create a Substrait binary plan.
+
+Substrait Consumer For Named Table

Review Comment:
   I think most of these docs should actually go in the cookbook



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitConsumer.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Java binding of the C++ ExecuteSerializedPlan.
+ */
+public class SubstraitConsumer implements Substrait {
+  private final BufferAllocator allocator;
+
+  public SubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  @Override
+  public ArrowReader runQueryLocalFiles(String plan) {
+    try (ArrowArrayStream arrowArrayStream = 
ArrowArrayStream.allocateNew(this.allocator)) {
+      JniWrapper.get().executeSerializedPlanLocalFiles(plan, 
arrowArrayStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, arrowArrayStream);
+    }
+  }
+
+  @Override
+  public ArrowReader runQueryNamedTables(String plan, Map<String, ArrowReader> 
mapTableToArrowReader) {
+    List<ArrowArrayStream> listStreamInput = new ArrayList<>();
+    try (
+        ArrowArrayStream streamOutput = 
ArrowArrayStream.allocateNew(this.allocator)
+    ) {
+      String[] mapTableToMemoryAddress = new 
String[mapTableToArrowReader.size() * 2];
+      ArrowArrayStream streamInput;
+      int pos = 0;
+      for (Map.Entry<String, ArrowReader> entries : 
mapTableToArrowReader.entrySet()) {
+        streamInput = ArrowArrayStream.allocateNew(this.allocator);
+        listStreamInput.add(streamInput);
+        Data.exportArrayStream(this.allocator, entries.getValue(), 
streamInput);
+        mapTableToMemoryAddress[pos] = entries.getKey();
+        mapTableToMemoryAddress[pos + 1] = 
String.valueOf(streamInput.memoryAddress());
+        pos += 2;
+      }
+      JniWrapper.get().executeSerializedPlanNamedTables(
+          plan,
+          mapTableToMemoryAddress,
+          streamOutput.memoryAddress()
+      );
+      return Data.importArrayStream(this.allocator, streamOutput);
+    } finally {
+      for (ArrowArrayStream stream : listStreamInput) {
+        stream.close();
+      }
+    }
+  }
+
+  @Override
+  public ArrowReader runQueryNamedTables(Object plan, Map<String, ArrowReader> 
mapTableToArrowReader) {
+    List<ArrowArrayStream> listStreamInput = new ArrayList<>();
+    try (
+        ArrowArrayStream streamOutput = 
ArrowArrayStream.allocateNew(this.allocator)
+    ) {
+      String[] mapTableToMemoryAddress = new 
String[mapTableToArrowReader.size() * 2];
+      ArrowArrayStream streamInput;
+      int pos = 0;
+      for (Map.Entry<String, ArrowReader> entries : 
mapTableToArrowReader.entrySet()) {

Review Comment:
   This code is duplicated, factor out.



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +581,100 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles
 (
+    JNIEnv* env, jobject, jstring substrait_plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, 
substrait_plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = 
reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }

Review Comment:
   You can just turn a reader into a table without using the scanner.



##########
docs/source/java/substrait.rst:
##########
@@ -0,0 +1,543 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+Cross-Language Serialization for Relational Algebra.
+
+`Substrait`_ is a well-defined, cross-language specification for data compute 
operations.
+
+.. contents::
+
+Getting Started
+===============
+
+There are two perspectives among substrait implementors:
+
+- Substrait Consumer
+- Substrait Producer
+
+Substrait is calling Acero thru JNI wrappers to Consume Substrait plans.

Review Comment:
   Don't use texting shorthand in docs.



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +581,100 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles
 (
+    JNIEnv* env, jobject, jstring substrait_plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, 
substrait_plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = 
reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }
+    std::shared_ptr<arrow::compute::ExecNodeOptions> options =
+      
std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::compute::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> readerOut = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, NULLPTR, NULLPTR, 
conversion_options));
+  auto* arrow_stream_out = 
reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, 
arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/Object;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_Object_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jobject plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {

Review Comment:
   Code is duplicated



##########
docs/source/java/substrait.rst:
##########
@@ -0,0 +1,543 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+Cross-Language Serialization for Relational Algebra.
+
+`Substrait`_ is a well-defined, cross-language specification for data compute 
operations.
+
+.. contents::
+
+Getting Started
+===============
+
+There are two perspectives among Substrait implementors:
+
+- Substrait Consumer
+- Substrait Producer
+
+Substrait calls Acero through JNI wrappers to consume Substrait plans.
+
+.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero.
+
+
+Substrait Consumer
+==================
+
+A Substrait plan offers two ways to define the URI for a query:
+
+- Local Files: A fixed URI value on the plan
+- Named Table: An external configuration to define URI value
+
+Local Files:
+
+.. code-block:: json
+
+    "local_files": {
+      "items": [
+        {
+          "uri_file": "file:///tmp/opt/lineitem.parquet",
+          "parquet": {}
+        }
+      ]
+    }
+
+Named Table:
+
+.. code-block:: json
+
+    "namedTable": {
+        "names": ["LINEITEM"]
+    }
+
+Below is a simple example of using Substrait to query a Parquet file in 
Java:
+
+
+Substrait Consumer For Binary Plan
+----------------------------------
+
+The next examples use the `Isthmus`_ library to create a Substrait binary plan.
+
+Substrait Consumer For Named Table
+++++++++++++++++++++++++++++++++++
+
+Query one table. Nation TPCH table:
+
+.. code-block:: Java
+
+    // Query: SELECT * from nation
+    String uri = "file:///data/tpch_parquet/nation.parquet";
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, uri);
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      // map table to reader
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", reader);
+      // get binary plan
+      String sql = "SELECT * from nation";
+      String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, 
N_NAME CHAR(25), " +
+          "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+      Plan plan = getPlan(sql, ImmutableList.of(nation));
+      ByteBuffer substraitPlan = 
ByteBuffer.allocateDirect(plan.toByteArray().length);
+      substraitPlan.put(plan.toByteArray());
+      // run query
+      try (ArrowReader arrowReader = new 
SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          substraitPlan,
+          mapTableToArrowReader
+      )) {
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 25);
+          
assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("MOROCCO"));
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+.. code-block:: text
+
+    // Results example:
+    FieldPath(0)       FieldPath(1)    FieldPath(2)    FieldPath(3)
+    0  ALGERIA 0        haggle. carefully final deposits detect slyly agai
+    1  ARGENTINA       1       al foxes promise slyly according to the regular 
accounts. bold requests alon
+
+Query two tables. Nation and Customer TPCH tables:
+
+.. code-block:: Java
+
+    String uriNation = "file:///data/tpch_parquet/nation.parquet";
+    String uriCustomer = "file:///data/tpch_parquet/customer.parquet";
+    ScanOptions optionsNations = new ScanOptions(/*batchSize*/ 32768);
+    ScanOptions optionsCustomer = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, uriNation);
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(optionsNations);
+        ArrowReader readerNation = scanner.scanBatches();
+        DatasetFactory datasetFactoryCustomer = new 
FileSystemDatasetFactory(rootAllocator(),
+            NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriCustomer);
+        Dataset datasetCustomer = datasetFactoryCustomer.finish();
+        Scanner scannerCustomer = datasetCustomer.newScan(optionsCustomer);
+        ArrowReader readerCustomer = scannerCustomer.scanBatches()
+    ) {
+      // map table to reader
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", readerNation);
+      mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+      // get binary plan
+      String sql = "SELECT n.n_name, c.c_name, c.c_phone, c.c_address FROM 
nation n JOIN customer c " +
+          "ON n.n_nationkey = c.c_nationkey";
+      String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, 
N_NAME CHAR(25), " +
+          "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+      String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, 
C_NAME VARCHAR(25), " +
+          "C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, C_PHONE 
CHAR(15), C_ACCTBAL DECIMAL, " +
+          "C_MKTSEGMENT CHAR(10), C_COMMENT VARCHAR(117) )";
+      Plan plan = getPlan(sql, ImmutableList.of(nation, customer));
+      ByteBuffer substraitPlan = 
ByteBuffer.allocateDirect(plan.toByteArray().length);
+      substraitPlan.put(plan.toByteArray());
+      // run query
+      try (ArrowReader arrowReader = new 
SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          substraitPlan,
+          mapTableToArrowReader
+      )) {
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 15000);
+          
assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("Customer#000014924"));
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+Substrait Consumer For JSON Plan
+--------------------------------
+
+In the following example, we use plain text Substrait plans to illustrate the 
process.
+For production environments, it is recommended to use binary plans.
+
+Substrait Consumer for Local Files
+++++++++++++++++++++++++++++++++++
+
+.. code-block:: Java
+
+    String substraitPlan = "{...json Substrait plan...}";
+    try (ArrowReader reader = new SubstraitConsumer(rootAllocator()).
+                                  runQueryLocalFiles(
+                                    substraitPlan
+                                  )
+    ) {
+      while (reader.loadNextBatch()) {
+        System.out.println(reader.getVectorSchemaRoot().contentToTSVString());
+      }
+    }
+
+
+Substrait Consumer For Named Table
+++++++++++++++++++++++++++++++++++
+
+Query one table. Nation TPCH table:
+
+.. code-block:: Java
+
+    // Query: SELECT * from nation
+    String uri = "file:///data/tpch_parquet/nation.parquet";
+    String substraitPlan = "{...json Substrait plan...}";
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), 
FileFormat.PARQUET, uri);
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapReaderToTable = new HashMap<>();
+      mapReaderToTable.put("NATION", reader);
+      try (ArrowReader arrowReader = new 
SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          substraitPlan,
+          mapReaderToTable
+      )){
+        while(arrowReader.loadNextBatch()){
+          
System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
+          System.out.println(arrowReader.getVectorSchemaRoot().getRowCount());
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+.. code-block:: text
+
+    // Results example:
+    FieldPath(0)       FieldPath(1)    FieldPath(2)    FieldPath(3)
+    0  ALGERIA 0        haggle. carefully final deposits detect slyly agai
+    1  ARGENTINA       1       al foxes promise slyly according to the regular 
accounts. bold requests alon
+
+Query two tables. Nation and Customer TPCH tables:
+
+.. code-block:: Java
+
+    // Query: SELECT n.n_name, c.c_name, c.c_phone, c.c_address FROM nation n 
JOIN customer c ON n.n_nationkey = c.c_nationkey (defined below)
+    String uriNation = "file:///data/tpch_parquet/nation.parquet";
+    String uriCustomer = "file:///data/tpch_parquet/customer.parquet";
+    String substraitPlan = "{...json Substrait plan...}";
+    ScanOptions optionsNations = new ScanOptions(/*batchSize*/ 32768);
+    ScanOptions optionsCustomer = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), 
FileFormat.PARQUET, uriNation);
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(optionsNations);
+        ArrowReader readerNation = scanner.scanBatches();
+        DatasetFactory datasetFactoryCustomer = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), 
FileFormat.PARQUET, uriCustomer);
+        Dataset datasetCustomer = datasetFactoryCustomer.finish();
+        Scanner scannerCustomer = datasetCustomer.newScan(optionsCustomer);
+        ArrowReader readerCustomer = scannerCustomer.scanBatches();
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", readerNation);
+      mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+      try (ArrowReader arrowReader = new 
SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          substraitPlan,
+          mapTableToArrowReader
+      )){
+        while(arrowReader.loadNextBatch()){
+          
System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
+          System.out.println(arrowReader.getVectorSchemaRoot().getRowCount());
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+.. code-block:: text
+
+    // Results example:
+    FieldPath(1)       FieldPath(5)    FieldPath(8)    FieldPath(6)
+    ALGERIA    Customer#000014977      10-901-414-3869 ARaV3SU4TwhxUf
+    ALGERIA    Customer#000014975      10-318-218-3381 BzV ELDsdtukkrRf5fQ
+
+Substrait Producer
+==================
+
+Let's start producing a Substrait plan via `Java Substrait`_ through `Isthmus`_
+using a CLI command:
+
+.. code-block:: sql
+
+    # schema.sql
+    CREATE TABLE CUSTOMER (
+      C_CUSTKEY BIGINT NOT NULL,
+      C_NAME VARCHAR(25),
+      C_ADDRESS VARCHAR(40),
+      C_NATIONKEY BIGINT NOT NULL,
+      C_PHONE CHAR(15),
+      C_ACCTBAL DECIMAL,
+      C_MKTSEGMENT CHAR(10),
+      C_COMMENT VARCHAR(117)
+    );
+    CREATE TABLE NATION (
+      N_NATIONKEY BIGINT NOT NULL,
+      N_NAME CHAR(25),
+      N_REGIONKEY BIGINT NOT NULL,
+      N_COMMENT VARCHAR(152)
+    );
+
+.. code-block:: bash
+
+    # define Schema DDL, Query, download Isthmus and run CLI command.
+    DDL=`cat schema.sql`
+    QUERY="SELECT n.n_name, c.c_name, c.c_phone, c.c_address FROM nation n 
JOIN customer c ON n.n_nationkey = c.c_nationkey"
+    ./isthmus-macOS-0.6.0 "${QUERY}" --create "${DDL}"
+
+Then, the following Substrait plan will be generated:
+
+.. code-block:: json

Review Comment:
   I don't think it helps to show the plan here. It's very long, and again, is 
this meant to be a Substrait tutorial?



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +581,100 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles
 (
+    JNIEnv* env, jobject, jstring substrait_plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, 
substrait_plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = 
reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }
+    std::shared_ptr<arrow::compute::ExecNodeOptions> options =
+      
std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::compute::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> readerOut = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, NULLPTR, NULLPTR, 
conversion_options));
+  auto* arrow_stream_out = 
reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, 
arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/Object;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_Object_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jobject plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = 
reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }
+    std::shared_ptr<arrow::compute::ExecNodeOptions> options =
+      
std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::compute::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // mapping arrow::Buffer
+  jbyte *buff = (jbyte *) env->GetDirectBufferAddress(plan);
+  int length = env->GetDirectBufferCapacity(plan);
+  std::shared_ptr<arrow::Buffer> buffer = 
arrow::AllocateBuffer(length).ValueOrDie();
+  memcpy(buffer->mutable_data(), buff, length);

Review Comment:
   std::memcpy



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to