danepitkin commented on code in PR #35570:
URL: https://github.com/apache/arrow/pull/35570#discussion_r1313211229


##########
docs/source/java/dataset.rst:
##########
@@ -159,6 +157,26 @@ Or use shortcut construtor:
 
 Then all columns will be emitted during scanning.
 
+Projection (Produce New Columns) and Filters
+============================================
+
+User can specify projections (new columns) or filters in ScanOptions. For example:

Review Comment:
   ```suggestion
   User can specify projections (new columns) or filters in ScanOptions using Substrait. For example:
   ```



##########
docs/source/java/dataset.rst:
##########
@@ -159,6 +157,26 @@ Or use shortcut construtor:
 
 Then all columns will be emitted during scanning.
 
+Projection (Produce New Columns) and Filters
+============================================
+
+User can specify projections (new columns) or filters in ScanOptions. For example:
+
+.. code-block:: Java
+
+   ByteBuffer substraitExtendedExpressions = ...;

Review Comment:
   We need two `ByteBuffer`s in the example now, one for filter and one for projection. They should be passed into the options builder below (instead of the getSubstraitExpressionX() methods).
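   A stdlib-only sketch of the decoding step such a two-buffer example needs (the Base64 payloads here are placeholders, not real Substrait plans): each expression is decoded into its own direct `ByteBuffer` before being handed to the options builder.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

class DirectBufferSketch {
    // Decode a Base64 string into a direct (off-heap) ByteBuffer,
    // the form the native dataset scanner consumes.
    static ByteBuffer toDirectBuffer(String base64) {
        byte[] bytes = Base64.getDecoder().decode(base64);
        ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length);
        buffer.put(bytes);
        return buffer;
    }

    public static void main(String[] args) {
        // Placeholder payloads ("placeholder" in Base64); a real example
        // would use two distinct Substrait extended expressions.
        ByteBuffer substraitFilter = toDirectBuffer("cGxhY2Vob2xkZXI=");
        ByteBuffer substraitProjection = toDirectBuffer("cGxhY2Vob2xkZXI=");
        System.out.println(substraitFilter.isDirect() + " " + substraitFilter.position());
    }
}
```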



##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java:
##########
@@ -69,4 +74,77 @@ public Optional<String[]> getColumns() {
   public long getBatchSize() {
     return batchSize;
   }
+
+  public Optional<ByteBuffer> getSubstraitExpressionProjection() {
+    return substraitExpressionProjection;
+  }
+
+  public Optional<ByteBuffer> getSubstraitExpressionFilter() {
+    return substraitExpressionFilter;
+  }

Review Comment:
   Can we call these `substrait_projection` and `substrait_filter`? I think we can either leave out the word "expression" or else change it to `substrait_extended_expression_X` if we want to be verbose. I'm curious if other folks have thoughts on readability. Substrait will probably be a new concept to many Arrow Java users, so I think it would be good to have consistent and clear naming here.

   If we change the naming, it would be best to change it everywhere, e.g. in JNI/C++, too.
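   For concreteness, the shorter naming variant would look roughly like this sketch (the field and accessor names are the proposal, not the current code):

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Sketch of ScanOptions accessors under the proposed shorter names.
class ScanOptionsNamingSketch {
    private final Optional<ByteBuffer> substraitProjection;
    private final Optional<ByteBuffer> substraitFilter;

    ScanOptionsNamingSketch(ByteBuffer projection, ByteBuffer filter) {
        this.substraitProjection = Optional.ofNullable(projection);
        this.substraitFilter = Optional.ofNullable(filter);
    }

    public Optional<ByteBuffer> getSubstraitProjection() {
        return substraitProjection;
    }

    public Optional<ByteBuffer> getSubstraitFilter() {
        return substraitFilter;
    }

    public static void main(String[] args) {
        ScanOptionsNamingSketch options = new ScanOptionsNamingSketch(null, null);
        // Neither expression was supplied, so both accessors are empty.
        System.out.println(options.getSubstraitProjection().isPresent() + " "
            + options.getSubstraitFilter().isPresent());
    }
}
```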



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -204,4 +205,167 @@ public void testRunBinaryQueryNamedTableNation() throws Exception {
       }
     }
   }
+
+  @Test
+  public void testBaseParquetReadWithExtendedExpressionsFilter() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("id", new ArrowType.Int(32, true)),
+        Field.nullable("name", new ArrowType.Utf8())
+    ), null);
+    // Substrait Extended Expression: Filter:
+    // Expression 01: WHERE ID < 20
+    String binarySubstraitExpressionFilter = "Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW55X2F" +
+        "ueRo3ChwaGggCGgQKAhABIggaBhIECgISACIGGgQKAigUGhdmaWx0ZXJfaWRfbG93ZXJfdGhhbl8yMCIaCgJJRAoETkFNRRIOCgQqAhA" +
+        "BCgRiAhABGAI=";
+    byte[] arrayByteSubstraitExpressionFilter = Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+    ByteBuffer substraitExpressionFilter = ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+    substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1, "value_1",
+            11, "value_11", 21, "value_21", 45, "value_45");
+    ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+        .columns(Optional.empty())
+        .substraitExpressionFilter(substraitExpressionFilter)
+        .build();
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
+      int rowcount = 0;
+      while (reader.loadNextBatch()) {
+        rowcount += reader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowcount);
+    }
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testBaseParquetReadWithExtendedExpressionsFilterException() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("id", new ArrowType.Int(32, true)),
+        Field.nullable("name", new ArrowType.Utf8())
+    ), null);
+    // Substrait Extended Expression: Filter:
+    // Expression 01: WHERE ID < 20
+    // Expression 02: WHERE ID < 10
+    String binarySubstraitExpressionFilter = "Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW5" +
+        "5X2FueRISGhAIAhACGgpsdDphbnlfYW55GjcKHBoaCAIaBAoCEAEiCBoGEgQKAhIAIgYaBAoCKBQaF2ZpbHRlcl9pZF9sb3dlcl9" +
+        "0aGFuXzIwGjcKHBoaCAIaBAoCEAEiCBoGEgQKAhIAIgYaBAoCKAoaF2ZpbHRlcl9pZF9sb3dlcl90aGFuXzEwIhoKAklECgROQU1F" +
+        "Eg4KBCoCEAEKBGICEAEYAg==";
+    byte[] arrayByteSubstraitExpressionFilter = Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+    ByteBuffer substraitExpressionFilter = ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+    substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1, "value_1",
+            11, "value_11", 21, "value_21", 45, "value_45");
+    ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+        .columns(Optional.empty())
+        .substraitExpressionFilter(substraitExpressionFilter)
+        .build();
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
+      int rowcount = 0;
+      while (reader.loadNextBatch()) {
+        rowcount += reader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowcount);
+    }
+  }

Review Comment:
   Can we delete the code that should not run after the expected Exception is thrown?
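   One way to express this so no unreachable code remains is to wrap only the throwing call (JUnit 4.13's `Assert.assertThrows` expresses the same idea more concisely). A stdlib-only illustration of the pattern, with a hypothetical stand-in for the failing scan:

```java
// Stand-in for the scan call expected to fail when the extended
// expression contains more than one filter (hypothetical simplification).
class ExpectedExceptionSketch {
    static void scanWithTwoFilterExpressions() {
        throw new RuntimeException("only a single filter expression is supported");
    }

    public static void main(String[] args) {
        boolean thrown = false;
        try {
            scanWithTwoFilterExpressions();
        } catch (RuntimeException e) {
            thrown = true; // the test ends here; no dead assertions follow
        }
        System.out.println(thrown);
    }
}
```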



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -204,4 +205,167 @@ public void testRunBinaryQueryNamedTableNation() throws Exception {
       }
     }
   }
+
+  @Test
+  public void testBaseParquetReadWithExtendedExpressionsFilter() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("id", new ArrowType.Int(32, true)),
+        Field.nullable("name", new ArrowType.Utf8())
+    ), null);
+    // Substrait Extended Expression: Filter:
+    // Expression 01: WHERE ID < 20
+    String binarySubstraitExpressionFilter = "Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW55X2F" +
+        "ueRo3ChwaGggCGgQKAhABIggaBhIECgISACIGGgQKAigUGhdmaWx0ZXJfaWRfbG93ZXJfdGhhbl8yMCIaCgJJRAoETkFNRRIOCgQqAhA" +
+        "BCgRiAhABGAI=";
+    byte[] arrayByteSubstraitExpressionFilter = Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+    ByteBuffer substraitExpressionFilter = ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+    substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1, "value_1",
+            11, "value_11", 21, "value_21", 45, "value_45");
+    ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+        .columns(Optional.empty())
+        .substraitExpressionFilter(substraitExpressionFilter)
+        .build();
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
+      int rowcount = 0;
+      while (reader.loadNextBatch()) {
+        rowcount += reader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowcount);
+    }
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testBaseParquetReadWithExtendedExpressionsFilterException() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("id", new ArrowType.Int(32, true)),
+        Field.nullable("name", new ArrowType.Utf8())
+    ), null);
+    // Substrait Extended Expression: Filter:
+    // Expression 01: WHERE ID < 20
+    // Expression 02: WHERE ID < 10
+    String binarySubstraitExpressionFilter = "Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW5" +
+        "5X2FueRISGhAIAhACGgpsdDphbnlfYW55GjcKHBoaCAIaBAoCEAEiCBoGEgQKAhIAIgYaBAoCKBQaF2ZpbHRlcl9pZF9sb3dlcl9" +
+        "0aGFuXzIwGjcKHBoaCAIaBAoCEAEiCBoGEgQKAhIAIgYaBAoCKAoaF2ZpbHRlcl9pZF9sb3dlcl90aGFuXzEwIhoKAklECgROQU1F" +
+        "Eg4KBCoCEAEKBGICEAEYAg==";
+    byte[] arrayByteSubstraitExpressionFilter = Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+    ByteBuffer substraitExpressionFilter = ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+    substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1, "value_1",
+            11, "value_11", 21, "value_21", 45, "value_45");
+    ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+        .columns(Optional.empty())
+        .substraitExpressionFilter(substraitExpressionFilter)
+        .build();
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
+      int rowcount = 0;
+      while (reader.loadNextBatch()) {
+        rowcount += reader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowcount);
+    }
+  }
+
+  @Test
+  public void testBaseParquetReadWithExtendedExpressionsProject() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("add_two_to_column_a", new ArrowType.Int(32, true)),
+        Field.nullable("concat_column_a_and_b", new ArrowType.Utf8())
+    ), null);
+    // Substrait Extended Expression: Project New Column:
+    // Expression ADD: id + 2
+    // Expression CONCAT: name + '-' + name
+    String binarySubstraitExpressionProject = "Ch4IARIaL2Z1bmN0aW9uc19hcml0aG1ldGljLnlhbWwSERoPCAEaC2FkZDppM" +
+        "zJfaTMyEhQaEggCEAEaDGNvbmNhdDp2Y2hhchoxChoaGBoEKgIQASIIGgYSBAoCEgAiBhoECgIoAhoTYWRkX3R3b190b19jb2x1" +
+        "bW5fYRpGCi0aKwgBGgRiAhABIgoaCBIGCgQSAggBIgkaBwoFYgMgLSAiChoIEgYKBBICCAEaFWNvbmNhdF9jb2x1bW5fYV9hbmR" +
+        "fYiIaCgJJRAoETkFNRRIOCgQqAhABCgRiAhABGAI=";
+    byte[] arrayByteSubstraitExpressionProject = Base64.getDecoder().decode(binarySubstraitExpressionProject);
+    ByteBuffer substraitExpressionProject = ByteBuffer.allocateDirect(arrayByteSubstraitExpressionProject.length);
+    substraitExpressionProject.put(arrayByteSubstraitExpressionProject);
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1, "value_1",
+            11, "value_11", 21, "value_21", 45, "value_45");
+    ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+        .columns(Optional.empty())
+        .substraitExpressionProjection(substraitExpressionProject)
+        .build();
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
+      int rowcount = 0;
+      while (reader.loadNextBatch()) {
+        rowcount += reader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(5, rowcount);
+    }
+  }
+
+  @Test
+  public void testBaseParquetReadWithExtendedExpressionsProjectAndFilter() throws Exception {

Review Comment:
   Optional: There's a decent amount of duplicated code in the test cases. Do you think it would be possible to create a generic function that can be parameterized for each test case?
   
   maybe something like this?
   ```
   private ArrowReader scanParquetFileUsingSubstrait(String base64EncodedSubstraitFilter, String base64EncodedSubstraitProjection) throws Exception {...}
   ```
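   The shared decoding step such a helper would factor out can be sketched with the stdlib alone (the Arrow scan itself is omitted; the method name is hypothetical and both parameters are nullable, mirroring the suggested signature):

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;

class SubstraitHelperSketch {
    // Decode an optional Base64-encoded Substrait payload into a direct
    // ByteBuffer, or return empty when the test case does not use one.
    static Optional<ByteBuffer> decodeToDirect(String base64) {
        if (base64 == null) {
            return Optional.empty();
        }
        byte[] bytes = Base64.getDecoder().decode(base64);
        ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length);
        buffer.put(bytes);
        return Optional.of(buffer);
    }

    public static void main(String[] args) {
        // "AQID" decodes to the three bytes 0x01 0x02 0x03.
        System.out.println(decodeToDirect(null).isPresent() + " "
            + decodeToDirect("AQID").get().capacity());
    }
}
```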
   



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -204,4 +205,167 @@ public void testRunBinaryQueryNamedTableNation() throws Exception {
       }
     }
   }
+
+  @Test
+  public void testBaseParquetReadWithExtendedExpressionsFilter() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("id", new ArrowType.Int(32, true)),
+        Field.nullable("name", new ArrowType.Utf8())
+    ), null);
+    // Substrait Extended Expression: Filter:
+    // Expression 01: WHERE ID < 20
+    String binarySubstraitExpressionFilter = "Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW55X2F" +

Review Comment:
   Nit: Would it be more descriptive to call this `base64EncodedSubstraitFilter`? Then it would get decoded into a var called `substraitFilter`. I usually don't like to get too picky with naming, but in this case it would be nice to be highly descriptive of what these random-looking strings are.



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -204,4 +205,167 @@ public void testRunBinaryQueryNamedTableNation() throws Exception {
       }
     }
   }
+
+  @Test
+  public void testBaseParquetReadWithExtendedExpressionsFilter() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("id", new ArrowType.Int(32, true)),
+        Field.nullable("name", new ArrowType.Utf8())
+    ), null);
+    // Substrait Extended Expression: Filter:
+    // Expression 01: WHERE ID < 20
+    String binarySubstraitExpressionFilter = "Ch4IARIaL2Z1bmN0aW9uc19jb21wYXJpc29uLnlhbWwSEhoQCAIQAhoKbHQ6YW55X2F" +
+        "ueRo3ChwaGggCGgQKAhABIggaBhIECgISACIGGgQKAigUGhdmaWx0ZXJfaWRfbG93ZXJfdGhhbl8yMCIaCgJJRAoETkFNRRIOCgQqAhA" +
+        "BCgRiAhABGAI=";
+    byte[] arrayByteSubstraitExpressionFilter = Base64.getDecoder().decode(binarySubstraitExpressionFilter);
+    ByteBuffer substraitExpressionFilter = ByteBuffer.allocateDirect(arrayByteSubstraitExpressionFilter.length);
+    substraitExpressionFilter.put(arrayByteSubstraitExpressionFilter);
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 19, "value_19", 1, "value_1",
+            11, "value_11", 21, "value_21", 45, "value_45");
+    ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
+        .columns(Optional.empty())
+        .substraitExpressionFilter(substraitExpressionFilter)
+        .build();
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
+      int rowcount = 0;
+      while (reader.loadNextBatch()) {
+        rowcount += reader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowcount);

Review Comment:
   Optional: Would it be easy to replace the final assertion in these test cases with an exact match of values instead of just row count? If not, then ignore this comment.
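   The shape of such an exact-value assertion, with plain arrays standing in for batches pulled from the Arrow reader (the values are hypothetical, chosen to match the rows an `id < 20` filter would keep):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ExactMatchSketch {
    public static void main(String[] args) {
        // Hypothetical batch contents instead of reader.getVectorSchemaRoot().
        int[][] batches = { {19, 1}, {11} };
        List<Integer> actual = new ArrayList<>();
        for (int[] batch : batches) {
            for (int value : batch) {
                actual.add(value);
            }
        }
        // Exact match on values rather than a bare row count.
        System.out.println(actual.equals(Arrays.asList(19, 1, 11)));
    }
}
```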



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
