aokolnychyi commented on a change in pull request #3287:
URL: https://github.com/apache/iceberg/pull/3287#discussion_r736707518
##########
File path: spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -47,18 +68,71 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
       closeVectors();
     }
+    Pair<int[], Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
     for (int i = 0; i < readers.length; i += 1) {
       vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
       int numRowsInVector = vectorHolders[i].numValues();
       Preconditions.checkState(
           numRowsInVector == numRowsToRead,
           "Number of rows in the vector %s didn't match expected %s ",
           numRowsInVector,
           numRowsToRead);
-      arrowColumnVectors[i] = IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+
+      if (rowIdMapping == null) {
+        arrowColumnVectors[i] = IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+      } else {
+        int[] rowIdMap = rowIdMapping.first();
+        Integer numRows = rowIdMapping.second();
+        arrowColumnVectors[i] = ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows);
+      }
     }
+
+    rowStartPosInBatch += numRowsToRead;
     ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
-    batch.setNumRows(numRowsToRead);
+
+    if (rowIdMapping == null) {
+      batch.setNumRows(numRowsToRead);
+    } else {
+      Integer numRows = rowIdMapping.second();
+      batch.setNumRows(numRows);
+    }
     return batch;
   }
+
+  private Pair<int[], Integer> rowIdMapping(int numRows) {
+    if (deletes != null && deletes.hasPosDeletes()) {
+      return buildRowIdMapping(deletes.deletedRowPositions(), numRows);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Build a row id mapping inside a batch, which skips deleted rows. For example, if the 1st and 3rd rows are deleted
+   * in a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new num of rows is 3.
+   * @param deletedRowPositions a set of deleted row positions
+   * @param numRows the num of rows
+   * @return the mapping array and the new num of rows in a batch, null if all rows are deleted
+   */
+  private Pair<int[], Integer> buildRowIdMapping(Roaring64Bitmap deletedRowPositions, int numRows) {
+    if (deletedRowPositions == null) {
+      return null;
+    }
+    int[] rowIdMapping = new int[numRows];
+    int originalRowId = 0;
+    int currentRowId = 0;
+    while (originalRowId < numRows) {
+      if (!deletedRowPositions.contains(originalRowId + rowStartPosInBatch)) {
+        rowIdMapping[currentRowId] = originalRowId;
+        currentRowId++;
+      }
+      originalRowId++;
+    }
+
+    if (currentRowId == 0) {
Review comment:
When will `currentRowId == 0` be true? When all elements in the batch
are deleted? If so, is it actually correct to return null here? Wouldn't we
then use the regular vector without the filter and report the original rows
that were supposed to be deleted?
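
To make the question concrete, here is a minimal, self-contained sketch of the
mapping logic quoted above (hypothetical `RowIdMappingSketch` class, not the
actual `ColumnarBatchReader`; `java.util.Set<Long>` stands in for
`Roaring64Bitmap`, and the surviving-row count is returned directly instead of
a `Pair`):

```java
import java.util.Set;

public class RowIdMappingSketch {

  // Fills `mapping` with the original positions of surviving rows and returns
  // how many survived. A return value of 0 is the `currentRowId == 0` case the
  // review question is about: every row in the batch was deleted.
  static int buildRowIdMapping(Set<Long> deleted, long batchStart, int numRows, int[] mapping) {
    int current = 0;
    for (int original = 0; original < numRows; original++) {
      if (!deleted.contains(batchStart + original)) {
        mapping[current++] = original;
      }
    }
    return current;
  }

  public static void main(String[] args) {
    int[] mapping = new int[5];

    // Javadoc example: the 1st and 3rd rows (positions 0 and 2) deleted in a 5-row batch.
    int n = buildRowIdMapping(Set.of(0L, 2L), 0L, 5, mapping);
    System.out.println(n);                                                 // 3
    System.out.println(mapping[0] + " " + mapping[1] + " " + mapping[2]);  // 1 3 4

    // All five rows deleted: count is 0, the case questioned above.
    n = buildRowIdMapping(Set.of(0L, 1L, 2L, 3L, 4L), 0L, 5, mapping);
    System.out.println(n);                                                 // 0
  }
}
```

In the second call the count is 0, and per the Javadoc in the hunk that maps
to a null result, i.e. the same value used when there is no row id mapping at
all, which is what the question above is probing.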
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]