This is an automated email from the ASF dual-hosted git repository.

brycemecum pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git


The following commit(s) were added to refs/heads/main by this push:
     new 51e2880  [Java] Refactor Java cookbooks to switch from JShell to Maven 
(#350)
51e2880 is described below

commit 51e288096ccaf7653dee5fd9f95d8419b8393e23
Author: Bryce Mecum <[email protected]>
AuthorDate: Thu May 23 09:40:29 2024 -0800

    [Java] Refactor Java cookbooks to switch from JShell to Maven (#350)
    
    In https://github.com/apache/arrow-cookbook/issues/347 we found the way
    we have been running cookbooks for Java (JShell) doesn't work well with
    JPMS which was introduced in Arrow 16. This refactors `javadoctest.py`
    to run examples directly with Maven using `exec:java` instead of with
    JShell. This PR also bumps the Java source/target version to 11 to fix
    some compiler errors and fixes a few compilation errors in cookbook
    code.
    
    I ran into one snag that will require a follow-up commit to this PR: The way
    the examples in
    
[substrait.rst](https://raw.githubusercontent.com/apache/arrow-cookbook/main/java/source/substrait.rst)
    are written doesn't work with my approach. My approach splits each code
    snippet into its `import` statements and non-`import` statements, puts
    the imports outside the main class definition and puts the non-imports
    inside the class's main method. This works fine for every example except
    
[substrait.rst](https://raw.githubusercontent.com/apache/arrow-cookbook/main/java/source/substrait.rst)
    which needs some of its code (e.g., helper methods such as
    `queryTableNation()`) to be defined in the main class.
    
    We probably generally want to support examples that need this so I think
    we may need to rewrite all the Java cookbooks to have an explicit main
    class. @lidavidm suspected this might be the case in
    https://github.com/apache/arrow-cookbook/issues/347#issuecomment-2049725497
    but I do wonder if there is still a way to avoid this. Any ideas
    welcome.
    
    Fixes #347
    Related #348
---
 java/CONTRIBUTING.rst                              |  34 +-
 java/ext/javadoctest.py                            | 143 +++++---
 java/source/dataset.rst                            |   1 +
 java/source/demo/pom.xml                           |  16 +-
 .../demo/src/main/java/org/example/Example.java    |  17 +
 java/source/flight.rst                             |   1 +
 java/source/schema.rst                             |   4 +
 java/source/substrait.rst                          | 385 +++++++++++----------
 8 files changed, 366 insertions(+), 235 deletions(-)

diff --git a/java/CONTRIBUTING.rst b/java/CONTRIBUTING.rst
index 55f5345..9f3712c 100644
--- a/java/CONTRIBUTING.rst
+++ b/java/CONTRIBUTING.rst
@@ -30,22 +30,13 @@ install needed packages via pip, to build the Java 
cookbook.  The
 dependency packages managed via pip by build scripts are found at
 `requirements.txt <requirements.txt>`_.
 
-Java Shell
-^^^^^^^^^^^^^^^^^^^^^^^^^
-For Java cookbook we are running these with Java Shell tool -
-`JShell 
<https://docs.oracle.com/en/java/javase/11/jshell/introduction-jshell.html>`_
-
-.. code-block:: bash
-
-    > java --version
-    java 11.0.14 2022-01-18 LTS
-    Java(TM) SE Runtime Environment 18.9 (build 11.0.14+8-LTS-263)
-
-.. code-block:: bash
+Java
+^^^^
 
-    > jshell --version
-    jshell 11.0.14
+The Java cookbooks require:
 
+- Java JDK (11+)
+- Maven
 
 Build Process
 -------------------------
@@ -81,6 +72,21 @@ additional ``.rst`` files in the ``source`` directory. You 
just
 need to remember to add them to the ``index.rst`` file in the
 ``toctree`` for them to become visible.
 
+When run, Java code snippets are wrapped in a simple main class:
+
+.. code-block:: java
+
+    // Any imports get put here
+
+    public class Example {
+        public static void main (String[] args) {
+            // Your code gets inserted here
+        }
+    }
+
+If your code is more complicated, you can explicitly define ``public class 
Example``;
+in that case the above wrapping won't happen and the code will be run as-is.
+
 Java Sphinx Directive
 =====================
 
diff --git a/java/ext/javadoctest.py b/java/ext/javadoctest.py
index ae10930..296fe46 100644
--- a/java/ext/javadoctest.py
+++ b/java/ext/javadoctest.py
@@ -18,9 +18,16 @@ import os
 import pathlib
 import subprocess
 from typing import Any, Dict
+import tempfile
+import shutil
 
-from sphinx.ext.doctest import (DocTestBuilder, TestcodeDirective,
-                                TestoutputDirective, doctest, sphinx)
+from sphinx.ext.doctest import (
+    DocTestBuilder,
+    TestcodeDirective,
+    TestoutputDirective,
+    doctest,
+    sphinx,
+)
 from sphinx.locale import __
 
 
@@ -34,6 +41,10 @@ class JavaTestcodeDirective(TestcodeDirective):
 class JavaDocTestBuilder(DocTestBuilder):
     """
     Runs java test snippets in the documentation.
+
+    The code in each testcode block is inserted into a template Maven project,
+    run through exec:java, its output captured and post-processed, and finally
+    compared to whatever's in the corresponding testoutput.
     """
 
     name = "javadoctest"
@@ -45,46 +56,49 @@ class JavaDocTestBuilder(DocTestBuilder):
     def compile(
         self, code: str, name: str, type: str, flags: Any, dont_inherit: bool
     ) -> Any:
-        # go to project that contains all your arrow maven dependencies
-        path_arrow_project = pathlib.Path(__file__).parent.parent / "source" / 
"demo"
-        # create list of all arrow jar dependencies
-        subprocess.check_call(
-            [
-                "mvn",
-                "-q",
-                "dependency:build-classpath",
-                "-DincludeTypes=jar",
-                "-Dmdep.outputFile=.cp.tmp",
-                f"-Darrow.version={self.env.config.version}",
-            ],
-            cwd=path_arrow_project,
-            text=True,
-        )
-        if not (path_arrow_project / ".cp.tmp").exists():
-            raise RuntimeError(
-                __("invalid process to create jshell dependencies library")
+        source_dir = pathlib.Path(__file__).parent.parent / "source" / "demo"
+
+        with tempfile.TemporaryDirectory() as project_dir:
+            shutil.copytree(source_dir, project_dir, dirs_exist_ok=True)
+
+            template_file_path = (
+                pathlib.Path(project_dir)
+                / "src"
+                / "main"
+                / "java"
+                / "org"
+                / "example"
+                / "Example.java"
             )
 
-        # get list of all arrow jar dependencies
-        with open(path_arrow_project / ".cp.tmp") as f:
-            stdout_dependency = f.read()
-        if not stdout_dependency:
-            raise RuntimeError(
-                __("invalid process to list jshell dependencies library")
+            with open(template_file_path, "r") as infile:
+                template = infile.read()
+
+            filled_template = self.fill_template(template, code)
+
+            with open(template_file_path, "w") as outfile:
+                outfile.write(filled_template)
+
+            # Support JPMS (since Arrow 16)
+            modified_env = os.environ.copy()
+            modified_env["_JAVA_OPTIONS"] = 
"--add-opens=java.base/java.nio=ALL-UNNAMED"
+
+            test_proc = subprocess.Popen(
+                [
+                    "mvn",
+                    "-f",
+                    project_dir,
+                    "compile",
+                    "exec:java",
+                    "-Dexec.mainClass=org.example.Example",
+                ],
+                stdin=subprocess.PIPE,
+                stdout=subprocess.PIPE,
+                text=True,
+                env=modified_env,
             )
 
-        # execute java testing code thru jshell and read output
-        # JDK11 support '-' This allows the pipe to work as expected without 
requiring a shell
-        # Migrating to /dev/stdin to also support JDK9+
-        proc_jshell_process = subprocess.Popen(
-            ["jshell", "-R--add-opens=java.base/java.nio=ALL-UNNAMED", 
"--class-path", stdout_dependency, "-s", "/dev/stdin"],
-            stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE,
-            text=True,
-        )
-        out_java_arrow, err_java_arrow = proc_jshell_process.communicate(code)
-        if err_java_arrow:
-            raise RuntimeError(__("invalid process to run jshell"))
+            out_java_arrow, err_java_arrow = test_proc.communicate()
 
         # continue with python logic code to do java output validation
         output = f"print('''{self.clean_output(out_java_arrow)}''')"
@@ -93,12 +107,53 @@ class JavaDocTestBuilder(DocTestBuilder):
         return compile(output, name, self.type, flags, dont_inherit)
 
     def clean_output(self, output: str):
-        if output[-3:] == '-> ':
-            output = output[:-3]
-        if output[-1:] == '\n':
-            output = output[:-1]
-        output = (4*' ').join(output.split('\t'))
-        return output
+        lines = output.split("\n")
+
+        # Remove log lines from output
+        lines = [l for l in lines if not l.startswith("[INFO]")]
+        lines = [l for l in lines if not l.startswith("[WARNING]")]
+        lines = [l for l in lines if not l.startswith("Download")]
+        lines = [l for l in lines if not l.startswith("Progress")]
+
+        result = "\n".join(lines)
+
+        # Sometimes the testoutput content is smushed onto the same line as the
+        # following log line, probably just when the testcode code doesn't 
print
+        # its own final newline. This example is the only case I found so I
+        # didn't pull out the re module (i.e., this works)
+        result = result.replace(
+            "[INFO] 
------------------------------------------------------------------------",
+            "",
+        )
+
+        # Convert all tabs to 4 spaces, Sphinx seems to eat tabs even if we
+        # explicitly put them in the testoutput block so we instead modify
+        # the output
+        result = (4 * " ").join(result.split("\t"))
+
+        return result.strip()
+
+    def fill_template(self, template, code):
+        # Detect the special case where cookbook code is already wrapped in a
+        # class and just use the code as-is without wrapping it up
+        if code.find("public class Example") >= 0:
+            return template + code
+
+        # Split input code into imports and not-imports
+        lines = code.split("\n")
+        code_imports = [l for l in lines if l.startswith("import")]
+        code_rest = [l for l in lines if not l.startswith("import")]
+
+        pieces = [
+            template,
+            "\n".join(code_imports),
+            "\n\npublic class Example {\n    public static void main(String[] 
args) {\n",
+            "\n".join(code_rest),
+            "    }\n}",
+        ]
+
+        return "\n".join(pieces)
+
 
 def setup(app) -> Dict[str, Any]:
     app.add_directive("testcode", JavaTestcodeDirective)
diff --git a/java/source/dataset.rst b/java/source/dataset.rst
index f7ee556..d859748 100644
--- a/java/source/dataset.rst
+++ b/java/source/dataset.rst
@@ -344,6 +344,7 @@ In case we need to project only certain columns we could 
configure ScanOptions w
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowReader;
+   import java.util.Optional;
 
    String uri = "file:" + System.getProperty("user.dir") + 
"/thirdpartydeps/parquetfiles/data1.parquet";
    String[] projection = new String[] {"name"};
diff --git a/java/source/demo/pom.xml b/java/source/demo/pom.xml
index 1a2c42d..02a5d8c 100644
--- a/java/source/demo/pom.xml
+++ b/java/source/demo/pom.xml
@@ -24,6 +24,13 @@
         <version>1.6.1</version>
       </extension>
     </extensions>
+    <plugins>
+        <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <version>3.0.0</version>
+        </plugin>
+    </plugins>
     </build>
     <repositories>
         <repository>
@@ -32,8 +39,8 @@
         </repository>
     </repositories>
     <properties>
-        <maven.compiler.source>8</maven.compiler.source>
-        <maven.compiler.target>8</maven.compiler.target>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
         <arrow.version>15.0.2</arrow.version>
     </properties>
     <dependencies>
@@ -107,5 +114,10 @@
             <artifactId>core</artifactId>
             <version>0.26.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-core</artifactId>
+            <version>1.37.0</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/java/source/demo/src/main/java/org/example/Example.java 
b/java/source/demo/src/main/java/org/example/Example.java
new file mode 100644
index 0000000..7b0693d
--- /dev/null
+++ b/java/source/demo/src/main/java/org/example/Example.java
@@ -0,0 +1,17 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.example;
diff --git a/java/source/flight.rst b/java/source/flight.rst
index 6c5c57b..875f19e 100644
--- a/java/source/flight.rst
+++ b/java/source/flight.rst
@@ -70,6 +70,7 @@ Flight Client and Server
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
+   import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ConcurrentHashMap;
 
    class Dataset implements AutoCloseable {
diff --git a/java/source/schema.rst b/java/source/schema.rst
index f5c33f3..4c3bd5a 100644
--- a/java/source/schema.rst
+++ b/java/source/schema.rst
@@ -61,6 +61,8 @@ Fields are used to denote the particular columns of tabular 
data.
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.FieldType;
+   import java.util.ArrayList;
+   import java.util.List;
 
    FieldType intType = new FieldType(true, new ArrowType.Int(32, true), null);
    FieldType listType = new FieldType(true, new ArrowType.List(), null);
@@ -118,6 +120,8 @@ In case we need to add metadata to our Field we could use:
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.FieldType;
+   import java.util.HashMap;
+   import java.util.Map;
 
    Map<String, String> metadata = new HashMap<>();
    metadata.put("A", "Id card");
diff --git a/java/source/substrait.rst b/java/source/substrait.rst
index 8fd9d4f..1feab88 100644
--- a/java/source/substrait.rst
+++ b/java/source/substrait.rst
@@ -61,48 +61,55 @@ Here is an example of a Java program that queries a Parquet 
file:
     import java.nio.ByteBuffer;
     import java.util.HashMap;
     import java.util.Map;
+    import java.util.Collections;
+
+    public class Example {
+        Plan queryTableNation() throws SqlParseException {
+           String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
+           String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, 
N_NAME CHAR(25), " +
+                   "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+           SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+           Plan plan = sqlToSubstrait.execute(sql, 
Collections.singletonList(nation));
+           return plan;
+        }
 
-    Plan queryTableNation() throws SqlParseException {
-       String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
-       String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, 
N_NAME CHAR(25), " +
-               "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
-       SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
-       Plan plan = sqlToSubstrait.execute(sql, 
Collections.singletonList(nation));
-       return plan;
-    }
-
-    void queryDatasetThruSubstraitPlanDefinition() {
-       String uri = "file:" + System.getProperty("user.dir") + 
"/thirdpartydeps/tpch/nation.parquet";
-       ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
-       try (
-           BufferAllocator allocator = new RootAllocator();
-           DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
-                   FileFormat.PARQUET, uri);
-           Dataset dataset = datasetFactory.finish();
-           Scanner scanner = dataset.newScan(options);
-           ArrowReader reader = scanner.scanBatches()
-       ) {
-           Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
-           mapTableToArrowReader.put("NATION", reader);
-           // get binary plan
-           Plan plan = queryTableNation();
-           ByteBuffer substraitPlan = 
ByteBuffer.allocateDirect(plan.toByteArray().length);
-           substraitPlan.put(plan.toByteArray());
-           // run query
-           try (ArrowReader arrowReader = new 
AceroSubstraitConsumer(allocator).runQuery(
-               substraitPlan,
-               mapTableToArrowReader
-           )) {
-               while (arrowReader.loadNextBatch()) {
-                   
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+        void queryDatasetThruSubstraitPlanDefinition() {
+           String uri = "file:" + System.getProperty("user.dir") + 
"/thirdpartydeps/tpch/nation.parquet";
+           ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+           try (
+               BufferAllocator allocator = new RootAllocator();
+               DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
+                       FileFormat.PARQUET, uri);
+               Dataset dataset = datasetFactory.finish();
+               Scanner scanner = dataset.newScan(options);
+               ArrowReader reader = scanner.scanBatches()
+           ) {
+               Map<String, ArrowReader> mapTableToArrowReader = new 
HashMap<>();
+               mapTableToArrowReader.put("NATION", reader);
+               // get binary plan
+               Plan plan = queryTableNation();
+               ByteBuffer substraitPlan = 
ByteBuffer.allocateDirect(plan.toByteArray().length);
+               substraitPlan.put(plan.toByteArray());
+               // run query
+               try (ArrowReader arrowReader = new 
AceroSubstraitConsumer(allocator).runQuery(
+                   substraitPlan,
+                   mapTableToArrowReader
+               )) {
+                   while (arrowReader.loadNextBatch()) {
+                       
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+                   }
                }
+           } catch (Exception e) {
+               e.printStackTrace();
            }
-       } catch (Exception e) {
-           e.printStackTrace();
-       }
-    }
+        }
 
-    queryDatasetThruSubstraitPlanDefinition();
+        public static void main(String[] args) {
+            Example ex = new Example();
+
+            ex.queryDatasetThruSubstraitPlanDefinition();
+        }
+    }
 
 .. testoutput::
 
@@ -134,66 +141,72 @@ For example, we can join the nation and customer tables 
from the TPC-H benchmark
     import java.util.HashMap;
     import java.util.Map;
 
-    Plan queryTableNationJoinCustomer() throws SqlParseException {
-        String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION 
n JOIN CUSTOMER c " +
-            "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
-            "GROUP BY n.n_name";
-        String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, " +
-            "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT 
VARCHAR(152))";
-        String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, " 
+
-            "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT 
NULL, " +
-            "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), " +
-            "C_COMMENT VARCHAR(117) )";
-        SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
-        Plan plan = sqlToSubstrait.execute(sql,
-            Arrays.asList(nation, customer));
-        return plan;
-    }
+    public class Example {
+        Plan queryTableNationJoinCustomer() throws SqlParseException {
+            String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM 
NATION n JOIN CUSTOMER c " +
+                "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
+                "GROUP BY n.n_name";
+            String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, 
" +
+                "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT 
VARCHAR(152))";
+            String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT 
NULL, " +
+                "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT 
NOT NULL, " +
+                "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), " 
+
+                "C_COMMENT VARCHAR(117) )";
+            SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+            Plan plan = sqlToSubstrait.execute(sql,
+                Arrays.asList(nation, customer));
+            return plan;
+        }
 
-    void queryTwoDatasetsThruSubstraitPlanDefinition() {
-        String uriNation = "file:" + System.getProperty("user.dir") + 
"/thirdpartydeps/tpch/nation.parquet";
-        String uriCustomer = "file:" + System.getProperty("user.dir") + 
"/thirdpartydeps/tpch/customer.parquet";
-        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
-        try (
-            BufferAllocator allocator = new RootAllocator();
-            DatasetFactory datasetFactory = new FileSystemDatasetFactory(
-                allocator, NativeMemoryPool.getDefault(),
-                FileFormat.PARQUET, uriNation);
-            Dataset dataset = datasetFactory.finish();
-            Scanner scanner = dataset.newScan(options);
-            ArrowReader readerNation = scanner.scanBatches();
-            DatasetFactory datasetFactoryCustomer = new 
FileSystemDatasetFactory(
-                allocator, NativeMemoryPool.getDefault(),
-                FileFormat.PARQUET, uriCustomer);
-            Dataset datasetCustomer = datasetFactoryCustomer.finish();
-            Scanner scannerCustomer = datasetCustomer.newScan(options);
-            ArrowReader readerCustomer = scannerCustomer.scanBatches()
-        ) {
-            // map table to reader
-            Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
-            mapTableToArrowReader.put("NATION", readerNation);
-            mapTableToArrowReader.put("CUSTOMER", readerCustomer);
-            // get binary plan
-            Plan plan = queryTableNationJoinCustomer();
-            ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
-                plan.toByteArray().length);
-            substraitPlan.put(plan.toByteArray());
-            // run query
-            try (ArrowReader arrowReader = new AceroSubstraitConsumer(
-                allocator).runQuery(
-                substraitPlan,
-                mapTableToArrowReader
-            )) {
-                while (arrowReader.loadNextBatch()) {
-                    
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+        void queryTwoDatasetsThruSubstraitPlanDefinition() {
+            String uriNation = "file:" + System.getProperty("user.dir") + 
"/thirdpartydeps/tpch/nation.parquet";
+            String uriCustomer = "file:" + System.getProperty("user.dir") + 
"/thirdpartydeps/tpch/customer.parquet";
+            ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+            try (
+                BufferAllocator allocator = new RootAllocator();
+                DatasetFactory datasetFactory = new FileSystemDatasetFactory(
+                    allocator, NativeMemoryPool.getDefault(),
+                    FileFormat.PARQUET, uriNation);
+                Dataset dataset = datasetFactory.finish();
+                Scanner scanner = dataset.newScan(options);
+                ArrowReader readerNation = scanner.scanBatches();
+                DatasetFactory datasetFactoryCustomer = new 
FileSystemDatasetFactory(
+                    allocator, NativeMemoryPool.getDefault(),
+                    FileFormat.PARQUET, uriCustomer);
+                Dataset datasetCustomer = datasetFactoryCustomer.finish();
+                Scanner scannerCustomer = datasetCustomer.newScan(options);
+                ArrowReader readerCustomer = scannerCustomer.scanBatches()
+            ) {
+                // map table to reader
+                Map<String, ArrowReader> mapTableToArrowReader = new 
HashMap<>();
+                mapTableToArrowReader.put("NATION", readerNation);
+                mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+                // get binary plan
+                Plan plan = queryTableNationJoinCustomer();
+                ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
+                    plan.toByteArray().length);
+                substraitPlan.put(plan.toByteArray());
+                // run query
+                try (ArrowReader arrowReader = new AceroSubstraitConsumer(
+                    allocator).runQuery(
+                    substraitPlan,
+                    mapTableToArrowReader
+                )) {
+                    while (arrowReader.loadNextBatch()) {
+                        
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+                    }
                 }
+            } catch (Exception e) {
+                e.printStackTrace();
             }
-        } catch (Exception e) {
-            e.printStackTrace();
         }
-    }
 
-    queryTwoDatasetsThruSubstraitPlanDefinition();
+        public static void main(String[] args) {
+            Example ex = new Example();
+
+            ex.queryTwoDatasetsThruSubstraitPlanDefinition();
+        }
+    }
 
 .. testoutput::
 
@@ -223,6 +236,7 @@ Here is an example of a Java program that filters a Parquet 
file:
     import io.substrait.proto.ExtendedExpression;
     import java.nio.ByteBuffer;
     import java.util.Optional;
+    import java.util.Collections;
     import org.apache.arrow.dataset.file.FileFormat;
     import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
     import org.apache.arrow.dataset.jni.NativeMemoryPool;
@@ -235,43 +249,53 @@ Here is an example of a Java program that filters a 
Parquet file:
     import org.apache.arrow.vector.ipc.ArrowReader;
     import org.apache.calcite.sql.parser.SqlParseException;
 
-    ByteBuffer getFilterExpression() throws SqlParseException {
-      String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
-      String nation =
-          "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
-              + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
-      SqlExpressionToSubstrait expressionToSubstrait = new 
SqlExpressionToSubstrait();
-      ExtendedExpression expression =
-          expressionToSubstrait.convert(sqlExpression, 
Collections.singletonList(nation));
-      byte[] expressionToByte = expression.toByteArray();
-      ByteBuffer byteBuffer = 
ByteBuffer.allocateDirect(expressionToByte.length);
-      byteBuffer.put(expressionToByte);
-      return byteBuffer;
-    }
+    public class Example {
+        ByteBuffer getFilterExpression() throws SqlParseException {
+          String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
+          String nation =
+              "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), 
"
+                  + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
+          SqlExpressionToSubstrait expressionToSubstrait = new 
SqlExpressionToSubstrait();
+          ExtendedExpression expression =
+              expressionToSubstrait.convert(sqlExpression, 
Collections.singletonList(nation));
+          byte[] expressionToByte = expression.toByteArray();
+          ByteBuffer byteBuffer = 
ByteBuffer.allocateDirect(expressionToByte.length);
+          byteBuffer.put(expressionToByte);
+          return byteBuffer;
+        }
 
-    void filterDataset() throws SqlParseException {
-      String uri = "file:" + System.getProperty("user.dir") + 
"/thirdpartydeps/tpch/nation.parquet";
-      ScanOptions options =
-          new ScanOptions.Builder(/*batchSize*/ 32768)
-              .columns(Optional.empty())
-              .substraitFilter(getFilterExpression())
-              .build();
-      try (BufferAllocator allocator = new RootAllocator();
-          DatasetFactory datasetFactory =
-              new FileSystemDatasetFactory(
-                  allocator, NativeMemoryPool.getDefault(), 
FileFormat.PARQUET, uri);
-          Dataset dataset = datasetFactory.finish();
-          Scanner scanner = dataset.newScan(options);
-          ArrowReader reader = scanner.scanBatches()) {
-        while (reader.loadNextBatch()) {
-          System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+        void filterDataset() throws SqlParseException {
+          String uri = "file:" + System.getProperty("user.dir") + 
"/thirdpartydeps/tpch/nation.parquet";
+          ScanOptions options =
+              new ScanOptions.Builder(/*batchSize*/ 32768)
+                  .columns(Optional.empty())
+                  .substraitFilter(getFilterExpression())
+                  .build();
+          try (BufferAllocator allocator = new RootAllocator();
+              DatasetFactory datasetFactory =
+                  new FileSystemDatasetFactory(
+                      allocator, NativeMemoryPool.getDefault(), 
FileFormat.PARQUET, uri);
+              Dataset dataset = datasetFactory.finish();
+              Scanner scanner = dataset.newScan(options);
+              ArrowReader reader = scanner.scanBatches()) {
+            while (reader.loadNextBatch()) {
+              
System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+            }
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
         }
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
 
-    filterDataset();
+        public static void main(String[] args) {
+            Example ex = new Example();
+
+            try {
+              ex.filterDataset();
+            } catch (SqlParseException e) {
+              System.out.println(e.getMessage());
+            }
+        }
+    }
 
 .. testoutput::
 
@@ -302,6 +326,7 @@ a Parquet file:
     import io.substrait.proto.ExtendedExpression;
     import java.nio.ByteBuffer;
     import java.util.Optional;
+    import java.util.Collections;
     import org.apache.arrow.dataset.file.FileFormat;
     import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
     import org.apache.arrow.dataset.jni.NativeMemoryPool;
@@ -314,58 +339,68 @@ a Parquet file:
     import org.apache.arrow.vector.ipc.ArrowReader;
     import org.apache.calcite.sql.parser.SqlParseException;
 
-    ByteBuffer getProjectExpression() throws SqlParseException {
-      String[] sqlExpression = new String[]{"N_NAME", "N_NATIONKEY > 12", "N_NATIONKEY + 31"};
-      String nation =
-          "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
-              + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
-      SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
-      ExtendedExpression expression =
-          expressionToSubstrait.convert(sqlExpression, Collections.singletonList(nation));
-      byte[] expressionToByte = expression.toByteArray();
-      ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionToByte.length);
-      byteBuffer.put(expressionToByte);
-      return byteBuffer;
-    }
+    public class Example {
+        ByteBuffer getProjectExpression() throws SqlParseException {
+        String[] sqlExpression = new String[]{"N_NAME", "N_NATIONKEY > 12", "N_NATIONKEY + 31"};
+        String nation =
+            "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
+                + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
+        SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
+        ExtendedExpression expression =
+            expressionToSubstrait.convert(sqlExpression, Collections.singletonList(nation));
+        byte[] expressionToByte = expression.toByteArray();
+        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionToByte.length);
+        byteBuffer.put(expressionToByte);
+        return byteBuffer;
+        }
 
-    ByteBuffer getFilterExpression() throws SqlParseException {
-      String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
-      String nation =
-          "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
-              + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
-      SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
-      ExtendedExpression expression =
-          expressionToSubstrait.convert(sqlExpression, Collections.singletonList(nation));
-      byte[] expressionToByte = expression.toByteArray();
-      ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionToByte.length);
-      byteBuffer.put(expressionToByte);
-      return byteBuffer;
-    }
+        ByteBuffer getFilterExpression() throws SqlParseException {
+        String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
+        String nation =
+            "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
+                + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
+        SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
+        ExtendedExpression expression =
+            expressionToSubstrait.convert(sqlExpression, Collections.singletonList(nation));
+        byte[] expressionToByte = expression.toByteArray();
+        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionToByte.length);
+        byteBuffer.put(expressionToByte);
+        return byteBuffer;
+        }
 
-    void filterAndProjectDataset() throws SqlParseException {
-      String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
-      ScanOptions options =
-          new ScanOptions.Builder(/*batchSize*/ 32768)
-              .columns(Optional.empty())
-              .substraitFilter(getFilterExpression())
-              .substraitProjection(getProjectExpression())
-              .build();
-      try (BufferAllocator allocator = new RootAllocator();
-          DatasetFactory datasetFactory =
-              new FileSystemDatasetFactory(
-                  allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
-          Dataset dataset = datasetFactory.finish();
-          Scanner scanner = dataset.newScan(options);
-          ArrowReader reader = scanner.scanBatches()) {
-        while (reader.loadNextBatch()) {
-          System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+        void filterAndProjectDataset() throws SqlParseException {
+        String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
+        ScanOptions options =
+            new ScanOptions.Builder(/*batchSize*/ 32768)
+                .columns(Optional.empty())
+                .substraitFilter(getFilterExpression())
+                .substraitProjection(getProjectExpression())
+                .build();
+        try (BufferAllocator allocator = new RootAllocator();
+            DatasetFactory datasetFactory =
+                new FileSystemDatasetFactory(
+                    allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
+            Dataset dataset = datasetFactory.finish();
+            Scanner scanner = dataset.newScan(options);
+            ArrowReader reader = scanner.scanBatches()) {
+            while (reader.loadNextBatch()) {
+            System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
         }
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
 
-    filterAndProjectDataset();
+        public static void main (String[] args) {
+            Example ex = new Example();
+
+            try {
+                ex.filterAndProjectDataset();
+            } catch (SqlParseException e) {
+                System.out.println(e.getMessage());
+            }
+        }
+    }
 
 .. testoutput::
 


Reply via email to