This is an automated email from the ASF dual-hosted git repository.
brycemecum pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git
The following commit(s) were added to refs/heads/main by this push:
new 51e2880 [Java] Refactor Java cookbooks to switch from JShell to Maven
(#350)
51e2880 is described below
commit 51e288096ccaf7653dee5fd9f95d8419b8393e23
Author: Bryce Mecum <[email protected]>
AuthorDate: Thu May 23 09:40:29 2024 -0800
[Java] Refactor Java cookbooks to switch from JShell to Maven (#350)
In https://github.com/apache/arrow-cookbook/issues/347 we found the way
we have been running cookbooks for Java (JShell) doesn't work well with
JPMS, which Arrow Java adopted in Arrow 16. This refactors `javadoctest.py`
to run examples directly with Maven using `exec:java` instead of with
JShell. This PR also bumps the Java source/target version to 11 to fix
some compiler errors and fixes a few compilation errors in cookbook
code.
I ran into one snag that will require a follow-up commit to this PR: the way
the examples in
[substrait.rst](https://raw.githubusercontent.com/apache/arrow-cookbook/main/java/source/substrait.rst)
are written doesn't work with my approach. My approach splits each code
snippet into its `import` statements and non-`import` statements, puts
the imports outside the main class definition and puts the non-imports
inside the class's main method. This works fine for every example except
[substrait.rst](https://raw.githubusercontent.com/apache/arrow-cookbook/main/java/source/substrait.rst)
which needs some of its code (e.g., helper methods such as
`queryTableNation()`) to be defined in the main class rather than in `main`.
We probably want to support examples like this in general, so I think
we may need to rewrite all the Java cookbooks to have an explicit main
class. @lidavidm suspected this might be the case in
https://github.com/apache/arrow-cookbook/issues/347#issuecomment-2049725497
but I do wonder if there is still a way to avoid this. Any ideas
welcome.
Fixes #347
Related #348
---
java/CONTRIBUTING.rst | 34 +-
java/ext/javadoctest.py | 143 +++++---
java/source/dataset.rst | 1 +
java/source/demo/pom.xml | 16 +-
.../demo/src/main/java/org/example/Example.java | 17 +
java/source/flight.rst | 1 +
java/source/schema.rst | 4 +
java/source/substrait.rst | 385 +++++++++++----------
8 files changed, 366 insertions(+), 235 deletions(-)
diff --git a/java/CONTRIBUTING.rst b/java/CONTRIBUTING.rst
index 55f5345..9f3712c 100644
--- a/java/CONTRIBUTING.rst
+++ b/java/CONTRIBUTING.rst
@@ -30,22 +30,13 @@ install needed packages via pip, to build the Java
cookbook. The
dependency packages managed via pip by build scripts are found at
`requirements.txt <requirements.txt>`_.
-Java Shell
-^^^^^^^^^^^^^^^^^^^^^^^^^
-For Java cookbook we are running these with Java Shell tool -
-`JShell
<https://docs.oracle.com/en/java/javase/11/jshell/introduction-jshell.html>`_
-
-.. code-block:: bash
-
- > java --version
- java 11.0.14 2022-01-18 LTS
- Java(TM) SE Runtime Environment 18.9 (build 11.0.14+8-LTS-263)
-
-.. code-block:: bash
+Java
+^^^^
- > jshell --version
- jshell 11.0.14
+The Java cookbooks require:
+
+- Java JDK (11+)
+- Maven
Build Process
-------------------------
@@ -81,6 +72,21 @@ additional ``.rst`` files in the ``source`` directory. You
just
need to remember to add them to the ``index.rst`` file in the
``toctree`` for them to become visible.
+When run, each Java code snippet is wrapped in a simple main class:
+
+.. code-block:: java
+
+ // Any imports get put here
+
+ public class Example {
+ public static void main (String[] args) {
+ // Your code gets inserted here
+ }
+ }
+
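+If your code needs more than a ``main`` body (for example, helper methods
+that ``main`` calls), define ``public class Example`` yourself, including a
+``public static void main(String[] args)`` method. The wrapping above is then
+skipped and the code is run as-is.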
+
Java Sphinx Directive
=====================
diff --git a/java/ext/javadoctest.py b/java/ext/javadoctest.py
index ae10930..296fe46 100644
--- a/java/ext/javadoctest.py
+++ b/java/ext/javadoctest.py
@@ -18,9 +18,16 @@ import os
import pathlib
import subprocess
from typing import Any, Dict
+import tempfile
+import shutil
-from sphinx.ext.doctest import (DocTestBuilder, TestcodeDirective,
- TestoutputDirective, doctest, sphinx)
+from sphinx.ext.doctest import (
+ DocTestBuilder,
+ TestcodeDirective,
+ TestoutputDirective,
+ doctest,
+ sphinx,
+)
from sphinx.locale import __
@@ -34,6 +41,10 @@ class JavaTestcodeDirective(TestcodeDirective):
class JavaDocTestBuilder(DocTestBuilder):
"""
Runs java test snippets in the documentation.
+
+ The code in each testcode block is inserted into a template Maven project,
+ compiled and run through exec:java, and its output is captured,
+ post-processed, and finally compared to the contents of the corresponding
+ testoutput block.
"""
name = "javadoctest"
@@ -45,46 +56,49 @@ class JavaDocTestBuilder(DocTestBuilder):
def compile(
self, code: str, name: str, type: str, flags: Any, dont_inherit: bool
) -> Any:
- # go to project that contains all your arrow maven dependencies
- path_arrow_project = pathlib.Path(__file__).parent.parent / "source" /
"demo"
- # create list of all arrow jar dependencies
- subprocess.check_call(
- [
- "mvn",
- "-q",
- "dependency:build-classpath",
- "-DincludeTypes=jar",
- "-Dmdep.outputFile=.cp.tmp",
- f"-Darrow.version={self.env.config.version}",
- ],
- cwd=path_arrow_project,
- text=True,
- )
- if not (path_arrow_project / ".cp.tmp").exists():
- raise RuntimeError(
- __("invalid process to create jshell dependencies library")
+ """Compile and run one Java testcode snippet, returning doctest code.
+
+ The snippet is spliced into a throwaway copy of the ``source/demo``
+ Maven project and run with ``mvn compile exec:java``. Maven's captured
+ stdout is cleaned by ``clean_output`` and wrapped in a Python ``print``
+ call, which is compiled and returned so it can be compared with the
+ corresponding testoutput block.
+
+ Raises RuntimeError if Maven exits with a non-zero status.
+ """
+ source_dir = pathlib.Path(__file__).parent.parent / "source" / "demo"
+
+ with tempfile.TemporaryDirectory() as project_dir:
+ shutil.copytree(source_dir, project_dir, dirs_exist_ok=True)
+
+ template_file_path = (
+ pathlib.Path(project_dir)
+ / "src"
+ / "main"
+ / "java"
+ / "org"
+ / "example"
+ / "Example.java"
)
- # get list of all arrow jar dependencies
- with open(path_arrow_project / ".cp.tmp") as f:
- stdout_dependency = f.read()
- if not stdout_dependency:
- raise RuntimeError(
- __("invalid process to list jshell dependencies library")
+ with open(template_file_path, "r") as infile:
+ template = infile.read()
+
+ filled_template = self.fill_template(template, code)
+
+ with open(template_file_path, "w") as outfile:
+ outfile.write(filled_template)
+
+ # Support JPMS (since Arrow 16)
+ modified_env = os.environ.copy()
+ modified_env["_JAVA_OPTIONS"] =
"--add-opens=java.base/java.nio=ALL-UNNAMED"
+
+ test_proc = subprocess.Popen(
+ [
+ "mvn",
+ # Batch mode: no colour codes or download progress bars in the
+ # captured output, which clean_output filters by line prefix
+ "--batch-mode",
+ "-f",
+ project_dir,
+ "compile",
+ "exec:java",
+ "-Dexec.mainClass=org.example.Example",
+ ],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ text=True,
+ env=modified_env,
)
- # execute java testing code thru jshell and read output
- # JDK11 support '-' This allows the pipe to work as expected without
requiring a shell
- # Migrating to /dev/stdin to also support JDK9+
- proc_jshell_process = subprocess.Popen(
- ["jshell", "-R--add-opens=java.base/java.nio=ALL-UNNAMED",
"--class-path", stdout_dependency, "-s", "/dev/stdin"],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- text=True,
- )
- out_java_arrow, err_java_arrow = proc_jshell_process.communicate(code)
- if err_java_arrow:
- raise RuntimeError(__("invalid process to run jshell"))
+ # Only stdout is captured (stderr goes to the console), so the exit
+ # status is the reliable signal that compilation or execution failed.
+ # Surface Maven's output directly instead of letting its log lines be
+ # compared against the expected testoutput.
+ out_java_arrow, _ = test_proc.communicate()
+ if test_proc.returncode != 0:
+ raise RuntimeError(
+ __("Maven failed to compile or run the Java snippet "
+ "(exit status %d):\n%s")
+ % (test_proc.returncode, out_java_arrow)
+ )
# continue with python logic code to do java output validation
output = f"print('''{self.clean_output(out_java_arrow)}''')"
@@ -93,12 +107,53 @@ class JavaDocTestBuilder(DocTestBuilder):
return compile(output, name, self.type, flags, dont_inherit)
def clean_output(self, output: str):
- if output[-3:] == '-> ':
- output = output[:-3]
- if output[-1:] == '\n':
- output = output[:-1]
- output = (4*' ').join(output.split('\t'))
- return output
+ """Reduce Maven's captured stdout to just the snippet's own output.
+
+ Maven log lines are dropped, tabs are expanded to four spaces and
+ leading/trailing whitespace is stripped so the result can be compared
+ with a testoutput block.
+ """
+ # Prefixes of lines written by Maven itself rather than by the snippet;
+ # str.startswith accepts a tuple, so each line is checked in one pass.
+ log_prefixes = ("[INFO]", "[WARNING]", "Download", "Progress")
+ lines = [
+ line for line in output.split("\n") if not line.startswith(log_prefixes)
+ ]
+
+ result = "\n".join(lines)
+
+ # When the snippet does not print a final newline, its last output line
+ # is joined with Maven's next log line (the "[INFO] ----" separator), so
+ # that line survives the prefix filter above. Remove the separator text.
+ result = result.replace(
+ "[INFO] ------------------------------------------------------------------------",
+ "",
+ )
+
+ # Convert all tabs to 4 spaces, Sphinx seems to eat tabs even if we
+ # explicitly put them in the testoutput block so we instead modify
+ # the output
+ result = result.replace("\t", 4 * " ")
+
+ return result.strip()
+
+ def fill_template(self, template, code):
+ """Return the contents of Example.java for one snippet.
+
+ If the snippet already declares ``public class Example`` it is appended
+ to *template* unchanged. Otherwise its ``import`` lines are placed after
+ the template and every other line goes inside the body of a generated
+ ``public static void main(String[] args)`` method.
+ """
+ if "public class Example" in code:
+ return template + code
+
+ # Partition the snippet in one pass. Compare the first whitespace-
+ # delimited token so statements that merely begin with the letters
+ # "import" (e.g. ``importer.run();``) stay in the method body, while
+ # import lines are recognised even when indented.
+ code_imports = []
+ code_rest = []
+ for line in code.split("\n"):
+ if line.split(maxsplit=1)[:1] == ["import"]:
+ code_imports.append(line)
+ else:
+ code_rest.append(line)
+
+ pieces = [
+ template,
+ "\n".join(code_imports),
+ "\n\npublic class Example {\n public static void main(String[] args) {\n",
+ "\n".join(code_rest),
+ " }\n}",
+ ]
+
+ return "\n".join(pieces)
+
def setup(app) -> Dict[str, Any]:
app.add_directive("testcode", JavaTestcodeDirective)
diff --git a/java/source/dataset.rst b/java/source/dataset.rst
index f7ee556..d859748 100644
--- a/java/source/dataset.rst
+++ b/java/source/dataset.rst
@@ -344,6 +344,7 @@ In case we need to project only certain columns we could
configure ScanOptions w
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
+ import java.util.Optional;
String uri = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/parquetfiles/data1.parquet";
String[] projection = new String[] {"name"};
diff --git a/java/source/demo/pom.xml b/java/source/demo/pom.xml
index 1a2c42d..02a5d8c 100644
--- a/java/source/demo/pom.xml
+++ b/java/source/demo/pom.xml
@@ -24,6 +24,13 @@
<version>1.6.1</version>
</extension>
</extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ </plugin>
+ </plugins>
</build>
<repositories>
<repository>
@@ -32,8 +39,8 @@
</repository>
</repositories>
<properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
<arrow.version>15.0.2</arrow.version>
</properties>
<dependencies>
@@ -107,5 +114,10 @@
<artifactId>core</artifactId>
<version>0.26.0</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <version>1.37.0</version>
+ </dependency>
</dependencies>
</project>
diff --git a/java/source/demo/src/main/java/org/example/Example.java
b/java/source/demo/src/main/java/org/example/Example.java
new file mode 100644
index 0000000..7b0693d
--- /dev/null
+++ b/java/source/demo/src/main/java/org/example/Example.java
@@ -0,0 +1,17 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.example;
diff --git a/java/source/flight.rst b/java/source/flight.rst
index 6c5c57b..875f19e 100644
--- a/java/source/flight.rst
+++ b/java/source/flight.rst
@@ -70,6 +70,7 @@ Flight Client and Server
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
class Dataset implements AutoCloseable {
diff --git a/java/source/schema.rst b/java/source/schema.rst
index f5c33f3..4c3bd5a 100644
--- a/java/source/schema.rst
+++ b/java/source/schema.rst
@@ -61,6 +61,8 @@ Fields are used to denote the particular columns of tabular
data.
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
+ import java.util.ArrayList;
+ import java.util.List;
FieldType intType = new FieldType(true, new ArrowType.Int(32, true), null);
FieldType listType = new FieldType(true, new ArrowType.List(), null);
@@ -118,6 +120,8 @@ In case we need to add metadata to our Field we could use:
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
+ import java.util.HashMap;
+ import java.util.Map;
Map<String, String> metadata = new HashMap<>();
metadata.put("A", "Id card");
diff --git a/java/source/substrait.rst b/java/source/substrait.rst
index 8fd9d4f..1feab88 100644
--- a/java/source/substrait.rst
+++ b/java/source/substrait.rst
@@ -61,48 +61,55 @@ Here is an example of a Java program that queries a Parquet
file:
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+ import java.util.Collections;
+
+ public class Example {
+ Plan queryTableNation() throws SqlParseException {
+ String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
+ String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL,
N_NAME CHAR(25), " +
+ "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+ SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+ Plan plan = sqlToSubstrait.execute(sql,
Collections.singletonList(nation));
+ return plan;
+ }
- Plan queryTableNation() throws SqlParseException {
- String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
- String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL,
N_NAME CHAR(25), " +
- "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
- SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
- Plan plan = sqlToSubstrait.execute(sql,
Collections.singletonList(nation));
- return plan;
- }
-
- void queryDatasetThruSubstraitPlanDefinition() {
- String uri = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/nation.parquet";
- ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
- try (
- BufferAllocator allocator = new RootAllocator();
- DatasetFactory datasetFactory = new
FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
- FileFormat.PARQUET, uri);
- Dataset dataset = datasetFactory.finish();
- Scanner scanner = dataset.newScan(options);
- ArrowReader reader = scanner.scanBatches()
- ) {
- Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
- mapTableToArrowReader.put("NATION", reader);
- // get binary plan
- Plan plan = queryTableNation();
- ByteBuffer substraitPlan =
ByteBuffer.allocateDirect(plan.toByteArray().length);
- substraitPlan.put(plan.toByteArray());
- // run query
- try (ArrowReader arrowReader = new
AceroSubstraitConsumer(allocator).runQuery(
- substraitPlan,
- mapTableToArrowReader
- )) {
- while (arrowReader.loadNextBatch()) {
-
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+ void queryDatasetThruSubstraitPlanDefinition() {
+ String uri = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/nation.parquet";
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ Map<String, ArrowReader> mapTableToArrowReader = new
HashMap<>();
+ mapTableToArrowReader.put("NATION", reader);
+ // get binary plan
+ Plan plan = queryTableNation();
+ ByteBuffer substraitPlan =
ByteBuffer.allocateDirect(plan.toByteArray().length);
+ substraitPlan.put(plan.toByteArray());
+ // run query
+ try (ArrowReader arrowReader = new
AceroSubstraitConsumer(allocator).runQuery(
+ substraitPlan,
+ mapTableToArrowReader
+ )) {
+ while (arrowReader.loadNextBatch()) {
+
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+ }
}
+ } catch (Exception e) {
+ e.printStackTrace();
}
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ }
- queryDatasetThruSubstraitPlanDefinition();
+ public static void main(String[] args) {
+ Example ex = new Example();
+
+ ex.queryDatasetThruSubstraitPlanDefinition();
+ }
+ }
.. testoutput::
@@ -134,66 +141,72 @@ For example, we can join the nation and customer tables
from the TPC-H benchmark
import java.util.HashMap;
import java.util.Map;
- Plan queryTableNationJoinCustomer() throws SqlParseException {
- String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION
n JOIN CUSTOMER c " +
- "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
- "GROUP BY n.n_name";
- String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, " +
- "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT
VARCHAR(152))";
- String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, "
+
- "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT
NULL, " +
- "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), " +
- "C_COMMENT VARCHAR(117) )";
- SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
- Plan plan = sqlToSubstrait.execute(sql,
- Arrays.asList(nation, customer));
- return plan;
- }
+ public class Example {
+ Plan queryTableNationJoinCustomer() throws SqlParseException {
+ String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM
NATION n JOIN CUSTOMER c " +
+ "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
+ "GROUP BY n.n_name";
+ String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL,
" +
+ "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT
VARCHAR(152))";
+ String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT
NULL, " +
+ "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT
NOT NULL, " +
+ "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), "
+
+ "C_COMMENT VARCHAR(117) )";
+ SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+ Plan plan = sqlToSubstrait.execute(sql,
+ Arrays.asList(nation, customer));
+ return plan;
+ }
- void queryTwoDatasetsThruSubstraitPlanDefinition() {
- String uriNation = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/nation.parquet";
- String uriCustomer = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/customer.parquet";
- ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
- try (
- BufferAllocator allocator = new RootAllocator();
- DatasetFactory datasetFactory = new FileSystemDatasetFactory(
- allocator, NativeMemoryPool.getDefault(),
- FileFormat.PARQUET, uriNation);
- Dataset dataset = datasetFactory.finish();
- Scanner scanner = dataset.newScan(options);
- ArrowReader readerNation = scanner.scanBatches();
- DatasetFactory datasetFactoryCustomer = new
FileSystemDatasetFactory(
- allocator, NativeMemoryPool.getDefault(),
- FileFormat.PARQUET, uriCustomer);
- Dataset datasetCustomer = datasetFactoryCustomer.finish();
- Scanner scannerCustomer = datasetCustomer.newScan(options);
- ArrowReader readerCustomer = scannerCustomer.scanBatches()
- ) {
- // map table to reader
- Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
- mapTableToArrowReader.put("NATION", readerNation);
- mapTableToArrowReader.put("CUSTOMER", readerCustomer);
- // get binary plan
- Plan plan = queryTableNationJoinCustomer();
- ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
- plan.toByteArray().length);
- substraitPlan.put(plan.toByteArray());
- // run query
- try (ArrowReader arrowReader = new AceroSubstraitConsumer(
- allocator).runQuery(
- substraitPlan,
- mapTableToArrowReader
- )) {
- while (arrowReader.loadNextBatch()) {
-
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+ void queryTwoDatasetsThruSubstraitPlanDefinition() {
+ String uriNation = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/nation.parquet";
+ String uriCustomer = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/customer.parquet";
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uriNation);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader readerNation = scanner.scanBatches();
+ DatasetFactory datasetFactoryCustomer = new
FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uriCustomer);
+ Dataset datasetCustomer = datasetFactoryCustomer.finish();
+ Scanner scannerCustomer = datasetCustomer.newScan(options);
+ ArrowReader readerCustomer = scannerCustomer.scanBatches()
+ ) {
+ // map table to reader
+ Map<String, ArrowReader> mapTableToArrowReader = new
HashMap<>();
+ mapTableToArrowReader.put("NATION", readerNation);
+ mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+ // get binary plan
+ Plan plan = queryTableNationJoinCustomer();
+ ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
+ plan.toByteArray().length);
+ substraitPlan.put(plan.toByteArray());
+ // run query
+ try (ArrowReader arrowReader = new AceroSubstraitConsumer(
+ allocator).runQuery(
+ substraitPlan,
+ mapTableToArrowReader
+ )) {
+ while (arrowReader.loadNextBatch()) {
+
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+ }
}
+ } catch (Exception e) {
+ e.printStackTrace();
}
- } catch (Exception e) {
- e.printStackTrace();
}
- }
- queryTwoDatasetsThruSubstraitPlanDefinition();
+ public static void main(String[] args) {
+ Example ex = new Example();
+
+ ex.queryTwoDatasetsThruSubstraitPlanDefinition();
+ }
+ }
.. testoutput::
@@ -223,6 +236,7 @@ Here is an example of a Java program that filters a Parquet
file:
import io.substrait.proto.ExtendedExpression;
import java.nio.ByteBuffer;
import java.util.Optional;
+ import java.util.Collections;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
@@ -235,43 +249,53 @@ Here is an example of a Java program that filters a
Parquet file:
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
- ByteBuffer getFilterExpression() throws SqlParseException {
- String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
- String nation =
- "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
- + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
- SqlExpressionToSubstrait expressionToSubstrait = new
SqlExpressionToSubstrait();
- ExtendedExpression expression =
- expressionToSubstrait.convert(sqlExpression,
Collections.singletonList(nation));
- byte[] expressionToByte = expression.toByteArray();
- ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(expressionToByte.length);
- byteBuffer.put(expressionToByte);
- return byteBuffer;
- }
+ public class Example {
+ ByteBuffer getFilterExpression() throws SqlParseException {
+ String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
+ String nation =
+ "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25),
"
+ + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
+ SqlExpressionToSubstrait expressionToSubstrait = new
SqlExpressionToSubstrait();
+ ExtendedExpression expression =
+ expressionToSubstrait.convert(sqlExpression,
Collections.singletonList(nation));
+ byte[] expressionToByte = expression.toByteArray();
+ ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(expressionToByte.length);
+ byteBuffer.put(expressionToByte);
+ return byteBuffer;
+ }
- void filterDataset() throws SqlParseException {
- String uri = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/nation.parquet";
- ScanOptions options =
- new ScanOptions.Builder(/*batchSize*/ 32768)
- .columns(Optional.empty())
- .substraitFilter(getFilterExpression())
- .build();
- try (BufferAllocator allocator = new RootAllocator();
- DatasetFactory datasetFactory =
- new FileSystemDatasetFactory(
- allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
- Dataset dataset = datasetFactory.finish();
- Scanner scanner = dataset.newScan(options);
- ArrowReader reader = scanner.scanBatches()) {
- while (reader.loadNextBatch()) {
- System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+ void filterDataset() throws SqlParseException {
+ String uri = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/nation.parquet";
+ ScanOptions options =
+ new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitFilter(getFilterExpression())
+ .build();
+ try (BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory =
+ new FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()) {
+ while (reader.loadNextBatch()) {
+
System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- filterDataset();
+ public static void main(String[] args) {
+ Example ex = new Example();
+
+ try {
+ ex.filterDataset();
+ } catch (SqlParseException e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
.. testoutput::
@@ -302,6 +326,7 @@ a Parquet file:
import io.substrait.proto.ExtendedExpression;
import java.nio.ByteBuffer;
import java.util.Optional;
+ import java.util.Collections;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
@@ -314,58 +339,68 @@ a Parquet file:
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
- ByteBuffer getProjectExpression() throws SqlParseException {
- String[] sqlExpression = new String[]{"N_NAME", "N_NATIONKEY > 12",
"N_NATIONKEY + 31"};
- String nation =
- "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
- + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
- SqlExpressionToSubstrait expressionToSubstrait = new
SqlExpressionToSubstrait();
- ExtendedExpression expression =
- expressionToSubstrait.convert(sqlExpression,
Collections.singletonList(nation));
- byte[] expressionToByte = expression.toByteArray();
- ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(expressionToByte.length);
- byteBuffer.put(expressionToByte);
- return byteBuffer;
- }
+ public class Example {
+ ByteBuffer getProjectExpression() throws SqlParseException {
+ String[] sqlExpression = new String[]{"N_NAME", "N_NATIONKEY > 12",
"N_NATIONKEY + 31"};
+ String nation =
+ "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
+ + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
+ SqlExpressionToSubstrait expressionToSubstrait = new
SqlExpressionToSubstrait();
+ ExtendedExpression expression =
+ expressionToSubstrait.convert(sqlExpression,
Collections.singletonList(nation));
+ byte[] expressionToByte = expression.toByteArray();
+ ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(expressionToByte.length);
+ byteBuffer.put(expressionToByte);
+ return byteBuffer;
+ }
- ByteBuffer getFilterExpression() throws SqlParseException {
- String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
- String nation =
- "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
- + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
- SqlExpressionToSubstrait expressionToSubstrait = new
SqlExpressionToSubstrait();
- ExtendedExpression expression =
- expressionToSubstrait.convert(sqlExpression,
Collections.singletonList(nation));
- byte[] expressionToByte = expression.toByteArray();
- ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(expressionToByte.length);
- byteBuffer.put(expressionToByte);
- return byteBuffer;
- }
+ ByteBuffer getFilterExpression() throws SqlParseException {
+ String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
+ String nation =
+ "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
+ + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
+ SqlExpressionToSubstrait expressionToSubstrait = new
SqlExpressionToSubstrait();
+ ExtendedExpression expression =
+ expressionToSubstrait.convert(sqlExpression,
Collections.singletonList(nation));
+ byte[] expressionToByte = expression.toByteArray();
+ ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(expressionToByte.length);
+ byteBuffer.put(expressionToByte);
+ return byteBuffer;
+ }
- void filterAndProjectDataset() throws SqlParseException {
- String uri = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/nation.parquet";
- ScanOptions options =
- new ScanOptions.Builder(/*batchSize*/ 32768)
- .columns(Optional.empty())
- .substraitFilter(getFilterExpression())
- .substraitProjection(getProjectExpression())
- .build();
- try (BufferAllocator allocator = new RootAllocator();
- DatasetFactory datasetFactory =
- new FileSystemDatasetFactory(
- allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
- Dataset dataset = datasetFactory.finish();
- Scanner scanner = dataset.newScan(options);
- ArrowReader reader = scanner.scanBatches()) {
- while (reader.loadNextBatch()) {
- System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+ void filterAndProjectDataset() throws SqlParseException {
+ String uri = "file:" + System.getProperty("user.dir") +
"/thirdpartydeps/tpch/nation.parquet";
+ ScanOptions options =
+ new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitFilter(getFilterExpression())
+ .substraitProjection(getProjectExpression())
+ .build();
+ try (BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory =
+ new FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()) {
+ while (reader.loadNextBatch()) {
+
System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- filterAndProjectDataset();
+ public static void main (String[] args) {
+ Example ex = new Example();
+
+ try {
+ ex.filterAndProjectDataset();
+ } catch (SqlParseException e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
.. testoutput::