flyrain commented on code in PR #4888:
URL: https://github.com/apache/iceberg/pull/4888#discussion_r927887360
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -66,35 +78,50 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
closeVectors();
}
- ColumnBatchLoader batchLoader = new ColumnBatchLoader(numRowsToRead);
+ ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
rowStartPosInBatch += numRowsToRead;
- return batchLoader.columnarBatch;
+ return columnarBatch;
}
private class ColumnBatchLoader {
- private int[] rowIdMapping; // the rowId mapping to skip deleted rows for all column vectors inside a batch
- private int numRows;
- private ColumnarBatch columnarBatch;
+ private final int numRowsToRead;
+ // the rowId mapping to skip deleted rows for all column vectors inside a batch; it is null when there are no deletes
+ private int[] rowIdMapping;
+ // the array indicating whether each row is deleted; it is null when there is no "_deleted" metadata column
+ private boolean[] isDeleted;
ColumnBatchLoader(int numRowsToRead) {
- initRowIdMapping(numRowsToRead);
- loadDataToColumnBatch(numRowsToRead);
+ Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows
to read: %s", numRowsToRead);
+ this.numRowsToRead = numRowsToRead;
+ if (hasIsDeletedColumn) {
+ isDeleted = new boolean[numRowsToRead];
+ }
}
- ColumnarBatch loadDataToColumnBatch(int numRowsToRead) {
- Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows
to read: %s", numRowsToRead);
- ColumnVector[] arrowColumnVectors = readDataToColumnVectors(numRowsToRead);
+ ColumnarBatch loadDataToColumnBatch() {
+ int numRowsUndeleted = initRowIdMapping();
+
+ ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
- columnarBatch = new ColumnarBatch(arrowColumnVectors);
- columnarBatch.setNumRows(numRows);
+ ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
+ newColumnarBatch.setNumRows(numRowsUndeleted);
if (hasEqDeletes()) {
- applyEqDelete();
+ applyEqDelete(newColumnarBatch);
}
- return columnarBatch;
+
+ if (hasIsDeletedColumn && rowIdMapping != null) {
+ // reset the row id mapping array so that it doesn't filter out the deleted rows
+ for (int i = 0; i < numRowsToRead; i++) {
+ rowIdMapping[i] = i;
Review Comment:
That's a good question. In short, I'm using the row ID mapping to improve equality-delete performance when we have both position deletes and equality deletes. I think it's worth doing because applying equality deletes is expensive: it has to go row by row. Here is an example: after the position deletes, we only need to iterate over 6 rows instead of 8 when applying the equality deletes.
```
* Filter out the equality deleted rows. Here is an example,
* [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
* [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
* Position delete 2, 6
* [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
* [F,F,T,F,F,F,T,F] -- After applying position deletes
* Equality delete 1 <= x <= 3
* [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
* [F,T,T,T,F,F,T,F] -- After applying equality deletes
```
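To make the mechanics concrete, here is a minimal, self-contained sketch of that filtering step. This is not the actual `applyEqDelete` in this PR; `isEqDeleted` is a hypothetical stand-in for the real row-level equality-delete check, and the class/method names are illustrative only.
```java
import java.util.function.IntPredicate;

class EqDeleteSketch {
  // Compacts rowIdMapping in place, scanning only the numLiveRows entries
  // that survived position deletes, and flags equality-deleted rows in
  // isDeleted. Returns the new number of live rows.
  static int applyEqDelete(int[] rowIdMapping, boolean[] isDeleted,
                           int numLiveRows, IntPredicate isEqDeleted) {
    int currentRowId = 0;
    for (int i = 0; i < numLiveRows; i++) {
      int originalRowId = rowIdMapping[i];
      if (isEqDeleted.test(originalRowId)) {
        isDeleted[originalRowId] = true; // feeds the "_deleted" metadata column
      } else {
        rowIdMapping[currentRowId++] = originalRowId; // keep the live row
      }
    }
    return currentRowId;
  }

  public static void main(String[] args) {
    // Reproduces the example above: position deletes already removed rows 2 and 6.
    int[] rowIdMapping = {0, 1, 3, 4, 5, 7, 0, 0};
    boolean[] isDeleted = {false, false, true, false, false, false, true, false};
    int numLive = applyEqDelete(rowIdMapping, isDeleted, 6, x -> 1 <= x && x <= 3);
    System.out.println(numLive); // 4, with rowIdMapping starting [0,4,5,7]
  }
}
```
Because the loop runs over `numLiveRows` (6) rather than the full batch (8), rows already removed by position deletes never reach the expensive equality-delete check.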
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]