davisusanibar commented on code in PR #35570:
URL: https://github.com/apache/arrow/pull/35570#discussion_r1327264182
##########
docs/source/java/substrait.rst:
##########
@@ -102,6 +104,349 @@ Here is an example of a Java program that queries a Parquet file using Java Subs
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular
accounts. bold requests alon
+Executing Projections and Filters Using Extended Expressions
+============================================================
+
+Dataset also supports projections and filters with Substrait's `Extended Expression`_.
+This requires the substrait-java library.
+
+This Java program:
+
+- Loads a Parquet file containing the "nation" table from the TPC-H benchmark.
+- Projects two new columns:
+ - ``N_NAME || ' - ' || N_COMMENT``
+ - ``N_REGIONKEY + 10``
+- Applies a filter: ``N_NATIONKEY > 18``
+
+.. code-block:: Java
+
+ import io.substrait.extension.ExtensionCollector;
+ import io.substrait.proto.Expression;
+ import io.substrait.proto.ExpressionReference;
+ import io.substrait.proto.ExtendedExpression;
+ import io.substrait.proto.FunctionArgument;
+ import io.substrait.proto.SimpleExtensionDeclaration;
+ import io.substrait.proto.SimpleExtensionURI;
+ import io.substrait.type.NamedStruct;
+ import io.substrait.type.Type;
+ import io.substrait.type.TypeCreator;
+ import io.substrait.type.proto.TypeProtoConverter;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Base64;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Optional;
+
+ public class ClientSubstraitExtendedExpressionsCookbook {
+ public static void main(String[] args) throws Exception {
+ // project and filter dataset using extended expression definition
- 03 Expressions:
+ // Expression 01 - CONCAT: N_NAME || ' - ' || N_COMMENT = col 1 ||
' - ' || col 3
+ // Expression 02 - ADD: N_REGIONKEY + 10 = col 1 + 10
+ // Expression 03 - FILTER: N_NATIONKEY > 18 = col 3 > 18
+ projectAndFilterDataset();
+ }
+
+ public static void projectAndFilterDataset() {
+ String uri = "file:///Users/data/tpch_parquet/nation.parquet";
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitFilter(getSubstraitExpressionFilter())
+ .substraitProjection(getSubstraitExpressionProjection())
+ .build();
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ while (reader.loadNextBatch()) {
+ System.out.println(
+ reader.getVectorSchemaRoot().contentToTSVString());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static ByteBuffer getSubstraitExpressionProjection() {
+ // Expression: N_REGIONKEY + 10 = col 3 + 10
+ Expression.Builder selectionBuilderProjectOne =
Expression.newBuilder().
+ setSelection(
+ Expression.FieldReference.newBuilder().
+ setDirectReference(
+
Expression.ReferenceSegment.newBuilder().
+ setStructField(
+
Expression.ReferenceSegment.StructField.newBuilder().setField(
+ 2)
+ )
+ )
+ );
Review Comment:
changed
##########
docs/source/java/substrait.rst:
##########
@@ -102,6 +104,349 @@ Here is an example of a Java program that queries a Parquet file using Java Subs
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular
accounts. bold requests alon
+Executing Projections and Filters Using Extended Expressions
+============================================================
+
+Dataset also supports projections and filters with Substrait's `Extended Expression`_.
+This requires the substrait-java library.
+
+This Java program:
+
+- Loads a Parquet file containing the "nation" table from the TPC-H benchmark.
+- Projects two new columns:
+ - ``N_NAME || ' - ' || N_COMMENT``
+ - ``N_REGIONKEY + 10``
+- Applies a filter: ``N_NATIONKEY > 18``
+
+.. code-block:: Java
+
+ import io.substrait.extension.ExtensionCollector;
+ import io.substrait.proto.Expression;
+ import io.substrait.proto.ExpressionReference;
+ import io.substrait.proto.ExtendedExpression;
+ import io.substrait.proto.FunctionArgument;
+ import io.substrait.proto.SimpleExtensionDeclaration;
+ import io.substrait.proto.SimpleExtensionURI;
+ import io.substrait.type.NamedStruct;
+ import io.substrait.type.Type;
+ import io.substrait.type.TypeCreator;
+ import io.substrait.type.proto.TypeProtoConverter;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Base64;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Optional;
+
+ public class ClientSubstraitExtendedExpressionsCookbook {
+ public static void main(String[] args) throws Exception {
+ // project and filter dataset using extended expression definition
- 03 Expressions:
+ // Expression 01 - CONCAT: N_NAME || ' - ' || N_COMMENT = col 1 ||
' - ' || col 3
+ // Expression 02 - ADD: N_REGIONKEY + 10 = col 1 + 10
+ // Expression 03 - FILTER: N_NATIONKEY > 18 = col 3 > 18
+ projectAndFilterDataset();
+ }
+
+ public static void projectAndFilterDataset() {
+ String uri = "file:///Users/data/tpch_parquet/nation.parquet";
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitFilter(getSubstraitExpressionFilter())
+ .substraitProjection(getSubstraitExpressionProjection())
+ .build();
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ while (reader.loadNextBatch()) {
+ System.out.println(
+ reader.getVectorSchemaRoot().contentToTSVString());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
Review Comment:
added
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -474,6 +484,39 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::vector<std::string> column_vector = ToStringVector(env, columns);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (substrait_projection != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_projection);
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression& named_expression :
+ bounded_expression.named_expressions) {
+ if (!(named_expression.expression.type()->id() == arrow::Type::BOOL)) {
+ project_exprs.push_back(std::move(named_expression.expression));
+ project_names.push_back(std::move(named_expression.name));
+ }
+ }
+ JniAssertOkOrThrow(scanner_builder->Project(std::move(project_exprs),
std::move(project_names)));
+ }
+ if (substrait_filter != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_filter);
+ std::optional<arrow::compute::Expression> filter_expr;
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression& named_expression :
+ bounded_expression.named_expressions) {
+ if (named_expression.expression.type()->id() == arrow::Type::BOOL) {
+ if (filter_expr.has_value()) {
+ JniThrow("Only one filter expression may be provided");
+ }
+ filter_expr = named_expression.expression;
+ }
Review Comment:
added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]