lidavidm commented on code in PR #35570:
URL: https://github.com/apache/arrow/pull/35570#discussion_r1303404292
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -49,24 +51,72 @@ public ScanOptions(String[] columns, long batchSize) {
/**
* Constructor.
* @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
- * @param columns (Optional) Projected columns. {@link Optional#empty()} for
scanning all columns. Otherwise,
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
* Only columns present in the Array will be scanned.
*/
- public ScanOptions(long batchSize, Optional<String[]> columns) {
- Preconditions.checkNotNull(columns);
+ public ScanOptions(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
this.batchSize = batchSize;
- this.columns = columns;
+ this.columnsSubset = columnsSubset;
+ this.columnsProduceOrFilter = Optional.empty();
}
public ScanOptions(long batchSize) {
this(batchSize, Optional.empty());
}
- public Optional<String[]> getColumns() {
- return columns;
+ public Optional<String[]> getColumnsSubset() {
+ return columnsSubset;
}
public long getBatchSize() {
return batchSize;
}
+
+ public Optional<ByteBuffer> getColumnsProduceOrFilter() {
+ return columnsProduceOrFilter;
+ }
+
+ /**
+ * Builder for Options used during scanning.
+ */
+ public static class Builder {
+ private final long batchSize;
+ private final Optional<String[]> columnsSubset;
+ private Optional<ByteBuffer> columnsProduceOrFilter = Optional.empty();
+
+ /**
+ * Constructor.
+ * @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
+ * Only columns present in the Array will be scanned.
+ */
+ public Builder(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
+ this.batchSize = batchSize;
+ this.columnsSubset = columnsSubset;
+ }
+
+ /**
+ * Define binary extended expression message for projects new columns or
applies filter.
Review Comment:
```suggestion
* Set the Substrait extended expression.
*
* <p>Can be used to filter data and/or project new columns.
```
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -49,24 +51,72 @@ public ScanOptions(String[] columns, long batchSize) {
/**
* Constructor.
* @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
- * @param columns (Optional) Projected columns. {@link Optional#empty()} for
scanning all columns. Otherwise,
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
* Only columns present in the Array will be scanned.
*/
- public ScanOptions(long batchSize, Optional<String[]> columns) {
- Preconditions.checkNotNull(columns);
+ public ScanOptions(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
this.batchSize = batchSize;
- this.columns = columns;
+ this.columnsSubset = columnsSubset;
+ this.columnsProduceOrFilter = Optional.empty();
}
public ScanOptions(long batchSize) {
this(batchSize, Optional.empty());
}
- public Optional<String[]> getColumns() {
- return columns;
+ public Optional<String[]> getColumnsSubset() {
+ return columnsSubset;
}
public long getBatchSize() {
return batchSize;
}
+
+ public Optional<ByteBuffer> getColumnsProduceOrFilter() {
+ return columnsProduceOrFilter;
+ }
+
+ /**
+ * Builder for Options used during scanning.
+ */
+ public static class Builder {
+ private final long batchSize;
+ private final Optional<String[]> columnsSubset;
+ private Optional<ByteBuffer> columnsProduceOrFilter = Optional.empty();
+
+ /**
+ * Constructor.
+ * @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
+ * Only columns present in the Array will be scanned.
+ */
+ public Builder(long batchSize, Optional<String[]> columnsSubset) {
Review Comment:
I'd say a builder's constructor doesn't need any arguments; just use the
builder methods to set each option.
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -470,12 +471,37 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
JniGetOrThrow(dataset->NewScan());
JniAssertOkOrThrow(scanner_builder->Pool(pool));
- if (columns != nullptr) {
- std::vector<std::string> column_vector = ToStringVector(env, columns);
+ if (columns_subset != nullptr) {
+ std::vector<std::string> column_vector = ToStringVector(env,
columns_subset);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (columns_to_produce_or_filter != nullptr) {
+ auto *buff =
reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(columns_to_produce_or_filter));
+ int length = env->GetDirectBufferCapacity(columns_to_produce_or_filter);
+ std::shared_ptr<arrow::Buffer> buffer =
JniGetOrThrow(arrow::AllocateBuffer(length));
+ std::memcpy(buffer->mutable_data(), buff, length);
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::compute::Expression filter_expr;
+ int filter_count = 0;
+ for(arrow::engine::NamedExpression named_expression :
bounded_expression.named_expressions) {
+ if (named_expression.expression.type()->id() == arrow::Type::BOOL) {
+ if (filter_count > 0) {
+ JniThrow("The process only support one filter expression declared");
+ }
+ filter_expr = named_expression.expression;
+ filter_count++;
Review Comment:
You can track this instead with `optional<Expression> filter_expr`
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -470,12 +471,37 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
JniGetOrThrow(dataset->NewScan());
JniAssertOkOrThrow(scanner_builder->Pool(pool));
- if (columns != nullptr) {
- std::vector<std::string> column_vector = ToStringVector(env, columns);
+ if (columns_subset != nullptr) {
+ std::vector<std::string> column_vector = ToStringVector(env,
columns_subset);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (columns_to_produce_or_filter != nullptr) {
+ auto *buff =
reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(columns_to_produce_or_filter));
+ int length = env->GetDirectBufferCapacity(columns_to_produce_or_filter);
+ std::shared_ptr<arrow::Buffer> buffer =
JniGetOrThrow(arrow::AllocateBuffer(length));
+ std::memcpy(buffer->mutable_data(), buff, length);
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::compute::Expression filter_expr;
+ int filter_count = 0;
+ for(arrow::engine::NamedExpression named_expression :
bounded_expression.named_expressions) {
+ if (named_expression.expression.type()->id() == arrow::Type::BOOL) {
+ if (filter_count > 0) {
+ JniThrow("The process only support one filter expression declared");
Review Comment:
```suggestion
JniThrow("Only one filter expression may be provided");
```
##########
docs/source/java/substrait.rst:
##########
@@ -102,6 +104,323 @@ Here is an example of a Java program that queries a
Parquet file using Java Subs
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular
accounts. bold requests alon
+Executing Projections and Filters Using Extended Expressions
+============================================================
+
+Dataset also supports projections and filters with Substrait's extended
expressions.
Review Comment:
Please link to the Substrait documentation explaining what extended expressions are.
##########
docs/source/java/substrait.rst:
##########
@@ -102,6 +104,323 @@ Here is an example of a Java program that queries a
Parquet file using Java Subs
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular
accounts. bold requests alon
+Executing Projections and Filters Using Extended Expressions
+============================================================
+
+Dataset also supports projections and filters with Substrait's extended
expressions.
+This requires the substrait-java library.
+
+This Java program:
+
+- Loads a Parquet file containing the "nation" table from the TPC-H benchmark.
+- Projects two new columns:
+ - ``N_NAME || ' - ' || N_COMMENT``
+ - ``N_REGIONKEY + 10``
+- Applies a filter: ``N_NATIONKEY > 18``
+
+.. code-block:: Java
+
+ import com.google.protobuf.InvalidProtocolBufferException;
+ import io.substrait.extension.ExtensionCollector;
+ import io.substrait.proto.Expression;
+ import io.substrait.proto.ExpressionReference;
+ import io.substrait.proto.ExtendedExpression;
+ import io.substrait.proto.FunctionArgument;
+ import io.substrait.proto.SimpleExtensionDeclaration;
+ import io.substrait.proto.SimpleExtensionURI;
+ import io.substrait.type.NamedStruct;
+ import io.substrait.type.Type;
+ import io.substrait.type.TypeCreator;
+ import io.substrait.type.proto.TypeProtoConverter;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Base64;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Optional;
+
+ public class ClientSubstraitExtendedExpressions {
+ public static void main(String[] args) throws Exception {
+ // create extended expression for: project two new columns + one
filter
+ ByteBuffer binaryExtendedExpressions =
createExtendedExpresionMessageUsingPOJOClasses();
+ // project and filter dataset using extended expression definition
- 03 Expressions:
+ // Expression 01 - CONCAT: N_NAME || ' - ' || N_COMMENT = col 1 ||
' - ' || col 3
+ // Expression 02 - ADD: N_REGIONKEY + 10 = col 1 + 10
+ // Expression 03 - FILTER: N_NATIONKEY > 18 = col 3 > 18
+ projectAndFilterDataset(binaryExtendedExpressions);
+ }
+
+ public static void projectAndFilterDataset(ByteBuffer
binaryExtendedExpressions) {
+ String uri =
"file:////Users/dsusanibar/voltron/fork/consumer-testing/tests/data/tpch_parquet/nation.parquet";
Review Comment:
Please avoid putting our company name in example strings; use a neutral placeholder path instead.
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -49,24 +51,72 @@ public ScanOptions(String[] columns, long batchSize) {
/**
* Constructor.
* @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
- * @param columns (Optional) Projected columns. {@link Optional#empty()} for
scanning all columns. Otherwise,
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
* Only columns present in the Array will be scanned.
*/
- public ScanOptions(long batchSize, Optional<String[]> columns) {
- Preconditions.checkNotNull(columns);
+ public ScanOptions(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
this.batchSize = batchSize;
- this.columns = columns;
+ this.columnsSubset = columnsSubset;
+ this.columnsProduceOrFilter = Optional.empty();
}
public ScanOptions(long batchSize) {
this(batchSize, Optional.empty());
}
- public Optional<String[]> getColumns() {
- return columns;
+ public Optional<String[]> getColumnsSubset() {
+ return columnsSubset;
}
public long getBatchSize() {
return batchSize;
}
+
+ public Optional<ByteBuffer> getColumnsProduceOrFilter() {
+ return columnsProduceOrFilter;
+ }
+
+ /**
+ * Builder for Options used during scanning.
+ */
+ public static class Builder {
+ private final long batchSize;
+ private final Optional<String[]> columnsSubset;
+ private Optional<ByteBuffer> columnsProduceOrFilter = Optional.empty();
+
+ /**
+ * Constructor.
+ * @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
+ * Only columns present in the Array will be scanned.
+ */
+ public Builder(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
+ this.batchSize = batchSize;
+ this.columnsSubset = columnsSubset;
+ }
+
+ /**
+ * Define binary extended expression message for projects new columns or
applies filter.
+ *
+ * @param columnsProduceOrFilter (Optional) Expressions to evaluate to
projects new columns or applies filter.
+ * @return the ScanOptions configured.
+ */
+ public Builder columnsProduceOrFilter(Optional<ByteBuffer>
columnsProduceOrFilter) {
Review Comment:
Please revise the rest of the code based on this.
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -25,8 +26,9 @@
* Options used during scanning.
*/
public class ScanOptions {
- private final Optional<String[]> columns;
+ private final Optional<String[]> columnsSubset;
private final long batchSize;
+ private Optional<ByteBuffer> columnsProduceOrFilter;
Review Comment:
Why is this not `final` like the others?
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -470,12 +471,37 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
JniGetOrThrow(dataset->NewScan());
JniAssertOkOrThrow(scanner_builder->Pool(pool));
- if (columns != nullptr) {
- std::vector<std::string> column_vector = ToStringVector(env, columns);
+ if (columns_subset != nullptr) {
+ std::vector<std::string> column_vector = ToStringVector(env,
columns_subset);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (columns_to_produce_or_filter != nullptr) {
+ auto *buff =
reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(columns_to_produce_or_filter));
+ int length = env->GetDirectBufferCapacity(columns_to_produce_or_filter);
+ std::shared_ptr<arrow::Buffer> buffer =
JniGetOrThrow(arrow::AllocateBuffer(length));
+ std::memcpy(buffer->mutable_data(), buff, length);
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::compute::Expression filter_expr;
+ int filter_count = 0;
+ for(arrow::engine::NamedExpression named_expression :
bounded_expression.named_expressions) {
Review Comment:
Take the loop variable by `const&`, or at least by `&` and `std::move` the expression below.
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -470,12 +471,37 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
JniGetOrThrow(dataset->NewScan());
JniAssertOkOrThrow(scanner_builder->Pool(pool));
- if (columns != nullptr) {
- std::vector<std::string> column_vector = ToStringVector(env, columns);
+ if (columns_subset != nullptr) {
+ std::vector<std::string> column_vector = ToStringVector(env,
columns_subset);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (columns_to_produce_or_filter != nullptr) {
+ auto *buff =
reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(columns_to_produce_or_filter));
Review Comment:
Could this pointer be `const`?
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -49,24 +51,72 @@ public ScanOptions(String[] columns, long batchSize) {
/**
* Constructor.
* @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
- * @param columns (Optional) Projected columns. {@link Optional#empty()} for
scanning all columns. Otherwise,
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
* Only columns present in the Array will be scanned.
*/
- public ScanOptions(long batchSize, Optional<String[]> columns) {
- Preconditions.checkNotNull(columns);
+ public ScanOptions(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
this.batchSize = batchSize;
- this.columns = columns;
+ this.columnsSubset = columnsSubset;
+ this.columnsProduceOrFilter = Optional.empty();
}
public ScanOptions(long batchSize) {
this(batchSize, Optional.empty());
}
- public Optional<String[]> getColumns() {
- return columns;
+ public Optional<String[]> getColumnsSubset() {
+ return columnsSubset;
}
public long getBatchSize() {
return batchSize;
}
+
+ public Optional<ByteBuffer> getColumnsProduceOrFilter() {
+ return columnsProduceOrFilter;
+ }
+
+ /**
+ * Builder for Options used during scanning.
+ */
+ public static class Builder {
+ private final long batchSize;
+ private final Optional<String[]> columnsSubset;
+ private Optional<ByteBuffer> columnsProduceOrFilter = Optional.empty();
+
+ /**
+ * Constructor.
+ * @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
+ * Only columns present in the Array will be scanned.
+ */
+ public Builder(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
+ this.batchSize = batchSize;
+ this.columnsSubset = columnsSubset;
+ }
+
+ /**
+ * Define binary extended expression message for projects new columns or
applies filter.
+ *
+ * @param columnsProduceOrFilter (Optional) Expressions to evaluate to
projects new columns or applies filter.
+ * @return the ScanOptions configured.
+ */
+ public Builder columnsProduceOrFilter(Optional<ByteBuffer>
columnsProduceOrFilter) {
Review Comment:
This needs to be named something clearer. `substraitExtendedExpression`?
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -49,24 +51,72 @@ public ScanOptions(String[] columns, long batchSize) {
/**
* Constructor.
* @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
- * @param columns (Optional) Projected columns. {@link Optional#empty()} for
scanning all columns. Otherwise,
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
* Only columns present in the Array will be scanned.
*/
- public ScanOptions(long batchSize, Optional<String[]> columns) {
- Preconditions.checkNotNull(columns);
+ public ScanOptions(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
this.batchSize = batchSize;
- this.columns = columns;
+ this.columnsSubset = columnsSubset;
+ this.columnsProduceOrFilter = Optional.empty();
}
public ScanOptions(long batchSize) {
this(batchSize, Optional.empty());
}
- public Optional<String[]> getColumns() {
- return columns;
+ public Optional<String[]> getColumnsSubset() {
Review Comment:
I don't think there's a compelling reason to rename everything. I do think
it is time to add proper docstrings.
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -470,12 +471,37 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
JniGetOrThrow(dataset->NewScan());
JniAssertOkOrThrow(scanner_builder->Pool(pool));
- if (columns != nullptr) {
- std::vector<std::string> column_vector = ToStringVector(env, columns);
+ if (columns_subset != nullptr) {
+ std::vector<std::string> column_vector = ToStringVector(env,
columns_subset);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (columns_to_produce_or_filter != nullptr) {
+ auto *buff =
reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(columns_to_produce_or_filter));
+ int length = env->GetDirectBufferCapacity(columns_to_produce_or_filter);
+ std::shared_ptr<arrow::Buffer> buffer =
JniGetOrThrow(arrow::AllocateBuffer(length));
+ std::memcpy(buffer->mutable_data(), buff, length);
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::compute::Expression filter_expr;
+ int filter_count = 0;
+ for(arrow::engine::NamedExpression named_expression :
bounded_expression.named_expressions) {
+ if (named_expression.expression.type()->id() == arrow::Type::BOOL) {
+ if (filter_count > 0) {
+ JniThrow("The process only support one filter expression declared");
+ }
+ filter_expr = named_expression.expression;
+ filter_count++;
+ } else {
+ project_exprs.push_back(named_expression.expression);
+ project_names.push_back(named_expression.name);
+ }
+ }
+ JniAssertOkOrThrow(scanner_builder->Project(project_exprs, project_names));
Review Comment:
`std::move` the vectors into `Project` here to avoid copying them.
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -49,24 +51,72 @@ public ScanOptions(String[] columns, long batchSize) {
/**
* Constructor.
* @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
- * @param columns (Optional) Projected columns. {@link Optional#empty()} for
scanning all columns. Otherwise,
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
* Only columns present in the Array will be scanned.
*/
- public ScanOptions(long batchSize, Optional<String[]> columns) {
- Preconditions.checkNotNull(columns);
+ public ScanOptions(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
this.batchSize = batchSize;
- this.columns = columns;
+ this.columnsSubset = columnsSubset;
+ this.columnsProduceOrFilter = Optional.empty();
}
public ScanOptions(long batchSize) {
this(batchSize, Optional.empty());
}
- public Optional<String[]> getColumns() {
- return columns;
+ public Optional<String[]> getColumnsSubset() {
+ return columnsSubset;
}
public long getBatchSize() {
return batchSize;
}
+
+ public Optional<ByteBuffer> getColumnsProduceOrFilter() {
+ return columnsProduceOrFilter;
+ }
+
+ /**
+ * Builder for Options used during scanning.
+ */
+ public static class Builder {
+ private final long batchSize;
+ private final Optional<String[]> columnsSubset;
+ private Optional<ByteBuffer> columnsProduceOrFilter = Optional.empty();
+
+ /**
+ * Constructor.
+ * @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
+ * @param columnsSubset (Optional) Projected columns. {@link
Optional#empty()} for scanning all columns. Otherwise,
+ * Only columns present in the Array will be scanned.
+ */
+ public Builder(long batchSize, Optional<String[]> columnsSubset) {
+ Preconditions.checkNotNull(columnsSubset);
+ this.batchSize = batchSize;
+ this.columnsSubset = columnsSubset;
+ }
+
+ /**
+ * Define binary extended expression message for projects new columns or
applies filter.
+ *
+ * @param columnsProduceOrFilter (Optional) Expressions to evaluate to
projects new columns or applies filter.
+ * @return the ScanOptions configured.
+ */
+ public Builder columnsProduceOrFilter(Optional<ByteBuffer>
columnsProduceOrFilter) {
Review Comment:
Don't take Optional here.
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -470,12 +471,37 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
JniGetOrThrow(dataset->NewScan());
JniAssertOkOrThrow(scanner_builder->Pool(pool));
- if (columns != nullptr) {
- std::vector<std::string> column_vector = ToStringVector(env, columns);
+ if (columns_subset != nullptr) {
+ std::vector<std::string> column_vector = ToStringVector(env,
columns_subset);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (columns_to_produce_or_filter != nullptr) {
+ auto *buff =
reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(columns_to_produce_or_filter));
+ int length = env->GetDirectBufferCapacity(columns_to_produce_or_filter);
+ std::shared_ptr<arrow::Buffer> buffer =
JniGetOrThrow(arrow::AllocateBuffer(length));
+ std::memcpy(buffer->mutable_data(), buff, length);
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
Review Comment:
That said, you don't necessarily need to copy into a newly allocated buffer
here; you can wrap the pointer + length directly in an `arrow::Buffer`
(so long as the buffer does not escape this function).
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -470,12 +471,37 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
JniGetOrThrow(dataset->NewScan());
JniAssertOkOrThrow(scanner_builder->Pool(pool));
- if (columns != nullptr) {
- std::vector<std::string> column_vector = ToStringVector(env, columns);
+ if (columns_subset != nullptr) {
+ std::vector<std::string> column_vector = ToStringVector(env,
columns_subset);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (columns_to_produce_or_filter != nullptr) {
+ auto *buff =
reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(columns_to_produce_or_filter));
+ int length = env->GetDirectBufferCapacity(columns_to_produce_or_filter);
+ std::shared_ptr<arrow::Buffer> buffer =
JniGetOrThrow(arrow::AllocateBuffer(length));
+ std::memcpy(buffer->mutable_data(), buff, length);
Review Comment:
I think we do this a few times by now; please factor out a helper.
##########
docs/source/java/substrait.rst:
##########
@@ -102,6 +104,323 @@ Here is an example of a Java program that queries a
Parquet file using Java Subs
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular
accounts. bold requests alon
+Executing Projections and Filters Using Extended Expressions
+============================================================
+
+Dataset also supports projections and filters with Substrait's extended
expressions.
+This requires the substrait-java library.
+
+This Java program:
+
+- Loads a Parquet file containing the "nation" table from the TPC-H benchmark.
+- Projects two new columns:
+ - ``N_NAME || ' - ' || N_COMMENT``
+ - ``N_REGIONKEY + 10``
+- Applies a filter: ``N_NATIONKEY > 18``
+
+.. code-block:: Java
+
+ import com.google.protobuf.InvalidProtocolBufferException;
+ import io.substrait.extension.ExtensionCollector;
+ import io.substrait.proto.Expression;
+ import io.substrait.proto.ExpressionReference;
+ import io.substrait.proto.ExtendedExpression;
+ import io.substrait.proto.FunctionArgument;
+ import io.substrait.proto.SimpleExtensionDeclaration;
+ import io.substrait.proto.SimpleExtensionURI;
+ import io.substrait.type.NamedStruct;
+ import io.substrait.type.Type;
+ import io.substrait.type.TypeCreator;
+ import io.substrait.type.proto.TypeProtoConverter;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Base64;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Optional;
+
+ public class ClientSubstraitExtendedExpressions {
+ public static void main(String[] args) throws Exception {
+ // create extended expression for: project two new columns + one
filter
+ ByteBuffer binaryExtendedExpressions =
createExtendedExpresionMessageUsingPOJOClasses();
Review Comment:
There is no reason to be so verbose
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -470,12 +471,37 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
JniGetOrThrow(dataset->NewScan());
JniAssertOkOrThrow(scanner_builder->Pool(pool));
- if (columns != nullptr) {
- std::vector<std::string> column_vector = ToStringVector(env, columns);
+ if (columns_subset != nullptr) {
+ std::vector<std::string> column_vector = ToStringVector(env,
columns_subset);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
+ if (columns_to_produce_or_filter != nullptr) {
+ auto *buff =
reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(columns_to_produce_or_filter));
+ int length = env->GetDirectBufferCapacity(columns_to_produce_or_filter);
+ std::shared_ptr<arrow::Buffer> buffer =
JniGetOrThrow(arrow::AllocateBuffer(length));
+ std::memcpy(buffer->mutable_data(), buff, length);
+ arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ arrow::compute::Expression filter_expr;
+ int filter_count = 0;
+ for(arrow::engine::NamedExpression named_expression :
bounded_expression.named_expressions) {
+ if (named_expression.expression.type()->id() == arrow::Type::BOOL) {
+ if (filter_count > 0) {
+ JniThrow("The process only support one filter expression declared");
+ }
+ filter_expr = named_expression.expression;
+ filter_count++;
Review Comment:
That said, we should design the API to separate project and filter
expressions in the first place.
##########
docs/source/java/substrait.rst:
##########
@@ -102,6 +104,323 @@ Here is an example of a Java program that queries a
Parquet file using Java Subs
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular
accounts. bold requests alon
+Executing Projections and Filters Using Extended Expressions
+============================================================
+
+Dataset also supports projections and filters with Substrait's extended
expressions.
+This requires the substrait-java library.
+
+This Java program:
+
+- Loads a Parquet file containing the "nation" table from the TPC-H benchmark.
+- Projects two new columns:
+ - ``N_NAME || ' - ' || N_COMMENT``
+ - ``N_REGIONKEY + 10``
+- Applies a filter: ``N_NATIONKEY > 18``
+
+.. code-block:: Java
+
+ import com.google.protobuf.InvalidProtocolBufferException;
+ import io.substrait.extension.ExtensionCollector;
+ import io.substrait.proto.Expression;
+ import io.substrait.proto.ExpressionReference;
+ import io.substrait.proto.ExtendedExpression;
+ import io.substrait.proto.FunctionArgument;
+ import io.substrait.proto.SimpleExtensionDeclaration;
+ import io.substrait.proto.SimpleExtensionURI;
+ import io.substrait.type.NamedStruct;
+ import io.substrait.type.Type;
+ import io.substrait.type.TypeCreator;
+ import io.substrait.type.proto.TypeProtoConverter;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Base64;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Optional;
+
+ public class ClientSubstraitExtendedExpressions {
+ public static void main(String[] args) throws Exception {
+ // create extended expression for: project two new columns + one
filter
+ ByteBuffer binaryExtendedExpressions =
createExtendedExpresionMessageUsingPOJOClasses();
Review Comment:
Long method names hurt clarity in the docs, where the page width is limited.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]