lidavidm commented on code in PR #34227: URL: https://github.com/apache/arrow/pull/34227#discussion_r1154460725
########## docs/source/java/substrait.rst: ########## @@ -0,0 +1,149 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +========= +Substrait +========= + +Java Substrait offer capabilities to ``Query`` data received as a Susbtrait +Plan (`plain or binary format`). + +During this process, Substrait plans are read, executed, and ArrowReaders are +returned for reading Schema and ArrowRecordBatches. For Substrait plan that contains +``Local Files`` the URI per table are defined in the Substrait plan, different +than ``Named Tables`` where is needed to define a mapping name of tables +and theirs ArrowReader representation. + +.. contents:: + +Getting Started +=============== + +Java Substrait API uses Acero C++ Substrait API capabilities thru JNI wrappers. Review Comment: This section is not useful. Replace it with installation instructions or remove it. ########## docs/source/java/substrait.rst: ########## @@ -0,0 +1,149 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. 
to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+Java Substrait offer capabilities to ``Query`` data received as a Susbtrait
+Plan (`plain or binary format`).
+
+During this process, Substrait plans are read, executed, and ArrowReaders are
+returned for reading Schema and ArrowRecordBatches. For Substrait plan that contains
+``Local Files`` the URI per table are defined in the Substrait plan, different
+than ``Named Tables`` where is needed to define a mapping name of tables
+and theirs ArrowReader representation.
+
+.. contents::
+
+Getting Started
+===============
+
+Java Substrait API uses Acero C++ Substrait API capabilities thru JNI wrappers.
+
+.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero.
+
+Substrait Consumer
+==================

Review Comment:
```suggestion
Executing Substrait Plans
=========================
```

########## docs/source/java/substrait.rst: ##########
@@ -0,0 +1,149 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+..
Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+Java Substrait offer capabilities to ``Query`` data received as a Susbtrait
+Plan (`plain or binary format`).
+
+During this process, Substrait plans are read, executed, and ArrowReaders are
+returned for reading Schema and ArrowRecordBatches. For Substrait plan that contains
+``Local Files`` the URI per table are defined in the Substrait plan, different
+than ``Named Tables`` where is needed to define a mapping name of tables
+and theirs ArrowReader representation.
+
+.. contents::
+
+Getting Started
+===============
+
+Java Substrait API uses Acero C++ Substrait API capabilities thru JNI wrappers.
+
+.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero.
+
+Substrait Consumer
+==================
+
+Substrait Plan offer two ways to define URI for Query data:
+
+- Local Files: A fixed URI value on the plan
+- Named Table: An external configuration to define URI value

Review Comment:
```suggestion
Plans can reference data in files via URIs, or "named tables" that must be provided along with the plan.
```

########## java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java: ##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.substrait; + +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.arrow.dataset.ParquetWriteSupport; +import org.apache.arrow.dataset.TestDataset; +import org.apache.arrow.dataset.file.FileFormat; +import org.apache.arrow.dataset.file.FileSystemDatasetFactory; +import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.dataset.source.Dataset; +import org.apache.arrow.dataset.source.DatasetFactory; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.Types; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAceroSubstraitConsumer extends TestDataset { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + public static final String AVRO_SCHEMA_USER = "user.avsc"; + private RootAllocator allocator = null; + + @Before + public void setUp() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + protected RootAllocator rootAllocator() { + return allocator; + } + + @Test + public void testRunQueryLocalFiles() throws Exception { + 
//Query:
+ //SELECT id, name FROM Users
+ //Isthmus:
+ //./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+ try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator())
+ .runQuery(
+ planReplaceLocalFileURI(
+ new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+ .getResource("substrait/local_files_users.json").toURI()))),
+ writeSupport.getOutputURI()
+ ),
+ Collections.EMPTY_MAP
+ )
+ ) {
+ while (arrowReader.loadNextBatch()) {
+ assertEquals(3, arrowReader.getVectorSchemaRoot().getRowCount());
+ assertEquals(2, arrowReader.getVectorSchemaRoot().getSchema().getFields().size());

Review Comment:
You can just assertEquals on two schemas.

########## java/dataset/src/main/cpp/jni_wrapper.cc: ##########
@@ -261,6 +264,26 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
 default_memory_pool_id = -1L;
 }
+/// Iterate over an object array of Tables Name on position `i` and theirs
+/// Memory Address representation on `i+1` position linearly.
+/// Return a mapping of the Table Name to Query as a Key and a RecordBatchReader
+/// as a Value.

Review Comment:
```suggestion
/// Unpack the named tables passed through JNI.
///
/// Named tables are encoded as a string array, where every two elements
/// encode (1) the table name and (2) the address of an ArrowArrayStream
/// containing the table data. This function will eagerly read all
/// tables into Tables.
```

########## java/dataset/src/main/cpp/jni_wrapper.cc: ##########
@@ -261,6 +264,26 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
 default_memory_pool_id = -1L;
 }
+/// Iterate over an object array of Tables Name on position `i` and theirs
+/// Memory Address representation on `i+1` position linearly.
+/// Return a mapping of the Table Name to Query as a Key and a RecordBatchReader
+/// as a Value.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> ToMapTableToArrowReader(JNIEnv* env, jobjectArray& str_array) {

Review Comment:
```suggestion
std::unordered_map<std::string, std::shared_ptr<arrow::Table>> LoadNamedTables(JNIEnv* env, jobjectArray& str_array) {
```

########## docs/source/java/substrait.rst: ##########
@@ -0,0 +1,149 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+Java Substrait offer capabilities to ``Query`` data received as a Susbtrait
+Plan (`plain or binary format`).
+
+During this process, Substrait plans are read, executed, and ArrowReaders are
+returned for reading Schema and ArrowRecordBatches. For Substrait plan that contains
+``Local Files`` the URI per table are defined in the Substrait plan, different
+than ``Named Tables`` where is needed to define a mapping name of tables
+and theirs ArrowReader representation.

Review Comment:
```suggestion
The ``arrow-dataset`` module can execute Substrait_ plans via the Acero_ query engine.
```
and provide a link to substrait.io and to Acero.
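
For illustration only, the two link targets requested above could be declared roughly as follows. This is a minimal sketch and not part of the suggestion itself: the Substrait URL is the project site named in the comment, while the Acero URL is an assumption (it points at the published form of the ``../cpp/streaming_execution`` page the draft already references) and may need adjusting.

```rst
The ``arrow-dataset`` module can execute Substrait_ plans via the Acero_
query engine.

.. Hyperlink targets for the named references used above.

.. _Substrait: https://substrait.io/

.. The Acero URL below is an assumption; adjust it to the page that documents Acero.

.. _Acero: https://arrow.apache.org/docs/cpp/streaming_execution.html
```

Named hyperlink targets keep the sentence readable in source form, and each URL then lives in exactly one place if the Acero documentation page moves.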
########## java/dataset/src/main/cpp/jni_wrapper.cc: ########## @@ -578,3 +601,102 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner)); JNI_METHOD_END() } + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanLocalFiles + * Signature: (Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_lang_String_2J ( + JNIEnv* env, jobject, jstring plan, jlong c_arrow_array_stream_address_out) { + JNI_METHOD_START + auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out); + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan))); + std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer)); + JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanLocalFiles + * Signature: (Ljava/nio/ByteBuffer;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_nio_ByteBuffer_2J ( + JNIEnv* env, jobject, jobject plan, jlong c_arrow_array_stream_address_out) { + JNI_METHOD_START + auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out); + // mapping arrow::Buffer + auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan)); + int length = env->GetDirectBufferCapacity(plan); + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length)); + std::memcpy(buffer->mutable_data(), buff, length); + // execute plan + std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer)); + 
JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanNamedTables + * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J ( + JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input, jlong memory_address_output) { + JNI_METHOD_START + // get mapping of table name to memory address + std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader = ToMapTableToArrowReader(env, table_to_memory_address_input); + // create table provider + arrow::engine::NamedTableProvider table_provider = [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) { + std::shared_ptr<arrow::Table> output_table; + for (const auto& name : names) { + output_table = map_table_to_reader[name]; Review Comment: This also needs to be unit tested. ########## docs/source/java/substrait.rst: ########## @@ -0,0 +1,149 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. 
+ +========= +Substrait +========= + +Java Substrait offer capabilities to ``Query`` data received as a Susbtrait +Plan (`plain or binary format`). + +During this process, Substrait plans are read, executed, and ArrowReaders are +returned for reading Schema and ArrowRecordBatches. For Substrait plan that contains +``Local Files`` the URI per table are defined in the Substrait plan, different +than ``Named Tables`` where is needed to define a mapping name of tables +and theirs ArrowReader representation. + +.. contents:: + +Getting Started +=============== + +Java Substrait API uses Acero C++ Substrait API capabilities thru JNI wrappers. + +.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero. + +Substrait Consumer +================== + +Substrait Plan offer two ways to define URI for Query data: + +- Local Files: A fixed URI value on the plan +- Named Table: An external configuration to define URI value + +Local Files: + +.. code-block:: json + + "local_files": { + "items": [ + { + "uri_file": "file:///tmp/opt/lineitem.parquet", + "parquet": {} + } + ] + } + +Named Table: + +.. code-block:: json + + "namedTable": { + "names": ["LINEITEM"] + } Review Comment: I don't think we need to be a Substrait tutorial? ########## docs/source/java/substrait.rst: ########## @@ -0,0 +1,149 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. 
KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +========= +Substrait +========= + +Java Substrait offer capabilities to ``Query`` data received as a Susbtrait +Plan (`plain or binary format`). + +During this process, Substrait plans are read, executed, and ArrowReaders are +returned for reading Schema and ArrowRecordBatches. For Substrait plan that contains +``Local Files`` the URI per table are defined in the Substrait plan, different +than ``Named Tables`` where is needed to define a mapping name of tables +and theirs ArrowReader representation. + +.. contents:: + +Getting Started +=============== + +Java Substrait API uses Acero C++ Substrait API capabilities thru JNI wrappers. + +.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero. + +Substrait Consumer +================== + +Substrait Plan offer two ways to define URI for Query data: + +- Local Files: A fixed URI value on the plan +- Named Table: An external configuration to define URI value + +Local Files: + +.. code-block:: json + + "local_files": { + "items": [ + { + "uri_file": "file:///tmp/opt/lineitem.parquet", + "parquet": {} + } + ] + } + +Named Table: + +.. code-block:: json + + "namedTable": { + "names": ["LINEITEM"] + } + +Here is an example of a Java program that queries a Parquet file using Java Substrait: Review Comment: Clarify that this example uses a third party library to compile a SQL query to a Substrait plan. ########## java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.substrait; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan + * in Plan format (JSON) or Binary format (ByteBuffer). + */ +public final class AceroSubstraitConsumer { + private final BufferAllocator allocator; + + public AceroSubstraitConsumer(BufferAllocator allocator) { + this.allocator = allocator; + } + + /** + * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches. + * Needed to define a mapping name of Tables and theirs ArrowReader representation. + * + * @param plan The JSON Substrait plan. + * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data + * for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value. + * <pre>{@code ArrowReader nationReader = scanner.scanBatches(); + * Map<String, ArrowReader> namedTables = new HashMap<>(); + * namedTables.put("NATION", nationReader);}</pre> + * @return the ArrowReader to iterate for record batches. 
+ */ + public ArrowReader runQuery(String plan, Map<String, ArrowReader> namedTables) { + if (namedTables.isEmpty()) { + return getArrowReader(plan); + } else { + return getArrowReader(plan, namedTables); + } + } + + /** + * Read binary Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches. + * Needed to define a mapping name of Tables and theirs ArrowReader representation. + * + * @param plan the binary Substrait plan. + * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data + * for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value. + * <pre>{@code ArrowReader nationReader = scanner.scanBatches(); + * Map<String, ArrowReader> namedTables = new HashMap<>(); + * namedTables.put("NATION", nationReader);}</pre> + * @return the ArrowReader to iterate for record batches. + */ + public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> namedTables) { Review Comment: This has not been addressed. ########## docs/source/java/substrait.rst: ########## @@ -0,0 +1,149 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. 
+ +========= +Substrait +========= + +Java Substrait offer capabilities to ``Query`` data received as a Susbtrait +Plan (`plain or binary format`). + +During this process, Substrait plans are read, executed, and ArrowReaders are +returned for reading Schema and ArrowRecordBatches. For Substrait plan that contains +``Local Files`` the URI per table are defined in the Substrait plan, different +than ``Named Tables`` where is needed to define a mapping name of tables +and theirs ArrowReader representation. + +.. contents:: + +Getting Started +=============== + +Java Substrait API uses Acero C++ Substrait API capabilities thru JNI wrappers. + +.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero. + +Substrait Consumer +================== + +Substrait Plan offer two ways to define URI for Query data: + +- Local Files: A fixed URI value on the plan +- Named Table: An external configuration to define URI value + +Local Files: + +.. code-block:: json + + "local_files": { + "items": [ + { + "uri_file": "file:///tmp/opt/lineitem.parquet", + "parquet": {} + } + ] + } + +Named Table: + +.. code-block:: json + + "namedTable": { + "names": ["LINEITEM"] + } + +Here is an example of a Java program that queries a Parquet file using Java Substrait: + +.. 
code-block:: Java + + import com.google.common.collect.ImmutableList; + import io.substrait.isthmus.SqlToSubstrait; + import io.substrait.proto.Plan; + import org.apache.arrow.dataset.file.FileFormat; + import org.apache.arrow.dataset.file.FileSystemDatasetFactory; + import org.apache.arrow.dataset.jni.NativeMemoryPool; + import org.apache.arrow.dataset.scanner.ScanOptions; + import org.apache.arrow.dataset.scanner.Scanner; + import org.apache.arrow.dataset.source.Dataset; + import org.apache.arrow.dataset.source.DatasetFactory; + import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer; + import org.apache.arrow.memory.BufferAllocator; + import org.apache.arrow.memory.RootAllocator; + import org.apache.arrow.vector.ipc.ArrowReader; + import org.apache.calcite.sql.parser.SqlParseException; + + import java.nio.ByteBuffer; + import java.util.HashMap; + import java.util.Map; + + public class ClientSubstrait { + public static void main(String[] args) { + String uri = "file:///data/tpch_parquet/nation.parquet"; + ScanOptions options = new ScanOptions(/*batchSize*/ 32768); + try ( + BufferAllocator allocator = new RootAllocator(); + DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), + FileFormat.PARQUET, uri); + Dataset dataset = datasetFactory.finish(); + Scanner scanner = dataset.newScan(options); + ArrowReader reader = scanner.scanBatches() + ) { + // map table to reader + Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>(); + mapTableToArrowReader.put("NATION", reader); + // get binary plan + Plan plan = getPlan(); + ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length); + substraitPlan.put(plan.toByteArray()); + // run query + try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery( + substraitPlan, + mapTableToArrowReader + )) { + while (arrowReader.loadNextBatch()) { + System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString()); + 
} + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + static Plan getPlan() throws SqlParseException { + String sql = "SELECT * from nation"; + String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " + + "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))"; + SqlToSubstrait sqlToSubstrait = new SqlToSubstrait(); + Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation)); + return plan; + } + } + +.. code-block:: text + + // Results example: + FieldPath(0) FieldPath(1) FieldPath(2) FieldPath(3) + 0 ALGERIA 0 haggle. carefully final deposits detect slyly agai + 1 ARGENTINA 1 al foxes promise slyly according to the regular accounts. bold requests alon + +Substrait Producer +================== + +The following options are available for producing Substrait Plans: Acero, +Isthmus, Ibis, DuckDB, others. + +You can generate Substrait plans and then send them to Java Substrait for consumption. Review Comment: I'm not sure this is useful. ########## java/dataset/src/test/resources/substrait/named_table_users.binary: ########## Review Comment: We should not add binary files here. ########## java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.substrait; + +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.arrow.dataset.ParquetWriteSupport; +import org.apache.arrow.dataset.TestDataset; +import org.apache.arrow.dataset.file.FileFormat; +import org.apache.arrow.dataset.file.FileSystemDatasetFactory; +import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.dataset.source.Dataset; +import org.apache.arrow.dataset.source.DatasetFactory; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.Types; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAceroSubstraitConsumer extends TestDataset { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + public static final String AVRO_SCHEMA_USER = "user.avsc"; + private RootAllocator allocator = null; + + @Before + public void setUp() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + protected RootAllocator rootAllocator() { + return allocator; + } + + @Test + public void testRunQueryLocalFiles() throws Exception { + //Query: + //SELECT id, name FROM Users + //Isthmus: + //./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users" + ParquetWriteSupport writeSupport = ParquetWriteSupport + .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c"); + try 
(ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()) + .runQuery( + planReplaceLocalFileURI( + new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader() + .getResource("substrait/local_files_users.json").toURI()))), + writeSupport.getOutputURI() + ), + Collections.EMPTY_MAP + ) + ) { + while (arrowReader.loadNextBatch()) { Review Comment: Write these tests in a way that can catch if the loop body never executes (e.g. sum up the total rows read and compare it at the end). ########## java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.substrait; + +import java.nio.ByteBuffer; + +import org.apache.arrow.dataset.jni.JniLoader; + +/** + * Class that contains Native methods to call Acero C++ Substrait API. It internally depends on C++ function + * arrow::engine::ExecuteSerializedPlan. Currently supported input parameters supported are: + * <pre> + * - arrow::Buffer: Susbtrait Plan (JSON or Binary format). + * - arrow::engine::ConversionOptions: Mapping for arrow::engine::NamedTableProvider. 
+ * </pre> + */ +final class JniWrapper { + private static final JniWrapper INSTANCE = new JniWrapper(); + + private JniWrapper() { + } + + public static JniWrapper get() { + JniLoader.get().ensureLoaded(); + return INSTANCE; + } + + /** + * Consume the JSON Substrait Plan that contains Local Files and export the RecordBatchReader into + * C-Data Interface ArrowArrayStream. + * + * @param planInput the JSON Substrait plan. + * @param memoryAddressOutput the memory address where RecordBatchReader is exported. + */ + public native void executeSerializedPlanLocalFiles(String planInput, long memoryAddressOutput); Review Comment: This has not been addressed. ########## java/dataset/src/main/cpp/jni_wrapper.cc: ########## @@ -261,6 +264,26 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { default_memory_pool_id = -1L; } +/// Iterate over an object array of Tables Name on position `i` and theirs +/// Memory Address representation on `i+1` position linearly. +/// Return a mapping of the Table Name to Query as a Key and a RecordBatchReader +/// as a Value. 
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> ToMapTableToArrowReader(JNIEnv* env, jobjectArray& str_array) { + std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_record_batch_reader; + int length = env->GetArrayLength(str_array); + std::shared_ptr<arrow::Table> output_table; + for (int pos = 0; pos < length; pos++) { Review Comment: This needs to check if the array length is odd ########## java/dataset/src/main/cpp/jni_wrapper.cc: ########## @@ -578,3 +581,100 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner)); JNI_METHOD_END() } + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanLocalFiles + * Signature: (Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles ( + JNIEnv* env, jobject, jstring substrait_plan, jlong c_arrow_array_stream_address_out) { + JNI_METHOD_START + auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out); + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, substrait_plan))); + std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer)); + JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: callMeRN + * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J ( + JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input, jlong memory_address_output) { + JNI_METHOD_START + // get mapping of table name to memory address + std::map<std::string, long> 
map_table_to_memory_address = ToMap(env, table_to_memory_address_input); + // create table provider + arrow::engine::NamedTableProvider table_provider = [&map_table_to_memory_address](const std::vector<std::string>& names, const arrow::Schema&) { + std::shared_ptr<arrow::Table> output_table; + for (const auto& name : names) { + // get table from memory address provided + long memory_address = map_table_to_memory_address[name]; + auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(memory_address); + std::shared_ptr<arrow::RecordBatchReader> readerIn = + JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in)); + std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder = + arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn); + JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool())); + auto scanner = JniGetOrThrow(scanner_builder->Finish()); + output_table = JniGetOrThrow(scanner->ToTable()); + } + std::shared_ptr<arrow::compute::ExecNodeOptions> options = + std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table)); + return arrow::compute::Declaration("table_source", {}, options, "java_source"); + }; + arrow::engine::ConversionOptions conversion_options; + conversion_options.named_table_provider = std::move(table_provider); + // execute plan + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan))); + std::shared_ptr<arrow::RecordBatchReader> readerOut = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, NULLPTR, NULLPTR, conversion_options)); + auto* arrow_stream_out = reinterpret_cast<ArrowArrayStream*>(memory_address_output); + JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, arrow_stream_out)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: callMeRN + * Signature: (Ljava/lang/Object;[Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_Object_2_3Ljava_lang_String_2J ( + JNIEnv* env, jobject, jobject plan, jobjectArray table_to_memory_address_input, jlong memory_address_output) { + JNI_METHOD_START + // get mapping of table name to memory address + std::map<std::string, long> map_table_to_memory_address = ToMap(env, table_to_memory_address_input); + // create table provider + arrow::engine::NamedTableProvider table_provider = [&map_table_to_memory_address](const std::vector<std::string>& names, const arrow::Schema&) { + std::shared_ptr<arrow::Table> output_table; + for (const auto& name : names) { Review Comment: This has not been addressed. ########## java/dataset/src/main/cpp/jni_wrapper.cc: ########## @@ -578,3 +601,102 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner)); JNI_METHOD_END() } + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanLocalFiles + * Signature: (Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_lang_String_2J ( + JNIEnv* env, jobject, jstring plan, jlong c_arrow_array_stream_address_out) { + JNI_METHOD_START + auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out); + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan))); + std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer)); + JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanLocalFiles + * Signature: (Ljava/nio/ByteBuffer;J)V + */ +JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_nio_ByteBuffer_2J ( + JNIEnv* env, jobject, jobject plan, jlong c_arrow_array_stream_address_out) { + JNI_METHOD_START + auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out); + // mapping arrow::Buffer + auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan)); + int length = env->GetDirectBufferCapacity(plan); + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length)); + std::memcpy(buffer->mutable_data(), buff, length); + // execute plan + std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer)); + JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanNamedTables + * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J ( + JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input, jlong memory_address_output) { + JNI_METHOD_START + // get mapping of table name to memory address + std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader = ToMapTableToArrowReader(env, table_to_memory_address_input); + // create table provider + arrow::engine::NamedTableProvider table_provider = [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) { + std::shared_ptr<arrow::Table> output_table; + for (const auto& name : names) { + output_table = map_table_to_reader[name]; Review Comment: We should check if the table exists. 
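A minimal sketch of the lookup check requested above, written without Arrow or JNI so it compiles on its own. `Table`, `TableMap`, `LookupResult`, and `FindNamedTable` are hypothetical stand-ins, not names from the PR. The sketch assumes each provider call should resolve to exactly one table keyed by a single name. It uses `find()` instead of `operator[]`, because `operator[]` on a missing key silently inserts a null `shared_ptr` that would then be handed to the source node.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for arrow::Table so the sketch builds without Arrow (assumption).
struct Table {
  std::string label;
};

using TableMap = std::unordered_map<std::string, std::shared_ptr<Table>>;

// Hypothetical result type: `table` is null and `error` is set on failure.
struct LookupResult {
  std::shared_ptr<Table> table;
  std::string error;
};

// Resolve the names passed to the provider to a single registered table.
// Assumption: one call maps to one table, so anything other than a single
// name is rejected instead of silently keeping the last match.
LookupResult FindNamedTable(const TableMap& tables,
                            const std::vector<std::string>& names) {
  if (names.size() != 1) {
    return {nullptr, "expected exactly one table name, got " +
                         std::to_string(names.size())};
  }
  // find() never mutates the map, unlike operator[].
  auto it = tables.find(names[0]);
  if (it == tables.end() || it->second == nullptr) {
    return {nullptr, "table not found: " + names[0]};
  }
  return {it->second, ""};
}
```

In the real provider, the failure branch would presumably be reported as an error result from the lambda rather than a struct like this. The point is only that a missing name is detected before the `table_source` declaration is built.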
########## java/dataset/src/main/cpp/jni_wrapper.cc: ########## @@ -578,3 +601,102 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner)); JNI_METHOD_END() } + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanLocalFiles + * Signature: (Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_lang_String_2J ( + JNIEnv* env, jobject, jstring plan, jlong c_arrow_array_stream_address_out) { + JNI_METHOD_START + auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out); + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan))); + std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer)); + JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanLocalFiles + * Signature: (Ljava/nio/ByteBuffer;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_nio_ByteBuffer_2J ( + JNIEnv* env, jobject, jobject plan, jlong c_arrow_array_stream_address_out) { + JNI_METHOD_START + auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out); + // mapping arrow::Buffer + auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan)); + int length = env->GetDirectBufferCapacity(plan); + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length)); + std::memcpy(buffer->mutable_data(), buff, length); + // execute plan + std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer)); + 
JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanNamedTables + * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J ( + JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input, jlong memory_address_output) { + JNI_METHOD_START + // get mapping of table name to memory address + std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader = ToMapTableToArrowReader(env, table_to_memory_address_input); + // create table provider + arrow::engine::NamedTableProvider table_provider = [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) { + std::shared_ptr<arrow::Table> output_table; + for (const auto& name : names) { + output_table = map_table_to_reader[name]; Review Comment: Doesn't this loop break if there's more than one table? ########## java/dataset/src/test/resources/substrait/named_table_users.binary: ########## Review Comment: If you really must, base64 it and embed it as a string. ########## java/dataset/src/main/cpp/jni_wrapper.cc: ########## @@ -261,6 +264,26 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { default_memory_pool_id = -1L; } +/// Iterate over an object array of Tables Name on position `i` and theirs +/// Memory Address representation on `i+1` position linearly. +/// Return a mapping of the Table Name to Query as a Key and a RecordBatchReader +/// as a Value. 
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> ToMapTableToArrowReader(JNIEnv* env, jobjectArray& str_array) { + std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_record_batch_reader; + int length = env->GetArrayLength(str_array); + std::shared_ptr<arrow::Table> output_table; + for (int pos = 0; pos < length; pos++) { + auto j_string_key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos)); + pos++; + auto j_string_value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos)); + auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(std::stol(JStringToCString(env, j_string_value))); Review Comment: We should catch errors from stol. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
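For the two comments on `ToMapTableToArrowReader` (odd array length and `std::stol` errors), a rough standalone sketch of the validation, with the JNI array replaced by a plain `std::vector<std::string>`. `ParseAddress` and `PairNamesWithAddresses` are hypothetical helper names, not part of the PR. The sketch uses `std::stoll` rather than `std::stol` on the assumption that addresses should be parsed as 64-bit values even where `long` is 32 bits.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Parse a decimal address string. Returns false instead of letting
// std::invalid_argument or std::out_of_range escape to the caller.
bool ParseAddress(const std::string& text, std::int64_t* out) {
  try {
    std::size_t consumed = 0;
    long long value = std::stoll(text, &consumed);
    if (consumed != text.size()) {
      return false;  // trailing characters, e.g. "12abc"
    }
    *out = value;
    return true;
  } catch (const std::invalid_argument&) {
    return false;  // no digits at all
  } catch (const std::out_of_range&) {
    return false;  // does not fit in long long
  }
}

// Turn a flat [name, address, name, address, ...] list into pairs.
// Rejects odd-length input up front so the loop never reads past the end.
bool PairNamesWithAddresses(
    const std::vector<std::string>& flat,
    std::vector<std::pair<std::string, std::int64_t>>* out) {
  if (flat.size() % 2 != 0) {
    return false;
  }
  std::vector<std::pair<std::string, std::int64_t>> pairs;
  for (std::size_t pos = 0; pos + 1 < flat.size(); pos += 2) {
    std::int64_t address = 0;
    if (!ParseAddress(flat[pos + 1], &address)) {
      return false;
    }
    pairs.emplace_back(flat[pos], address);
  }
  *out = std::move(pairs);
  return true;
}
```

Inside the JNI function, each `false` here would presumably become a thrown Java exception via the existing JNI error helpers. How the error is surfaced is left open in this sketch.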
