lidavidm commented on code in PR #34227:
URL: https://github.com/apache/arrow/pull/34227#discussion_r1142020476


##########
java/dataset/src/test/resources/substrait/plan/named_table_nation.binary:
##########


Review Comment:
   Binary files should be moved to arrow-testing



##########
java/dataset/src/main/cpp/jni_util.h:
##########
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <jni.h>
+#include <map>

Review Comment:
   ```suggestion
   #include <unordered_map>
   ```



##########
java/dataset/src/main/cpp/jni_util.h:
##########
@@ -46,6 +47,8 @@ std::string JStringToCString(JNIEnv* env, jstring string);
 
 std::vector<std::string> ToStringVector(JNIEnv* env, jobjectArray& str_array);
 
+std::unordered_map<std::string, long> ToMapTableToArrowReader(JNIEnv* env, jobjectArray& str_array);

Review Comment:
   This needs a docstring because its purpose is not obvious



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitConsumer.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class SubstraitConsumer {

Review Comment:
   Probably rename this to make it clear that it is Acero-based?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitConsumer.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class SubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public SubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Local files can be recovered using URI items defined in the Substrait plan.
+   *
+   * @param plan the JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQueryLocalFiles(String plan) {
+    try (ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(this.allocator)) {
+      JniWrapper.get().executeSerializedPlanLocalFiles(plan, arrowArrayStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, arrowArrayStream);
+    }
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.
+   *
+   * @param plan                  the JSON Substrait plan.
+   * @param mapTableToArrowReader the mapping name of Tables Name and theirs ArrowReader representation.

Review Comment:
   ```suggestion
      * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data for the table
   ```



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitConsumer.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class SubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public SubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Local files can be recovered using URI items defined in the Substrait plan.
+   *
+   * @param plan the JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQueryLocalFiles(String plan) {
+    try (ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(this.allocator)) {
+      JniWrapper.get().executeSerializedPlanLocalFiles(plan, arrowArrayStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, arrowArrayStream);
+    }
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.
+   *
+   * @param plan                  the JSON Substrait plan.
+   * @param mapTableToArrowReader the mapping name of Tables Name and theirs ArrowReader representation.
+   *                              Contains the Table Name to Query as a Key and ArrowReader as a Value.
+   * <pre>{@code
+   * public class Client {

Review Comment:
   The code isn't valid anyway, so why wrap it in a class? (That goes for all the docstrings here.)



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestSubstraitConsumer.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSubstraitConsumer extends TestDataset {
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    try (ArrowReader arrowReader = new SubstraitConsumer(rootAllocator())
+        .runQueryLocalFiles(
+            planReplaceLocalFileURI(
+                getSubstraitPlan("local_files_binary.json"),
+                getNamedTableUri("binary.parquet")
+            )
+        )
+    ) {
+      while (arrowReader.loadNextBatch()) {
+        assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 12);
+      }

Review Comment:
   Seems like a brittle test in many ways (what if the reader is empty?).
   
   Can we instead validate the result schema and total row count?
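
As a rough sketch of what that could look like: sum the row counts over all batches and assert on the total once, after the loop, so an empty reader fails the test instead of silently skipping the assertion. `FakeReader` below is a hypothetical, stdlib-only stand-in for `ArrowReader` (it only mimics `loadNextBatch()` and a per-batch row count). In the real test the schema could presumably be checked once as well, by comparing `getVectorSchemaRoot().getSchema()` against an expected `Schema`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public final class TotalRowCountSketch {
  /** Hypothetical stand-in for ArrowReader; each list element is one batch's row count. */
  static final class FakeReader {
    private final Iterator<Integer> batchSizes;
    private int current;

    FakeReader(List<Integer> batchSizes) {
      this.batchSizes = batchSizes.iterator();
    }

    /** Advances to the next batch; returns false once the reader is exhausted. */
    boolean loadNextBatch() {
      if (!batchSizes.hasNext()) {
        return false;
      }
      current = batchSizes.next();
      return true;
    }

    /** Row count of the batch most recently loaded. */
    int getRowCount() {
      return current;
    }
  }

  /** Drains the reader and returns the number of rows across all batches. */
  static long totalRows(FakeReader reader) {
    long total = 0;
    while (reader.loadNextBatch()) {
      total += reader.getRowCount();
    }
    return total;
  }

  public static void main(String[] args) {
    // The same 12 rows split differently: the total does not depend on batching.
    System.out.println(totalRows(new FakeReader(Arrays.asList(12))));
    System.out.println(totalRows(new FakeReader(Arrays.asList(5, 7))));
    // An empty reader yields 0, so an assertion on the total would fail
    // rather than pass because the loop body never ran.
    System.out.println(totalRows(new FakeReader(Collections.emptyList())));
  }
}
```

With this shape the test body reduces to a single `assertEquals(12, totalRows(reader))` after the loop, which also puts the expected value first as JUnit's `assertEquals` expects.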



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitConsumer.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class SubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public SubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Local files can be recovered using URI items defined in the Substrait plan.
+   *
+   * @param plan the JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQueryLocalFiles(String plan) {

Review Comment:
   Do we really need the local files vs named table distinction? You can just 
provide an empty map if there are no named tables in the plan, right?
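
A minimal sketch of the single-entry-point shape, assuming one method that takes the plan plus a map of named tables, with a convenience overload that passes an empty map. The `runQuery` name is hypothetical, and `String` stands in for `ArrowReader` so the sketch stays stdlib-only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class SingleEntryPointSketch {
  /**
   * Hypothetical unified entry point. A real implementation would hand the
   * plan and the named tables to the JNI layer; here we only describe the call.
   */
  static String runQuery(String plan, Map<String, String> namedTables) {
    return plan + " with " + namedTables.size() + " named table(s)";
  }

  /** Convenience overload for plans that reference only local files. */
  static String runQuery(String plan) {
    return runQuery(plan, Collections.emptyMap());
  }

  public static void main(String[] args) {
    // Plan without named tables: the overload supplies the empty map.
    System.out.println(runQuery("local_files.json"));

    // Plan with one named table.
    Map<String, String> tables = new HashMap<>();
    tables.put("NATION", "nationReader");
    System.out.println(runQuery("named_table_nation.json", tables));
  }
}
```

Callers then see one code path, and the native side only needs to handle the named-table case, where an empty map means no provider lookups happen.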



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestTpchSubstraitConsumer.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+public class TestTpchSubstraitConsumer extends TestDataset {
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testRunQueryNamedTableTpch01() throws Exception {
+    // Query: Go to src/test/resources/substrait/tpch/sql/01.sql
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, getNamedTableUri("lineitem.parquet"));
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapReaderToTable = new HashMap<>();
+      mapReaderToTable.put("LINEITEM", reader);
+      Assertions.assertThrows(RuntimeException.class, () -> {
+        try (ArrowReader arrowReader = new SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+            getSubstraitTpchPlan("01.json"),
+            mapReaderToTable
+        )) {
+          while (arrowReader.loadNextBatch()) {
+          }
+        }
+      }, "conversion to arrow::compute::Declaration from Substrait relation sort");

Review Comment:
   If we can't meaningfully run the TPCH queries, maybe just don't include them in the first place?



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +581,92 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles (
+    JNIEnv* env, jobject, jstring substrait_plan, jlong c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, substrait_plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J (
+    JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, long> map_table_to_memory_address = ToMapTableToArrowReader(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = [&map_table_to_memory_address](const std::vector<std::string>& names, const arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // get table from memory address provided
+      long memory_address = map_table_to_memory_address[name];
+      auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(memory_address);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      output_table = JniGetOrThrow(readerIn->ToTable());

Review Comment:
   FYI, this will throw an exception across an Arrow stack frame. I would rather we do these conversions up front.



##########
java/dataset/src/main/cpp/jni_util.h:
##########
@@ -46,6 +47,8 @@ std::string JStringToCString(JNIEnv* env, jstring string);
 
 std::vector<std::string> ToStringVector(JNIEnv* env, jobjectArray& str_array);
 
+std::unordered_map<std::string, long> ToMapTableToArrowReader(JNIEnv* env, jobjectArray& str_array);

Review Comment:
   IMO, instead of storing a long, we should just import all of the readers in this function, too.



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestSubstraitConsumer.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSubstraitConsumer extends TestDataset {
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {

Review Comment:
   Generally, you shouldn't use RootAllocator as a return type or field type (BufferAllocator is the usual choice).



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitConsumer.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class SubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public SubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Local files can be recovered using URI items defined in the Substrait plan.
+   *
+   * @param plan the JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQueryLocalFiles(String plan) {
+    try (ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(this.allocator)) {
+      JniWrapper.get().executeSerializedPlanLocalFiles(plan, arrowArrayStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, arrowArrayStream);
+    }
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.
+   *
+   * @param plan                  the JSON Substrait plan.
+   * @param mapTableToArrowReader the mapping name of Tables Name and theirs ArrowReader representation.
+   *                              Contains the Table Name to Query as a Key and ArrowReader as a Value.
+   * <pre>{@code
+   * public class Client {
+   *    ArrowReader nationReader = scanner.scanBatches();
+   *    Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+   *    mapTableToArrowReader.put("NATION", nationReader);
+   * }
+   * }
+   * </pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQueryNamedTables(String plan, Map<String, ArrowReader> mapTableToArrowReader) {
+    List<ArrowArrayStream> listStreamInput = new ArrayList<>();
+    try (
+        ArrowArrayStream streamOutput = ArrowArrayStream.allocateNew(this.allocator)
+    ) {
+      String[] mapTableToMemoryAddress = getMapTableToMemoryAddress(mapTableToArrowReader, listStreamInput);
+      JniWrapper.get().executeSerializedPlanNamedTables(
+          plan,
+          mapTableToMemoryAddress,
+          streamOutput.memoryAddress()
+      );
+      return Data.importArrayStream(this.allocator, streamOutput);
+    } finally {
+      for (ArrowArrayStream stream : listStreamInput) {
+        stream.close();

Review Comment:
   Use AutoCloseables
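
For context, the hand-written loop stops at the first `close()` that throws and leaks the remaining streams. Arrow's Java utilities include `org.apache.arrow.util.AutoCloseables`, which, as far as I recall, accepts an `Iterable` of resources. The block below is not that class but a stdlib-only sketch of the behavior such a helper is meant to provide: close everything, rethrow the first failure, and attach later failures as suppressed. `closeAll` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class CloseAllSketch {
  /** Closes every resource even if some throw; rethrows the first failure with the rest suppressed. */
  static void closeAll(Iterable<? extends AutoCloseable> resources) throws Exception {
    Exception first = null;
    for (AutoCloseable resource : resources) {
      try {
        if (resource != null) {
          resource.close();
        }
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> closed = new ArrayList<>();
    AutoCloseable a = () -> closed.add("a");
    AutoCloseable failing = () -> {
      throw new IllegalStateException("boom");
    };
    AutoCloseable c = () -> closed.add("c");
    try {
      closeAll(Arrays.asList(a, failing, c));
    } catch (IllegalStateException e) {
      // "c" was still closed even though the second resource failed.
      System.out.println(closed + " after " + e.getMessage());
    }
  }
}
```

In the `finally` block under review, the loop would then collapse to a single call on `listStreamInput`.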



##########
java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java:
##########
@@ -113,4 +116,60 @@ protected <T> Stream<T> stream(Iterator<T> iterator) {
   protected <T> List<T> collect(Iterator<T> iterator) {
     return stream(iterator).collect(Collectors.toList());
   }
+
+  protected String getNamedTableUri(String name) {
+    return Paths.get(
+        Paths.get(
+            (Paths.get("").toAbsolutePath().getParent().getParent().toString()),

Review Comment:
   Why is this path different from all the others?



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestSubstraitConsumer.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSubstraitConsumer extends TestDataset {
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    try (ArrowReader arrowReader = new SubstraitConsumer(rootAllocator())
+        .runQueryLocalFiles(
+            planReplaceLocalFileURI(
+                getSubstraitPlan("local_files_binary.json"),
+                getNamedTableUri("binary.parquet")
+            )
+        )
+    ) {
+      while (arrowReader.loadNextBatch()) {
+        assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 12);
+      }
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNation() throws Exception {
+    // Query: SELECT * from nation
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, getNamedTableUri("nation.parquet"));
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", reader);
+      try (ArrowReader arrowReader = new SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          getSubstraitPlan("named_table_nation.json"),
+          mapTableToArrowReader
+      )) {
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 25);
+          assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("MOROCCO"));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNationAndCustomer() throws Exception {
+    // Query:
+    // SELECT n.n_name, c.c_name, c.c_phone, c.c_address FROM nation n JOIN customer c ON n.n_nationkey = c.c_nationkey
+    ScanOptions optionsNations = new ScanOptions(/*batchSize*/ 32768);
+    ScanOptions optionsCustomer = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, getNamedTableUri("nation.parquet"));
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(optionsNations);
+        ArrowReader readerNation = scanner.scanBatches();
+        DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(rootAllocator(),
+            NativeMemoryPool.getDefault(), FileFormat.PARQUET, getNamedTableUri("customer.parquet"));
+        Dataset datasetCustomer = datasetFactoryCustomer.finish();
+        Scanner scannerCustomer = datasetCustomer.newScan(optionsCustomer);
+        ArrowReader readerCustomer = scannerCustomer.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", readerNation);
+      mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+      try (ArrowReader arrowReader = new SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          getSubstraitPlan("named_table_nation_customer.json"),
+          mapTableToArrowReader
+      )) {
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 15000);
+          assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("Customer#000014924"));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testRunBinaryQueryNamedTableNation() throws Exception {
+    // Query: SELECT * from nation
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, getNamedTableUri("nation.parquet"));
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      // map table to reader
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", reader);
+      // get binary plan
+      byte[] plan = getBinarySubstraitPlan("named_table_nation.binary");
+      ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.length);
+      substraitPlan.put(plan);
+      // run query
+      try (ArrowReader arrowReader = new SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          substraitPlan,
+          mapTableToArrowReader
+      )) {
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 25);
+          assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("MOROCCO"));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testRunBinaryQueryNamedTableNationAndCustomer() throws Exception {
+    // Query:
+    // SELECT n.n_name, c.c_name, c.c_phone, c.c_address FROM nation n JOIN customer c ON n.n_nationkey = c.c_nationkey
+    ScanOptions optionsNations = new ScanOptions(/*batchSize*/ 32768);
+    ScanOptions optionsCustomer = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, getNamedTableUri("nation.parquet"));
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(optionsNations);
+        ArrowReader readerNation = scanner.scanBatches();
+        DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(rootAllocator(),
+            NativeMemoryPool.getDefault(), FileFormat.PARQUET, getNamedTableUri("customer.parquet"));
+        Dataset datasetCustomer = datasetFactoryCustomer.finish();
+        Scanner scannerCustomer = datasetCustomer.newScan(optionsCustomer);
+        ArrowReader readerCustomer = scannerCustomer.scanBatches()
+    ) {
+      // map table to reader
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", readerNation);
+      mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+      // get binary plan
+      byte[] plan = getBinarySubstraitPlan("named_table_nation_customer.binary");
+      ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.length);
+      substraitPlan.put(plan);
+      // run query
+      try (ArrowReader arrowReader = new SubstraitConsumer(rootAllocator()).runQueryNamedTables(
+          substraitPlan,
+          mapTableToArrowReader
+      )) {
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 15000);
+          assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("Customer#000014924"));
+        }
+      }
+    }
+  }
+
+  private static String planReplaceLocalFileURI(String plan, String uri) throws IOException {

Review Comment:
   Why does this throw IOException?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

