lidavidm commented on code in PR #34227:
URL: https://github.com/apache/arrow/pull/34227#discussion_r1151988130


##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.dataset.jni.JniLoader;
+
+/**
+ * Class that contains Native methods to call Acero C++ Substrait API. It 
internally depends on C++ function
+ * arrow::engine::ExecuteSerializedPlan. Currently supported input parameters 
supported are:
+ * <pre>
+ * - arrow::Buffer: Susbtrait Plan (JSON or Binary format).
+ * - arrow::engine::ConversionOptions: Mapping for 
arrow::engine::NamedTableProvider.
+ * </pre>
+ */
+final class JniWrapper {
+  private static final JniWrapper INSTANCE = new JniWrapper();
+
+  private JniWrapper() {
+  }
+
+  public static JniWrapper get() {
+    JniLoader.get().ensureLoaded();
+    return INSTANCE;
+  }
+
+  /**
+   * Consume the JSON Substrait Plan that contains Local Files and export the 
RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the JSON Substrait plan.
+   * @param memoryAddressOutput the memory address where RecordBatchReader is 
exported.
+   */
+  public native void executeSerializedPlanLocalFiles(String planInput, long 
memoryAddressOutput);

Review Comment:
   Again, I still don't understand why we need four methods. We should only 
need two.



##########
java/dataset/src/test/resources/substrait/nation.parquet:
##########


Review Comment:
   Binary files still shouldn't be checked in. You can generate these from 
within the test.



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    // Query: SELECT * from nation
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("N_NATIONKEY", new ArrowType.Int(64, true)),
+        Field.nullable("N_NAME", new ArrowType.FixedSizeBinary(25)),
+        Field.nullable("N_REGIONKEY", new ArrowType.Int(64, true)),
+        Field.nullable("N_COMMENT", new ArrowType.Utf8())
+    ));
+    try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator())
+        .runQuery(
+            planReplaceLocalFileURI(
+                localTableJsonPlan,
+                TestAceroSubstraitConsumer.class.getClassLoader()
+                    .getResource("substrait/nation.parquet").toURI().toString()
+            ),
+            Collections.EMPTY_MAP
+        )
+    ) {
+      assertEquals(schema.toString(), 
arrowReader.getVectorSchemaRoot().getSchema().toString());

Review Comment:
   Don't compare string representations.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations 
supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read 
Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader 
representation.
+   *
+   * @param plan The JSON Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                    for the table. Contains the Table Name to Query as a 
Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> 
namedTables) {
+    if (namedTables.isEmpty()) {
+      return getArrowReader(plan);
+    } else {
+      return getArrowReader(plan, namedTables);
+    }
+  }
+
+  /**
+   * Read binary Substrait plan, execute and return an ArrowReader to read 
Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader 
representation.
+   *
+   * @param plan                  the binary Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                              for the table. Contains the Table Name to 
Query as a Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> 
namedTables) {
+    if (namedTables.isEmpty()) {
+      return getArrowReader(plan);
+    } else {
+      return getArrowReader(plan, namedTables);
+    }
+  }
+
+  private ArrowReader getArrowReader(String plan) {
+    try (ArrowArrayStream arrowArrayStream = 
ArrowArrayStream.allocateNew(this.allocator)) {
+      JniWrapper.get().executeSerializedPlanLocalFiles(plan, 
arrowArrayStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, arrowArrayStream);
+    }
+  }
+
+  private ArrowReader getArrowReader(ByteBuffer plan) {
+    try (ArrowArrayStream arrowArrayStream = 
ArrowArrayStream.allocateNew(this.allocator)) {
+      JniWrapper.get().executeSerializedPlanLocalFiles(plan, 
arrowArrayStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, arrowArrayStream);
+    }
+  }

Review Comment:
   I still don't see why we need this. What is the problem with executing a 
plan with an empty set of named tables?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations 
supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read 
Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader 
representation.
+   *
+   * @param plan The JSON Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                    for the table. Contains the Table Name to Query as a 
Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> 
namedTables) {
+    if (namedTables.isEmpty()) {
+      return getArrowReader(plan);
+    } else {
+      return getArrowReader(plan, namedTables);
+    }
+  }
+
+  /**
+   * Read binary Substrait plan, execute and return an ArrowReader to read 
Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader 
representation.
+   *
+   * @param plan                  the binary Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                              for the table. Contains the Table Name to 
Query as a Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> 
namedTables) {
+    if (namedTables.isEmpty()) {
+      return getArrowReader(plan);
+    } else {
+      return getArrowReader(plan, namedTables);
+    }
+  }
+
+  private ArrowReader getArrowReader(String plan) {
+    try (ArrowArrayStream arrowArrayStream = 
ArrowArrayStream.allocateNew(this.allocator)) {
+      JniWrapper.get().executeSerializedPlanLocalFiles(plan, 
arrowArrayStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, arrowArrayStream);
+    }
+  }
+
+  private ArrowReader getArrowReader(ByteBuffer plan) {
+    try (ArrowArrayStream arrowArrayStream = 
ArrowArrayStream.allocateNew(this.allocator)) {
+      JniWrapper.get().executeSerializedPlanLocalFiles(plan, 
arrowArrayStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, arrowArrayStream);
+    }
+  }
+
+  private ArrowReader getArrowReader(String plan, Map<String, ArrowReader> 
namedTables) {
+    List<ArrowArrayStream> listStreamInput = new ArrayList<>();
+    try (
+        ArrowArrayStream streamOutput = 
ArrowArrayStream.allocateNew(this.allocator)
+    ) {
+      String[] mapTableToMemoryAddress = 
getMapTableToMemoryAddress(namedTables, listStreamInput);
+      JniWrapper.get().executeSerializedPlanNamedTables(
+          plan,
+          mapTableToMemoryAddress,
+          streamOutput.memoryAddress()
+      );
+      return Data.importArrayStream(this.allocator, streamOutput);
+    } finally {
+      for (ArrowArrayStream stream : listStreamInput) {

Review Comment:
   Use AutoCloseables.



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    // Query: SELECT * from nation
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("N_NATIONKEY", new ArrowType.Int(64, true)),
+        Field.nullable("N_NAME", new ArrowType.FixedSizeBinary(25)),
+        Field.nullable("N_REGIONKEY", new ArrowType.Int(64, true)),
+        Field.nullable("N_COMMENT", new ArrowType.Utf8())
+    ));
+    try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator())
+        .runQuery(
+            planReplaceLocalFileURI(
+                localTableJsonPlan,
+                TestAceroSubstraitConsumer.class.getClassLoader()
+                    .getResource("substrait/nation.parquet").toURI().toString()
+            ),
+            Collections.EMPTY_MAP
+        )
+    ) {
+      assertEquals(schema.toString(), 
arrowReader.getVectorSchemaRoot().getSchema().toString());
+      while (arrowReader.loadNextBatch()) {
+        assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 25);
+      }
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNation() throws Exception {
+    // Query: SELECT * from nation
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("N_NATIONKEY", new ArrowType.Int(64, true)),
+        Field.nullable("N_NAME", new ArrowType.FixedSizeBinary(25)),
+        Field.nullable("N_REGIONKEY", new ArrowType.Int(64, true)),
+        Field.nullable("N_COMMENT", new ArrowType.Utf8())
+    ));
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, 
TestAceroSubstraitConsumer.class.getClassLoader()
+            .getResource("substrait/nation.parquet").toURI().toString());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("NATION", reader);
+      try (ArrowReader arrowReader = new 
AceroSubstraitConsumer(rootAllocator()).runQuery(
+          namedTableJsonPlan,
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema.toString(), 
arrowReader.getVectorSchemaRoot().getSchema().toString());
+        while (arrowReader.loadNextBatch()) {
+          assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 25);
+          
assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("MOROCCO"));
+        }
+      }
+    }
+  }
+
+  private static String planReplaceLocalFileURI(String plan, String uri) {
+    StringBuilder builder = new StringBuilder(plan);
+    builder.replace(builder.indexOf("FILENAME_PLACEHOLDER"),
+        builder.indexOf("FILENAME_PLACEHOLDER") + 
"FILENAME_PLACEHOLDER".length(), uri);
+    return builder.toString();
+  }
+
+  final String localTableJsonPlan = "" +
+      "{\n" +
+      "  \"extensionUris\": [],\n" +
+      "  \"extensions\": [],\n" +
+      "  \"relations\": [{\n" +
+      "    \"root\": {\n" +
+      "      \"input\": {\n" +
+      "        \"project\": {\n" +
+      "          \"common\": {\n" +
+      "            \"emit\": {\n" +
+      "              \"outputMapping\": [4, 5, 6, 7]\n" +
+      "            }\n" +
+      "          },\n" +
+      "          \"input\": {\n" +
+      "            \"read\": {\n" +
+      "              \"common\": {\n" +
+      "                \"direct\": {\n" +
+      "                }\n" +
+      "              },\n" +
+      "              \"baseSchema\": {\n" +
+      "                \"names\": [\"N_NATIONKEY\", \"N_NAME\", 
\"N_REGIONKEY\", \"N_COMMENT\"],\n" +
+      "                \"struct\": {\n" +
+      "                  \"types\": [{\n" +
+      "                    \"i64\": {\n" +
+      "                      \"typeVariationReference\": 0,\n" +
+      "                      \"nullability\": \"NULLABILITY_REQUIRED\"\n" +
+      "                    }\n" +
+      "                  }, {\n" +
+      "                    \"fixedChar\": {\n" +
+      "                      \"length\": 25,\n" +
+      "                      \"typeVariationReference\": 0,\n" +
+      "                      \"nullability\": \"NULLABILITY_NULLABLE\"\n" +
+      "                    }\n" +
+      "                  }, {\n" +
+      "                    \"i64\": {\n" +
+      "                      \"typeVariationReference\": 0,\n" +
+      "                      \"nullability\": \"NULLABILITY_REQUIRED\"\n" +
+      "                    }\n" +
+      "                  }, {\n" +
+      "                    \"varchar\": {\n" +
+      "                      \"length\": 152,\n" +
+      "                      \"typeVariationReference\": 0,\n" +
+      "                      \"nullability\": \"NULLABILITY_NULLABLE\"\n" +
+      "                    }\n" +
+      "                  }],\n" +
+      "                  \"typeVariationReference\": 0,\n" +
+      "                  \"nullability\": \"NULLABILITY_REQUIRED\"\n" +
+      "                }\n" +
+      "              },\n" +
+      "              \"local_files\": {\n" +
+      "                \"items\": [\n" +
+      "                  {\n" +
+      "                    \"uri_file\": \"FILENAME_PLACEHOLDER\",\n" +
+      "                    \"parquet\": {}\n" +
+      "                  }\n" +
+      "                ]\n" +
+      "              }\n" +
+      "            }\n" +
+      "          },\n" +
+      "          \"expressions\": [{\n" +
+      "            \"selection\": {\n" +
+      "              \"directReference\": {\n" +
+      "                \"structField\": {\n" +
+      "                  \"field\": 0\n" +
+      "                }\n" +
+      "              },\n" +
+      "              \"rootReference\": {\n" +
+      "              }\n" +
+      "            }\n" +
+      "          }, {\n" +
+      "            \"selection\": {\n" +
+      "              \"directReference\": {\n" +
+      "                \"structField\": {\n" +
+      "                  \"field\": 1\n" +
+      "                }\n" +
+      "              },\n" +
+      "              \"rootReference\": {\n" +
+      "              }\n" +
+      "            }\n" +
+      "          }, {\n" +
+      "            \"selection\": {\n" +
+      "              \"directReference\": {\n" +
+      "                \"structField\": {\n" +
+      "                  \"field\": 2\n" +
+      "                }\n" +
+      "              },\n" +
+      "              \"rootReference\": {\n" +
+      "              }\n" +
+      "            }\n" +
+      "          }, {\n" +
+      "            \"selection\": {\n" +
+      "              \"directReference\": {\n" +
+      "                \"structField\": {\n" +
+      "                  \"field\": 3\n" +
+      "                }\n" +
+      "              },\n" +
+      "              \"rootReference\": {\n" +
+      "              }\n" +
+      "            }\n" +
+      "          }]\n" +
+      "        }\n" +
+      "      },\n" +
+      "      \"names\": [\"N_NATIONKEY\", \"N_NAME\", \"N_REGIONKEY\", 
\"N_COMMENT\"]\n" +
+      "    }\n" +
+      "  }],\n" +
+      "  \"expectedTypeUrls\": []\n" +
+      "}" +
+      "";
+  
+  final String namedTableJsonPlan = "" +

Review Comment:
   Non-binary test files can still be stored as files/accessed via resources 
instead of embedding them like this.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations 
supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read 
Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader 
representation.
+   *
+   * @param plan The JSON Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                    for the table. Contains the Table Name to Query as a 
Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> 
namedTables) {
+    if (namedTables.isEmpty()) {
+      return getArrowReader(plan);
+    } else {
+      return getArrowReader(plan, namedTables);
+    }
+  }
+
+  /**
+   * Read binary Substrait plan, execute and return an ArrowReader to read 
Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader 
representation.
+   *
+   * @param plan                  the binary Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                              for the table. Contains the Table Name to 
Query as a Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> 
namedTables) {

Review Comment:
   And on the contrary, here it's perfectly fine to offer overloads that don't 
have the namedTables parameter. But they can just delegate to the primary 
overload with `Collections.emptyMap()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to