danepitkin commented on code in PR #35570:
URL: https://github.com/apache/arrow/pull/35570#discussion_r1313211229
##########
docs/source/java/dataset.rst:
##########
@@ -159,6 +157,26 @@ Or use shortcut construtor:
Then all columns will be emitted during scanning.
+Projection (Produce New Columns) and Filters
+============================================
+
+User can specify projections (new columns) or filters in ScanOptions. For
example:
Review Comment:
```suggestion
User can specify projections (new columns) or filters in ScanOptions using
Substrait. For example:
```
##########
docs/source/java/dataset.rst:
##########
@@ -159,6 +157,26 @@ Or use shortcut construtor:
Then all columns will be emitted during scanning.
+Projection (Produce New Columns) and Filters
+============================================
+
+User can specify projections (new columns) or filters in ScanOptions. For
example:
+
+.. code-block:: Java
+
+ ByteBuffer substraitExtendedExpressions = ...;
Review Comment:
We need two `ByteBuffer`s in the example now, one for filter and one for
projection. They should be passed into the options builder below (instead of
the getSubstraitExpressionX() methods).
##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -69,4 +74,77 @@ public Optional<String[]> getColumns() {
public long getBatchSize() {
return batchSize;
}
+
+ public Optional<ByteBuffer> getSubstraitExpressionProjection() {
+ return substraitExpressionProjection;
+ }
+
+ public Optional<ByteBuffer> getSubstraitExpressionFilter() {
+ return substraitExpressionFilter;
+ }
Review Comment:
Can we call these `substrait_projection` and `substrait_filter`? I think we
can either leave out the word "expression" or else change it to
"substrait_extended_expression_X" if we want to be verbose. I'm curious if
other folks have thoughts on readability. Substrait will probably be a new
concept to many Arrow Java users, so I think it would be good to have
consistent and clear naming here.
If we change the naming, it would be best to change it everywhere, e.g. in
JNI/C++, too.
##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -204,4 +205,167 @@ public void testRunBinaryQueryNamedTableNation() throws
Exception {
}
}
}
+
+ @Test
+ public void testBaseParquetReadWithExtendedExpressionsFilter() throws
Exception {
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("name", new ArrowType.Utf8())
+ ), null);
+ // Substrait Extended Expression: Filter:
+ // Expression 01: WHERE ID < 20
+ String binarySubstraitExpressionFilter =
"Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW55X2F" +
+
"ueRo3ChwaGggCGgQKAhABIggaBhIECgISACIGGgQKAigUGhdmaWx0ZXJfaWRfbG93ZXJfdGhhbl8yMCIaCgJJRAoETkFNRRIOCgQqAhA"
+
+ "BCgRiAhABGAI=";
+ byte[] arrayByteSubstraitExpressionFilter =
Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+ ByteBuffer substraitExpressionFilter =
ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+ substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1,
"value_1",
+ 11, "value_11", 21, "value_21", 45, "value_45");
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitExpressionFilter(substraitExpressionFilter)
+ .build();
+ try (
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, writeSupport.getOutputURI());
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowcount = 0;
+ while (reader.loadNextBatch()) {
+ rowcount += reader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(3, rowcount);
+ }
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testBaseParquetReadWithExtendedExpressionsFilterException()
throws Exception {
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("name", new ArrowType.Utf8())
+ ), null);
+ // Substrait Extended Expression: Filter:
+ // Expression 01: WHERE ID < 20
+ // Expression 02: WHERE ID < 10
+ String binarySubstraitExpressionFilter =
"Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW5" +
+
"5X2FueRISGhAIAhACGgpsdDphbnlfYW55GjcKHBoaCAIaBAoCEAEiCBoGEgQKAhIAIgYaBAoCKBQaF2ZpbHRlcl9pZF9sb3dlcl9"
+
+
"0aGFuXzIwGjcKHBoaCAIaBAoCEAEiCBoGEgQKAhIAIgYaBAoCKAoaF2ZpbHRlcl9pZF9sb3dlcl90aGFuXzEwIhoKAklECgROQU1F"
+
+ "Eg4KBCoCEAEKBGICEAEYAg==";
+ byte[] arrayByteSubstraitExpressionFilter =
Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+ ByteBuffer substraitExpressionFilter =
ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+ substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1,
"value_1",
+ 11, "value_11", 21, "value_21", 45, "value_45");
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitExpressionFilter(substraitExpressionFilter)
+ .build();
+ try (
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, writeSupport.getOutputURI());
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowcount = 0;
+ while (reader.loadNextBatch()) {
+ rowcount += reader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(3, rowcount);
+ }
+ }
Review Comment:
Can we delete the code that should not run after the expected Exception is
thrown?
##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -204,4 +205,167 @@ public void testRunBinaryQueryNamedTableNation() throws
Exception {
}
}
}
+
+ @Test
+ public void testBaseParquetReadWithExtendedExpressionsFilter() throws
Exception {
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("name", new ArrowType.Utf8())
+ ), null);
+ // Substrait Extended Expression: Filter:
+ // Expression 01: WHERE ID < 20
+ String binarySubstraitExpressionFilter =
"Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW55X2F" +
+
"ueRo3ChwaGggCGgQKAhABIggaBhIECgISACIGGgQKAigUGhdmaWx0ZXJfaWRfbG93ZXJfdGhhbl8yMCIaCgJJRAoETkFNRRIOCgQqAhA"
+
+ "BCgRiAhABGAI=";
+ byte[] arrayByteSubstraitExpressionFilter =
Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+ ByteBuffer substraitExpressionFilter =
ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+ substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1,
"value_1",
+ 11, "value_11", 21, "value_21", 45, "value_45");
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitExpressionFilter(substraitExpressionFilter)
+ .build();
+ try (
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, writeSupport.getOutputURI());
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowcount = 0;
+ while (reader.loadNextBatch()) {
+ rowcount += reader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(3, rowcount);
+ }
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testBaseParquetReadWithExtendedExpressionsFilterException()
throws Exception {
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("name", new ArrowType.Utf8())
+ ), null);
+ // Substrait Extended Expression: Filter:
+ // Expression 01: WHERE ID < 20
+ // Expression 02: WHERE ID < 10
+ String binarySubstraitExpressionFilter =
"Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW5" +
+
"5X2FueRISGhAIAhACGgpsdDphbnlfYW55GjcKHBoaCAIaBAoCEAEiCBoGEgQKAhIAIgYaBAoCKBQaF2ZpbHRlcl9pZF9sb3dlcl9"
+
+
"0aGFuXzIwGjcKHBoaCAIaBAoCEAEiCBoGEgQKAhIAIgYaBAoCKAoaF2ZpbHRlcl9pZF9sb3dlcl90aGFuXzEwIhoKAklECgROQU1F"
+
+ "Eg4KBCoCEAEKBGICEAEYAg==";
+ byte[] arrayByteSubstraitExpressionFilter =
Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+ ByteBuffer substraitExpressionFilter =
ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+ substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1,
"value_1",
+ 11, "value_11", 21, "value_21", 45, "value_45");
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitExpressionFilter(substraitExpressionFilter)
+ .build();
+ try (
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, writeSupport.getOutputURI());
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowcount = 0;
+ while (reader.loadNextBatch()) {
+ rowcount += reader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(3, rowcount);
+ }
+ }
+
+ @Test
+ public void testBaseParquetReadWithExtendedExpressionsProject() throws
Exception {
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("add_two_to_column_a", new ArrowType.Int(32, true)),
+ Field.nullable("concat_column_a_and_b", new ArrowType.Utf8())
+ ), null);
+ // Substrait Extended Expression: Project New Column:
+ // Expression ADD: id + 2
+ // Expression CONCAT: name + '-' + name
+ String binarySubstraitExpressionProject =
"Ch4IARIaL2Z1bmN0aW9uc19hcml0aG1ldGljLnlhbWwSERoPCAEaC2FkZDppM" +
+
"zJfaTMyEhQaEggCEAEaDGNvbmNhdDp2Y2hhchoxChoaGBoEKgIQASIIGgYSBAoCEgAiBhoECgIoAhoTYWRkX3R3b190b19jb2x1"
+
+
"bW5fYRpGCi0aKwgBGgRiAhABIgoaCBIGCgQSAggBIgkaBwoFYgMgLSAiChoIEgYKBBICCAEaFWNvbmNhdF9jb2x1bW5fYV9hbmR"
+
+ "fYiIaCgJJRAoETkFNRRIOCgQqAhABCgRiAhABGAI=";
+ byte[] arrayByteSubstraitExpressionProject =
Base64.getDecoder().decode(binarySubstraitExpressionProject);
+ ByteBuffer substraitExpressionProject =
ByteBuffer.allocateDirect(arrayByteSubstraitExpressionProject.length);
+ substraitExpressionProject.put(arrayByteSubstraitExpressionProject);
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1,
"value_1",
+ 11, "value_11", 21, "value_21", 45, "value_45");
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitExpressionProjection(substraitExpressionProject)
+ .build();
+ try (
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, writeSupport.getOutputURI());
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowcount = 0;
+ while (reader.loadNextBatch()) {
+ rowcount += reader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(5, rowcount);
+ }
+ }
+
+ @Test
+ public void testBaseParquetReadWithExtendedExpressionsProjectAndFilter()
throws Exception {
Review Comment:
Optional: There's a decent amount of duplicated code in the test cases. Do
you think it would be possible to create a generic function that can be
parameterized for each test case?
Maybe something like this?
```
private ArrowReader scanParquetFileUsingSubstrait(String
base64EncodedSubstraitFilter, String base64EncodedSubstraitProjection) throws
Exception {...}
```
##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -204,4 +205,167 @@ public void testRunBinaryQueryNamedTableNation() throws
Exception {
}
}
}
+
+ @Test
+ public void testBaseParquetReadWithExtendedExpressionsFilter() throws
Exception {
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("name", new ArrowType.Utf8())
+ ), null);
+ // Substrait Extended Expression: Filter:
+ // Expression 01: WHERE ID < 20
+ String binarySubstraitExpressionFilter =
"Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW55X2F" +
Review Comment:
Nit: Would it be more descriptive to call this
`base64EncodedSubstraitFilter`? Then it would get decoded into a var called
`substraitFilter`. I usually don't like to get too picky with naming, but in
this case it would be nice to be highly descriptive of what these
random-looking strings are.
##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -204,4 +205,167 @@ public void testRunBinaryQueryNamedTableNation() throws
Exception {
}
}
}
+
+ @Test
+ public void testBaseParquetReadWithExtendedExpressionsFilter() throws
Exception {
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("name", new ArrowType.Utf8())
+ ), null);
+ // Substrait Extended Expression: Filter:
+ // Expression 01: WHERE ID < 20
+ String binarySubstraitExpressionFilter =
"Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW55X2F" +
+
"ueRo3ChwaGggCGgQKAhABIggaBhIECgISACIGGgQKAigUGhdmaWx0ZXJfaWRfbG93ZXJfdGhhbl8yMCIaCgJJRAoETkFNRRIOCgQqAhA"
+
+ "BCgRiAhABGAI=";
+ byte[] arrayByteSubstraitExpressionFilter =
Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+ ByteBuffer substraitExpressionFilter =
ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+ substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1,
"value_1",
+ 11, "value_11", 21, "value_21", 45, "value_45");
+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .substraitExpressionFilter(substraitExpressionFilter)
+ .build();
+ try (
+ DatasetFactory datasetFactory = new
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, writeSupport.getOutputURI());
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowcount = 0;
+ while (reader.loadNextBatch()) {
+ rowcount += reader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(3, rowcount);
Review Comment:
Optional: Would it be easy to replace the final assertion in these test
cases with an exact match of values instead of just row count? If not, then
ignore this comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]