lidavidm commented on code in PR #34227:
URL: https://github.com/apache/arrow/pull/34227#discussion_r1164766213


##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {

Review Comment:
   Why do we need this `rootAllocator()` accessor? Also, you should never need
to use RootAllocator as a concrete type. Just use BufferAllocator.



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +629,96 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/nio/ByteBuffer;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_nio_ByteBuffer_2J
 (
+    JNIEnv* env, jobject, jobject plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  // mapping arrow::Buffer
+  auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan));
+  int length = env->GetDirectBufferCapacity(plan);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::AllocateBuffer(length));
+  std::memcpy(buffer->mutable_data(), buff, length);
+  // execute plan
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
map_table_to_reader = LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_reader](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table = GetTableByName(names, 
map_table_to_reader);
+    std::shared_ptr<arrow::acero::ExecNodeOptions> options =
+      
std::make_shared<arrow::acero::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::acero::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> readerOut = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, 
conversion_options));
+  auto* arrow_stream_out = 
reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, 
arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/nio/ByteBuffer;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_nio_ByteBuffer_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jobject plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
map_table_to_reader = LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_reader](const std::vector<std::string>& names, const 
arrow::Schema&) {

Review Comment:
   This code doesn't seem to have been formatted (e.g. the overlong lambda line above).



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +629,96 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/nio/ByteBuffer;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_nio_ByteBuffer_2J
 (
+    JNIEnv* env, jobject, jobject plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  // mapping arrow::Buffer
+  auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan));
+  int length = env->GetDirectBufferCapacity(plan);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::AllocateBuffer(length));
+  std::memcpy(buffer->mutable_data(), buff, length);
+  // execute plan
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
map_table_to_reader = LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_reader](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table = GetTableByName(names, 
map_table_to_reader);
+    std::shared_ptr<arrow::acero::ExecNodeOptions> options =
+      
std::make_shared<arrow::acero::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::acero::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> readerOut = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, 
conversion_options));
+  auto* arrow_stream_out = 
reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, 
arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/nio/ByteBuffer;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_nio_ByteBuffer_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jobject plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
map_table_to_reader = LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_reader](const std::vector<std::string>& names, const 
arrow::Schema&) {

Review Comment:
   https://github.com/apache/arrow/issues/35093



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +629,96 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/nio/ByteBuffer;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_nio_ByteBuffer_2J
 (
+    JNIEnv* env, jobject, jobject plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  // mapping arrow::Buffer
+  auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan));
+  int length = env->GetDirectBufferCapacity(plan);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::AllocateBuffer(length));
+  std::memcpy(buffer->mutable_data(), buff, length);
+  // execute plan
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
map_table_to_reader = LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_reader](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table = GetTableByName(names, 
map_table_to_reader);
+    std::shared_ptr<arrow::acero::ExecNodeOptions> options =
+      
std::make_shared<arrow::acero::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::acero::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> readerOut = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, 
conversion_options));
+  auto* arrow_stream_out = 
reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, 
arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/nio/ByteBuffer;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_nio_ByteBuffer_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jobject plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
map_table_to_reader = LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_reader](const std::vector<std::string>& names, const 
arrow::Schema&) {

Review Comment:
   Hmm, I suppose we don't run the formatter on JNI code?



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    //Query:
+    //SELECT id, name FROM Users
+    //Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name 
VARCHAR(150));" "SELECT id, name FROM Users"
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, 
"c");
+    try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator())
+        .runQuery(
+            planReplaceLocalFileURI(
+                new 
String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+                    
.getResource("substrait/local_files_users.json").toURI()))),
+                writeSupport.getOutputURI()
+            )
+        )
+    ) {
+      assertEquals("Schema<ID: Int(32, true), NAME: Utf8>",
+          arrowReader.getVectorSchemaRoot().getSchema().toString());
+      int rowcount = 0;
+      while (arrowReader.loadNextBatch()) {
+        rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowcount);
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNation() throws Exception {
+    //Query:
+    //SELECT id, name FROM Users
+    //Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name 
VARCHAR(150));" "SELECT id, name FROM Users"
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, 
"c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS", reader);
+      try (ArrowReader arrowReader = new 
AceroSubstraitConsumer(rootAllocator()).runQuery(
+          new 
String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+              .getResource("substrait/named_table_users.json").toURI()))),
+          mapTableToArrowReader
+      )) {
+        assertEquals("Schema<ID: Int(32, true), NAME: Utf8>",
+            arrowReader.getVectorSchemaRoot().getSchema().toString());
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testRunQueryNamedTableNationWithException() throws Exception {
+    //Query:
+    //SELECT id, name FROM Users
+    //Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name 
VARCHAR(150));" "SELECT id, name FROM Users"
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ));
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, 
"c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS_INVALID_MAP", reader);
+      try (ArrowReader arrowReader = new 
AceroSubstraitConsumer(rootAllocator()).runQuery(
+          new 
String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+              .getResource("substrait/named_table_users.json").toURI()))),
+          mapTableToArrowReader
+      )) {
+        assertEquals("Schema<ID: Int(32, true), NAME: Utf8>",

Review Comment:
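   Don't assert on the string representation of the schema; compare against a Schema object (e.g. the `schema` built at the top of this test) instead.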



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +629,96 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/nio/ByteBuffer;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_nio_ByteBuffer_2J
 (
+    JNIEnv* env, jobject, jobject plan, jlong 
c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  auto* arrow_stream = 
reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  // mapping arrow::Buffer
+  auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan));
+  int length = env->GetDirectBufferCapacity(plan);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::AllocateBuffer(length));
+  std::memcpy(buffer->mutable_data(), buff, length);
+  // execute plan
+  std::shared_ptr<arrow::RecordBatchReader> reader = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jstring plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
map_table_to_reader = LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_reader](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table = GetTableByName(names, 
map_table_to_reader);
+    std::shared_ptr<arrow::acero::ExecNodeOptions> options =
+      
std::make_shared<arrow::acero::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::acero::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> readerOut = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, 
conversion_options));
+  auto* arrow_stream_out = 
reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, 
arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/nio/ByteBuffer;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_nio_ByteBuffer_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jobject plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
map_table_to_reader = LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_reader](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table = GetTableByName(names, 
map_table_to_reader);
+    std::shared_ptr<arrow::acero::ExecNodeOptions> options =
+      
std::make_shared<arrow::acero::TableSourceNodeOptions>(std::move(output_table));
+    return arrow::acero::Declaration("table_source", {}, options, 
"java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // mapping arrow::Buffer
+  auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan));
+  int length = env->GetDirectBufferCapacity(plan);
+  std::shared_ptr<arrow::Buffer> buffer = 
JniGetOrThrow(arrow::AllocateBuffer(length));
+  std::memcpy(buffer->mutable_data(), buff, length);
+  // execute plan
+  std::shared_ptr<arrow::RecordBatchReader> readerOut = 
JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, 
conversion_options));

Review Comment:
   nit: follow C++ naming conventions here (e.g. `reader_out` rather than `readerOut`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to