lidavidm commented on code in PR #35570:
URL: https://github.com/apache/arrow/pull/35570#discussion_r1306090414
##########
java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java:
##########
@@ -32,4 +32,14 @@ public interface Dataset extends AutoCloseable {
* @return the Scanner instance
*/
Scanner newScan(ScanOptions options);
+
+ /**
+ * Create a new Scanner, using the provided options,
+ * that contains the binary representation of the Substrait
+ * Extended Expression.
+ *
+ * @param options options used during creating Scanner
+ * @return the Scanner instance
+ */
+ Scanner newSubstraitScan(ScanOptions options);
Review Comment:
The point of having a ScanOptions object is that we shouldn't need a second
function like this, right? `newScan` should just do the right thing based on the
options (e.g. use the Substrait expression when one is set).
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -69,4 +83,8 @@ public Optional<String[]> getColumns() {
public long getBatchSize() {
return batchSize;
}
+
+ public ByteBuffer getSubstraitExtendedExpression() {
Review Comment:
Why don't we separate the projection and the filter into distinct options? The C++ ScannerBuilder already takes them separately (`Project` and `Filter`).
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -484,6 +493,56 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
JNI_METHOD_END(-1L)
}
+/*
+ * Class: org_apache_arrow_dataset_jni_JniWrapper
+ * Method: createSubstraitScanner
+ * Signature: (JLjava/nio/ByteBuffer;JJ)J
+ */
+JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createSubstraitScanner(
+ JNIEnv* env, jobject, jlong dataset_id, jobject
substrait_expr_produce_or_filter, jlong batch_size,
+ jlong memory_pool_id) {
+ JNI_METHOD_START
+ arrow::MemoryPool* pool =
reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
+ if (pool == nullptr) {
+ JniThrow("Memory pool does not exist or has been closed");
+ }
+ std::shared_ptr<arrow::dataset::Dataset> dataset =
+ RetrieveNativeInstance<arrow::dataset::Dataset>(dataset_id);
+ std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+ JniGetOrThrow(dataset->NewScan());
+ JniAssertOkOrThrow(scanner_builder->Pool(pool));
+ if (substrait_expr_produce_or_filter != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_expr_produce_or_filter);
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ std::optional<arrow::compute::Expression> filter_expr;
+ const arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression named_expression :
Review Comment:
```suggestion
arrow::engine::BoundExpressions bounded_expression =
JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
for(arrow::engine::NamedExpression& named_expression :
```
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -484,6 +493,56 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
JNI_METHOD_END(-1L)
}
+/*
+ * Class: org_apache_arrow_dataset_jni_JniWrapper
+ * Method: createSubstraitScanner
+ * Signature: (JLjava/nio/ByteBuffer;JJ)J
+ */
+JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createSubstraitScanner(
+ JNIEnv* env, jobject, jlong dataset_id, jobject
substrait_expr_produce_or_filter, jlong batch_size,
+ jlong memory_pool_id) {
+ JNI_METHOD_START
+ arrow::MemoryPool* pool =
reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
+ if (pool == nullptr) {
+ JniThrow("Memory pool does not exist or has been closed");
+ }
+ std::shared_ptr<arrow::dataset::Dataset> dataset =
+ RetrieveNativeInstance<arrow::dataset::Dataset>(dataset_id);
+ std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+ JniGetOrThrow(dataset->NewScan());
+ JniAssertOkOrThrow(scanner_builder->Pool(pool));
+ if (substrait_expr_produce_or_filter != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_expr_produce_or_filter);
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ std::optional<arrow::compute::Expression> filter_expr;
+ const arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression named_expression :
+ bounded_expression.named_expressions) {
+ if (named_expression.expression.type()->id() == arrow::Type::BOOL) {
+ if (filter_expr.has_value()) {
+ JniThrow("Only one filter expression may be provided");
+ }
+ filter_expr = named_expression.expression;
+ } else {
+ project_exprs.push_back(named_expression.expression);
+ project_names.push_back(named_expression.name);
Review Comment:
```suggestion
project_exprs.push_back(std::move(named_expression.expression));
project_names.push_back(std::move(named_expression.name));
```
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -484,6 +493,56 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
JNI_METHOD_END(-1L)
}
+/*
+ * Class: org_apache_arrow_dataset_jni_JniWrapper
+ * Method: createSubstraitScanner
+ * Signature: (JLjava/nio/ByteBuffer;JJ)J
+ */
+JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createSubstraitScanner(
+ JNIEnv* env, jobject, jlong dataset_id, jobject
substrait_expr_produce_or_filter, jlong batch_size,
+ jlong memory_pool_id) {
+ JNI_METHOD_START
+ arrow::MemoryPool* pool =
reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
+ if (pool == nullptr) {
+ JniThrow("Memory pool does not exist or has been closed");
+ }
+ std::shared_ptr<arrow::dataset::Dataset> dataset =
+ RetrieveNativeInstance<arrow::dataset::Dataset>(dataset_id);
+ std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+ JniGetOrThrow(dataset->NewScan());
+ JniAssertOkOrThrow(scanner_builder->Pool(pool));
+ if (substrait_expr_produce_or_filter != nullptr) {
Review Comment:
This parameter (`substrait_expr_produce_or_filter`) still wasn't renamed?
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -69,4 +83,8 @@ public Optional<String[]> getColumns() {
public long getBatchSize() {
return batchSize;
}
+
+ public ByteBuffer getSubstraitExtendedExpression() {
Review Comment:
Either both getters should use Optional or neither should.
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -27,6 +28,7 @@
public class ScanOptions {
private final Optional<String[]> columns;
private final long batchSize;
+ private ByteBuffer substraitExtendedExpression;
Review Comment:
This must be final.
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -484,6 +493,56 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
JNI_METHOD_END(-1L)
}
+/*
+ * Class: org_apache_arrow_dataset_jni_JniWrapper
+ * Method: createSubstraitScanner
+ * Signature: (JLjava/nio/ByteBuffer;JJ)J
+ */
+JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createSubstraitScanner(
+ JNIEnv* env, jobject, jlong dataset_id, jobject
substrait_expr_produce_or_filter, jlong batch_size,
+ jlong memory_pool_id) {
+ JNI_METHOD_START
+ arrow::MemoryPool* pool =
reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
+ if (pool == nullptr) {
+ JniThrow("Memory pool does not exist or has been closed");
+ }
+ std::shared_ptr<arrow::dataset::Dataset> dataset =
+ RetrieveNativeInstance<arrow::dataset::Dataset>(dataset_id);
+ std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+ JniGetOrThrow(dataset->NewScan());
+ JniAssertOkOrThrow(scanner_builder->Pool(pool));
+ if (substrait_expr_produce_or_filter != nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env,
+
substrait_expr_produce_or_filter);
+ std::vector<arrow::compute::Expression> project_exprs;
+ std::vector<std::string> project_names;
+ std::optional<arrow::compute::Expression> filter_expr;
+ const arrow::engine::BoundExpressions bounded_expression =
+ JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
+ for(arrow::engine::NamedExpression named_expression :
+ bounded_expression.named_expressions) {
+ if (named_expression.expression.type()->id() == arrow::Type::BOOL) {
+ if (filter_expr.has_value()) {
+ JniThrow("Only one filter expression may be provided");
+ }
+ filter_expr = named_expression.expression;
+ } else {
+ project_exprs.push_back(named_expression.expression);
+ project_names.push_back(named_expression.name);
+ }
+ }
+ JniAssertOkOrThrow(scanner_builder->Project(std::move(project_exprs),
std::move(project_names)));
+ JniAssertOkOrThrow(scanner_builder->Filter(*std::move(filter_expr)));
Review Comment:
```suggestion
JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr));
```
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -58,6 +60,18 @@ public ScanOptions(long batchSize, Optional<String[]>
columns) {
this.columns = columns;
}
+ /**
+ * Constructor.
+ * @param batchSize Maximum row number of each returned {@link
org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
+ * @param substraitExtendedExpression Extended expression to evaluate for
project new columns or apply filter.
+ */
+ public ScanOptions(long batchSize, ByteBuffer substraitExtendedExpression) {
Review Comment:
We can, and still should, have a builder; I was just saying that it doesn't
make sense to pass an Optional as a constructor parameter here...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]