flyrain commented on a change in pull request #3287:
URL: https://github.com/apache/iceberg/pull/3287#discussion_r730015550



##########
File path: 
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
##########
@@ -33,6 +33,7 @@
 public abstract class BaseBatchReader<T> implements VectorizedReader<T> {
   protected final VectorizedArrowReader[] readers;
   protected final VectorHolder[] vectorHolders;
+  protected long rowStartPos = 0;

Review comment:
       It is a row start position in a batch, not a row group starting 
position. I'll change it to something like `rowStartPosInBatch`

##########
File path: 
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +53,44 @@ public static ColumnarBatchReader buildReader(
                 expectedSchema, fileSchema, setArrowValidityVector,
                 idToConstant, ColumnarBatchReader::new));
   }
+
+  public static ColumnarBatchReader buildReader(
+      Schema expectedSchema,
+      MessageType fileSchema,
+      boolean setArrowValidityVector,
+      Map<Integer, ?> idToConstant,
+      DeleteFilter deleteFilter) {
+    return (ColumnarBatchReader)
+        TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+            new ReaderBuilder(
+                expectedSchema, fileSchema, setArrowValidityVector,
+                idToConstant, ColumnarBatchReader::new, deleteFilter));
+  }
+
+  private static class ReaderBuilder extends VectorizedReaderBuilder {

Review comment:
       I tried to modify `VectorizedReaderBuilder`, but to add `DeleteFilter` 
into `VectorizedReaderBuilder`, we need to add module `iceberg-data` to  the 
dependency of `iceberg-arrow`. 
   I guess that's fine? Will make the change in build.gradle. Let me know if 
there is any concern.

##########
File path: 
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +53,44 @@ public static ColumnarBatchReader buildReader(
                 expectedSchema, fileSchema, setArrowValidityVector,
                 idToConstant, ColumnarBatchReader::new));
   }
+
+  public static ColumnarBatchReader buildReader(
+      Schema expectedSchema,
+      MessageType fileSchema,
+      boolean setArrowValidityVector,
+      Map<Integer, ?> idToConstant,
+      DeleteFilter deleteFilter) {
+    return (ColumnarBatchReader)
+        TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+            new ReaderBuilder(
+                expectedSchema, fileSchema, setArrowValidityVector,
+                idToConstant, ColumnarBatchReader::new, deleteFilter));
+  }
+
+  private static class ReaderBuilder extends VectorizedReaderBuilder {

Review comment:
       Oh, to extend VectroizedReader with setDeleteFilter, we need to let 
module `iceberg-parquet` depend on `iceberg-data`, which will form a cyclic 
dependency, since `iceberg-data` has depended on `iceberg-parquet`, 
https://github.com/apache/iceberg/blob/dbfa71e3e06ee5a3c2f126cb20fe4d7acabf9c64/build.gradle#L241.
 
   I'd suggest to move interface `VectorizedReader` to module `api` if we 
really want to do this. 
   
   Also, is it possible to merge the module `data` into `core`, so that we 
don't need to make module `arrow` depend on `data`.

##########
File path: 
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +53,44 @@ public static ColumnarBatchReader buildReader(
                 expectedSchema, fileSchema, setArrowValidityVector,
                 idToConstant, ColumnarBatchReader::new));
   }
+
+  public static ColumnarBatchReader buildReader(
+      Schema expectedSchema,
+      MessageType fileSchema,
+      boolean setArrowValidityVector,
+      Map<Integer, ?> idToConstant,
+      DeleteFilter deleteFilter) {
+    return (ColumnarBatchReader)
+        TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+            new ReaderBuilder(
+                expectedSchema, fileSchema, setArrowValidityVector,
+                idToConstant, ColumnarBatchReader::new, deleteFilter));
+  }
+
+  private static class ReaderBuilder extends VectorizedReaderBuilder {

Review comment:
       Oh, to extend VectroizedReader with setDeleteFilter, we need to let 
module `iceberg-parquet` depend on `iceberg-data`, which will form a cyclic 
dependency, since `iceberg-data` has depended on `iceberg-parquet`, 
https://github.com/apache/iceberg/blob/dbfa71e3e06ee5a3c2f126cb20fe4d7acabf9c64/build.gradle#L241.
 
   I'd suggest to move interface `VectorizedReader` to module `api` if we 
really want to do this. 
   
   Also, is it possible to merge the module `data` into `core`, so that we 
don't need to make module `arrow` depend on `data`.
   
   This seems a big change, I'd suggest to have a separated PR for it.

##########
File path: 
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +53,44 @@ public static ColumnarBatchReader buildReader(
                 expectedSchema, fileSchema, setArrowValidityVector,
                 idToConstant, ColumnarBatchReader::new));
   }
+
+  public static ColumnarBatchReader buildReader(
+      Schema expectedSchema,
+      MessageType fileSchema,
+      boolean setArrowValidityVector,
+      Map<Integer, ?> idToConstant,
+      DeleteFilter deleteFilter) {
+    return (ColumnarBatchReader)
+        TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+            new ReaderBuilder(
+                expectedSchema, fileSchema, setArrowValidityVector,
+                idToConstant, ColumnarBatchReader::new, deleteFilter));
+  }
+
+  private static class ReaderBuilder extends VectorizedReaderBuilder {

Review comment:
       Oh, to extend VectroizedReader with setDeleteFilter, we need to let 
module `iceberg-parquet` depend on `iceberg-data`, which will form a cyclic 
dependency, since `iceberg-data` has depended on `iceberg-parquet`, 
https://github.com/apache/iceberg/blob/dbfa71e3e06ee5a3c2f126cb20fe4d7acabf9c64/build.gradle#L241
   I'd suggest to move interface `VectorizedReader` to module `api` if we 
really want to do this. 
   
   Also, is it possible to merge the module `data` into `core`, so that we 
don't need to make module `arrow` depend on `data`.
   
   This seems a big change, I'd suggest to have a separated PR for it.

##########
File path: 
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -33,11 +38,16 @@
  * {@linkplain VectorizedArrowReader VectorReader(s)}.
  */
 public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
+  private DeleteFilter deletes = null;
 
   public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
     super(readers);
   }
 
+  public void setDeleteFilter(DeleteFilter deleteFilter) {
+    this.deletes = deleteFilter;

Review comment:
       The method `hasEqDeletes` is invoked by Spark 2/3 to ensure it falls 
back to non-vectorized read if any eq-delete is there. We got this check to 
only work with pos-delete in class `ColumnarBatchReader`.
   ```
     private Map<Integer, Integer> rowIdMapping(int numRows) {
       if (deletes != null && deletes.hasPosDeletes()) {
   ```

##########
File path: 
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
##########
@@ -33,6 +33,7 @@
 public abstract class BaseBatchReader<T> implements VectorizedReader<T> {
   protected final VectorizedArrowReader[] readers;
   protected final VectorHolder[] vectorHolders;
+  protected long rowStartPos = 0;

Review comment:
       I will move it to `ColumnarBatchReader` in the next commit.

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -183,6 +189,18 @@ protected boolean shouldKeep(T item) {
     return remainingRowsFilter.filter(records);
   }
 
+  public Set<Long> posDeletedRowIds() {

Review comment:
       Thanks @pvary and @aokolnychyi. Will use bitmap in the next commit, so 
that we don't have to worry about the memory constraint, considering bitmap is 
super compacted comparing to `Set<Long>`.

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -183,6 +189,18 @@ protected boolean shouldKeep(T item) {
     return remainingRowsFilter.filter(records);
   }
 
+  public Set<Long> posDeletedRowIds() {

Review comment:
       Java's BitSet has its limitations
   1. Its max length is the max value of an integer, but the row position is a 
`Long`.
   2. For a large file with rows deleted at the end the files, their positions 
are big number, which means we need a lot of memory to store these positions. 
The worst case is we need a Integer.Max/8 bytes(about 0.25G) memory to store a 
single position value at the end of the file.
   
   We need a Sparse BitSet which can support set a `Long` position.

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -183,6 +189,18 @@ protected boolean shouldKeep(T item) {
     return remainingRowsFilter.filter(records);
   }
 
+  public Set<Long> posDeletedRowIds() {

Review comment:
       Java's BitSet has its limitations
   1. Its max length is the max value of an integer, but the row position is a 
`Long`.
   2. For a large file with a few rows deleted at the end the files, their 
positions are big number, which means we need a lot of memory to store these 
positions. The worst case is we need a Integer.Max/8 bytes(about 0.25G) memory 
to store a single position value at the end of the file.
   
   We need a Sparse BitSet which can support set a `Long` position.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to