lidavidm commented on code in PR #35570:
URL: https://github.com/apache/arrow/pull/35570#discussion_r1326390844
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -474,6 +484,39 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::vector<std::string> column_vector = ToStringVector(env, columns);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (substrait_projection != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_projection);
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression& named_expression :
+ bounded_expression.named_expressions) {
+ if (!(named_expression.expression.type()->id() == arrow::Type::BOOL)) {
+ project_exprs.push_back(std::move(named_expression.expression));
+ project_names.push_back(std::move(named_expression.name));
+ }
+ }
+ JniAssertOkOrThrow(scanner_builder->Project(std::move(project_exprs),
std::move(project_names)));
+ }
+ if (substrait_filter != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_filter);
+ std::optional<arrow::compute::Expression> filter_expr;
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression& named_expression :
+ bounded_expression.named_expressions) {
+ if (named_expression.expression.type()->id() == arrow::Type::BOOL) {
+ if (filter_expr.has_value()) {
+ JniThrow("Only one filter expression may be provided");
+ }
+ filter_expr = named_expression.expression;
+ }
+ }
+ JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr));
Review Comment:
This will crash if you provide an empty list of expressions.
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -474,6 +484,39 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::vector<std::string> column_vector = ToStringVector(env, columns);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (substrait_projection != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_projection);
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression& named_expression :
+ bounded_expression.named_expressions) {
+ if (!(named_expression.expression.type()->id() == arrow::Type::BOOL)) {
Review Comment:
```suggestion
if (named_expression.expression.type()->id() != arrow::Type::BOOL) {
```
##########
docs/source/java/substrait.rst:
##########
@@ -102,6 +104,349 @@ Here is an example of a Java program that queries a
Parquet file using Java Subs
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular
accounts. bold requests alon
+Executing Projections and Filters Using Extended Expressions
+============================================================
+
+Dataset also supports projections and filters with Substrait's `Extended
Expression`_.
+This requires the substrait-java library.
+
+This Java program:
+
+- Loads a Parquet file containing the "nation" table from the TPC-H benchmark.
+- Projects two new columns:
+ - ``N_NAME || ' - ' || N_COMMENT``
+ - ``N_REGIONKEY + 10``
+- Applies a filter: ``N_NATIONKEY > 18``
+
+.. code-block:: Java
+
+ import io.substrait.extension.ExtensionCollector;
+ import io.substrait.proto.Expression;
+ import io.substrait.proto.ExpressionReference;
+ import io.substrait.proto.ExtendedExpression;
+ import io.substrait.proto.FunctionArgument;
+ import io.substrait.proto.SimpleExtensionDeclaration;
+ import io.substrait.proto.SimpleExtensionURI;
+ import io.substrait.type.NamedStruct;
+ import io.substrait.type.Type;
+ import io.substrait.type.TypeCreator;
+ import io.substrait.type.proto.TypeProtoConverter;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Base64;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Optional;
+
+ public class ClientSubstraitExtendedExpressionsCookbook {
+ public static void main(String[] args) throws Exception {
+ // project and filter dataset using extended expression definition
- 03 Expressions:
+ // Expression 01 - CONCAT: N_NAME || ' - ' || N_COMMENT = col 1 ||
' - ' || col 3
+ // Expression 02 - ADD: N_REGIONKEY + 10 = col 1 + 10
+ // Expression 03 - FILTER: N_NATIONKEY > 18 = col 3 > 18
+ projectAndFilterDataset();
+ }
+
+ public static void projectAndFilterDataset() {
+ String uri = "file:///Users/data/tpch_parquet/nation.parquet";
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitFilter(getSubstraitExpressionFilter())
+ .substraitProjection(getSubstraitExpressionProjection())
+ .build();
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ while (reader.loadNextBatch()) {
+ System.out.println(
+ reader.getVectorSchemaRoot().contentToTSVString());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
Review Comment:
just declare everything as `throws Exception`
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -474,6 +484,39 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::vector<std::string> column_vector = ToStringVector(env, columns);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (substrait_projection != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_projection);
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression& named_expression :
+ bounded_expression.named_expressions) {
+ if (!(named_expression.expression.type()->id() == arrow::Type::BOOL)) {
+ project_exprs.push_back(std::move(named_expression.expression));
+ project_names.push_back(std::move(named_expression.name));
+ }
+ }
+ JniAssertOkOrThrow(scanner_builder->Project(std::move(project_exprs),
std::move(project_names)));
+ }
+ if (substrait_filter != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_filter);
+ std::optional<arrow::compute::Expression> filter_expr;
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression& named_expression :
+ bounded_expression.named_expressions) {
+ if (named_expression.expression.type()->id() == arrow::Type::BOOL) {
+ if (filter_expr.has_value()) {
+ JniThrow("Only one filter expression may be provided");
+ }
+ filter_expr = named_expression.expression;
+ }
Review Comment:
Throw if the expression is not of type BOOL.
##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -204,4 +205,170 @@ public void testRunBinaryQueryNamedTableNation() throws
Exception {
}
}
}
+
+ @Test
+ public void testBaseParquetReadWithExtendedExpressionsFilter() throws
Exception {
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("name", new ArrowType.Utf8())
+ ), null);
+ // Substrait Extended Expression: Filter:
+ // Expression 01: WHERE ID < 20
+ String base64EncodedSubstraitFilter =
"Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW55X2F" +
+
"ueRo3ChwaGggCGgQKAhABIggaBhIECgISACIGGgQKAigUGhdmaWx0ZXJfaWRfbG93ZXJfdGhhbl8yMCIaCgJJRAoETkFNRRIOCgQqAhA"
+
+ "BCgRiAhABGAI=";
+ ByteBuffer substraitExpressionFilter =
getByteBuffer(base64EncodedSubstraitFilter);
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1,
"value_1",
+ 11, "value_11", 21, "value_21", 45, "value_45");
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitFilter(substraitExpressionFilter)
+ .build();
+ try (
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, writeSupport.getOutputURI());
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowcount = 0;
+ while (reader.loadNextBatch()) {
+ rowcount += reader.getVectorSchemaRoot().getRowCount();
+
assertTrue(reader.getVectorSchemaRoot().getVector("id").toString().equals("[19,
1, 11]"));
+ assertTrue(reader.getVectorSchemaRoot().getVector("name").toString()
+ .equals("[value_19, value_1, value_11]"));
Review Comment:
I think we should consider what to do here; possibly an
Iterator/Iterable/Stream that gives Java objects would be sufficient (and you
could collect into a Java collection and then use standard assertions). Can you
file a follow-up task?
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -474,6 +484,39 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::vector<std::string> column_vector = ToStringVector(env, columns);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (substrait_projection != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_projection);
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression& named_expression :
+ bounded_expression.named_expressions) {
+ if (!(named_expression.expression.type()->id() == arrow::Type::BOOL)) {
Review Comment:
Why do we have this in the first place? I think this was left over from a
refactor as seen below. It should be perfectly fine to project a BOOL column.
##########
docs/source/java/substrait.rst:
##########
@@ -102,6 +104,349 @@ Here is an example of a Java program that queries a
Parquet file using Java Subs
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular
accounts. bold requests alon
+Executing Projections and Filters Using Extended Expressions
+============================================================
+
+Dataset also supports projections and filters with Substrait's `Extended
Expression`_.
+This requires the substrait-java library.
+
+This Java program:
+
+- Loads a Parquet file containing the "nation" table from the TPC-H benchmark.
+- Projects two new columns:
+ - ``N_NAME || ' - ' || N_COMMENT``
+ - ``N_REGIONKEY + 10``
+- Applies a filter: ``N_NATIONKEY > 18``
+
+.. code-block:: Java
+
+ import io.substrait.extension.ExtensionCollector;
+ import io.substrait.proto.Expression;
+ import io.substrait.proto.ExpressionReference;
+ import io.substrait.proto.ExtendedExpression;
+ import io.substrait.proto.FunctionArgument;
+ import io.substrait.proto.SimpleExtensionDeclaration;
+ import io.substrait.proto.SimpleExtensionURI;
+ import io.substrait.type.NamedStruct;
+ import io.substrait.type.Type;
+ import io.substrait.type.TypeCreator;
+ import io.substrait.type.proto.TypeProtoConverter;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Base64;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Optional;
+
+ public class ClientSubstraitExtendedExpressionsCookbook {
+ public static void main(String[] args) throws Exception {
+ // project and filter dataset using extended expression definition
- 03 Expressions:
+ // Expression 01 - CONCAT: N_NAME || ' - ' || N_COMMENT = col 1 ||
' - ' || col 3
+ // Expression 02 - ADD: N_REGIONKEY + 10 = col 1 + 10
+ // Expression 03 - FILTER: N_NATIONKEY > 18 = col 3 > 18
+ projectAndFilterDataset();
+ }
+
+ public static void projectAndFilterDataset() {
+ String uri = "file:///Users/data/tpch_parquet/nation.parquet";
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitFilter(getSubstraitExpressionFilter())
+ .substraitProjection(getSubstraitExpressionProjection())
+ .build();
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ while (reader.loadNextBatch()) {
+ System.out.println(
+ reader.getVectorSchemaRoot().contentToTSVString());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static ByteBuffer getSubstraitExpressionProjection() {
+ // Expression: N_REGIONKEY + 10 = col 3 + 10
+ Expression.Builder selectionBuilderProjectOne =
Expression.newBuilder().
+ setSelection(
+ Expression.FieldReference.newBuilder().
+ setDirectReference(
+
Expression.ReferenceSegment.newBuilder().
+ setStructField(
+
Expression.ReferenceSegment.StructField.newBuilder().setField(
+ 2)
+ )
+ )
+ );
Review Comment:
nit: how are you formatting examples? I think it would make sense to just
use `google-java-format` which has a more compact style. In our docs, going too
far to the right is unreadable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]