flyrain commented on a change in pull request #3287:
URL: https://github.com/apache/iceberg/pull/3287#discussion_r737884990



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -47,18 +68,71 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
       closeVectors();
     }
 
+    Pair<int[], Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
     for (int i = 0; i < readers.length; i += 1) {
       vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
       int numRowsInVector = vectorHolders[i].numValues();
       Preconditions.checkState(
           numRowsInVector == numRowsToRead,
           "Number of rows in the vector %s didn't match expected %s ", 
numRowsInVector,
           numRowsToRead);
-      arrowColumnVectors[i] =
-          IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+
+      if (rowIdMapping == null) {
+        arrowColumnVectors[i] = IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+      } else {
+        int[] rowIdMap = rowIdMapping.first();
+        Integer numRows = rowIdMapping.second();
+        arrowColumnVectors[i] = ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows);
+      }
     }
+
+    rowStartPosInBatch += numRowsToRead;
     ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
-    batch.setNumRows(numRowsToRead);
+
+    if (rowIdMapping == null) {
+      batch.setNumRows(numRowsToRead);
+    } else {
+      Integer numRows = rowIdMapping.second();
+      batch.setNumRows(numRows);
+    }
     return batch;
   }
+
+  private Pair<int[], Integer> rowIdMapping(int numRows) {
+    if (deletes != null && deletes.hasPosDeletes()) {
+      return buildRowIdMapping(deletes.deletedRowPositions(), numRows);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Build a row id mapping inside a batch, which skips deleted rows. For example, if the 1st and 3rd rows are
+   * deleted in a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new num of rows is 3.
+   * @param deletedRowPositions a set of deleted row positions
+   * @param numRows the num of rows
+   * @return the mapping array and the new num of rows in a batch, null if no row is deleted
+   */
+  private Pair<int[], Integer> buildRowIdMapping(Roaring64Bitmap deletedRowPositions, int numRows) {

Review comment:
       Here is a summary of the two approaches:
   1. Save the mapping in an array that is initialized up front. This may use more memory and takes more cycles to initialize, since it has to walk every row position in the batch, but accessing the vector is fast: a plain random memory access (see the first sketch below).
   2. Save the mapping in a tree-like structure. This saves memory and needs less time to initialize, but costs more cycles when accessing the vector: each `getXXX` method traverses the tree and calculates the new rowId (see the second sketch below).
   
   I expect the second approach to use less memory but to be a bit slower, and even slower if the vector is visited multiple times. I can't be sure without a benchmark, though; the difference may be minor.
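   
   To make the trade-off concrete, here is a minimal sketch of approach 1. It is my illustration, not the PR's implementation: the class name is hypothetical, and I assume `Roaring64Bitmap#contains(long)`, Iceberg's `Pair`, and a `rowStartPosInBatch` offset mirroring the field in the diff above.

```java
import org.apache.iceberg.util.Pair;
import org.roaringbitmap.longlong.Roaring64Bitmap;

class ArrayRowIdMappingSketch {
  // file-wide position of the first row in the current batch
  private final long rowStartPosInBatch;

  ArrayRowIdMappingSketch(long rowStartPosInBatch) {
    this.rowStartPosInBatch = rowStartPosInBatch;
  }

  // Builds a dense int[] mapping new (post-delete) row ids to original
  // positions in the batch, e.g. {0->1, 1->3, 2->4} in the javadoc example.
  Pair<int[], Integer> buildRowIdMapping(Roaring64Bitmap deletedRowPositions, int numRows) {
    int[] rowIdMapping = new int[numRows];
    int liveRows = 0;
    for (int pos = 0; pos < numRows; pos++) {
      // positions in the delete bitmap are file-wide, hence the batch offset
      if (!deletedRowPositions.contains(rowStartPosInBatch + pos)) {
        rowIdMapping[liveRows++] = pos;
      }
    }
    // null signals "no row deleted in this batch", matching the javadoc above
    return liveRows == numRows ? null : Pair.of(rowIdMapping, liveRows);
  }
}
```

   Accessing the vector is then just `vector.getInt(rowIdMapping[rowId])`: one extra array read per access.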
   
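   And one possible way to realize approach 2, keeping only the runs of live rows in a `TreeMap`. All names here are hypothetical; a real implementation chasing faster initialization could derive the runs from the bitmap's own iterator instead of probing every position as this sketch does.

```java
import java.util.Map;
import java.util.TreeMap;
import org.roaringbitmap.longlong.Roaring64Bitmap;

class TreeRowIdMappingSketch {
  // key: first new (post-delete) row id of a run of live rows
  // value: original position of that run inside the batch
  private final TreeMap<Integer, Integer> liveRuns = new TreeMap<>();

  TreeRowIdMappingSketch(Roaring64Bitmap deletedRowPositions, long batchStart, int numRows) {
    int newRowId = 0;
    int pos = 0;
    while (pos < numRows) {
      if (deletedRowPositions.contains(batchStart + pos)) {
        pos++; // skip deleted rows between runs
        continue;
      }
      liveRuns.put(newRowId, pos);
      // extend the run over consecutive live rows
      while (pos < numRows && !deletedRowPositions.contains(batchStart + pos)) {
        pos++;
        newRowId++;
      }
    }
  }

  // Called from every getXXX: an O(log #runs) tree lookup instead of the O(1)
  // array read of approach 1. Callers must keep rowId below the live row count.
  int originalRowId(int rowId) {
    Map.Entry<Integer, Integer> run = liveRuns.floorEntry(rowId);
    return run.getValue() + (rowId - run.getKey());
  }
}
```

   With the javadoc example (rows 0 and 2 deleted out of 5), `liveRuns` holds {0->1, 1->3}, so `originalRowId(2)` returns 3 + (2 - 1) = 4, matching the array mapping.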



