flyrain commented on a change in pull request #3287:
URL: https://github.com/apache/iceberg/pull/3287#discussion_r743072665



##########
File path: spark/v3.2/build.gradle
##########
@@ -64,8 +64,6 @@ project(':iceberg-spark:iceberg-spark-3.2') {
     compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
-      // to make sure io.netty.buffer only comes from project(':iceberg-arrow')

Review comment:
       We should not remove this. It messed up when I merge multiple commits.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -47,18 +68,71 @@ public final ColumnarBatch read(ColumnarBatch reuse, int 
numRowsToRead) {
       closeVectors();
     }
 
+    Pair<int[], Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
     for (int i = 0; i < readers.length; i += 1) {
       vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
       int numRowsInVector = vectorHolders[i].numValues();
       Preconditions.checkState(
           numRowsInVector == numRowsToRead,
           "Number of rows in the vector %s didn't match expected %s ", 
numRowsInVector,
           numRowsToRead);
-      arrowColumnVectors[i] =
-          IcebergArrowColumnVector.forHolder(vectorHolders[i], 
numRowsInVector);
+
+      if (rowIdMapping == null) {
+        arrowColumnVectors[i] = 
IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+      } else {
+        int[] rowIdMap = rowIdMapping.first();
+        Integer numRows = rowIdMapping.second();
+        arrowColumnVectors[i] = 
ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows);
+      }
     }
+
+    rowStartPosInBatch += numRowsToRead;
     ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
-    batch.setNumRows(numRowsToRead);
+
+    if (rowIdMapping == null) {
+      batch.setNumRows(numRowsToRead);
+    } else {
+      Integer numRows = rowIdMapping.second();
+      batch.setNumRows(numRows);
+    }
     return batch;
   }
+
+  private Pair<int[], Integer> rowIdMapping(int numRows) {
+    if (deletes != null && deletes.hasPosDeletes()) {
+      return buildRowIdMapping(deletes.deletedRowPositions(), numRows);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Build a row id mapping inside a batch, which skips delete rows. For 
example, if the 1st and 3rd rows are deleted in
+   * a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new 
num of rows is 3.
+   * @param deletedRowPositions a set of deleted row positions
+   * @param numRows the num of rows
+   * @return the mapping array and the new num of rows in a batch, null if no 
row is deleted
+   */
+  private Pair<int[], Integer> buildRowIdMapping(Roaring64Bitmap 
deletedRowPositions, int numRows) {
+    if (deletedRowPositions == null) {
+      return null;
+    }
+    int[] rowIdMapping = new int[numRows];
+    int originalRowId = 0;
+    int currentRowId = 0;
+    while (originalRowId < numRows) {
+      if (!deletedRowPositions.contains(originalRowId + rowStartPosInBatch)) {
+        rowIdMapping[currentRowId] = originalRowId;
+        currentRowId++;
+      }
+      originalRowId++;
+    }
+
+    if (currentRowId == numRows) {

Review comment:
       yes, will add the comment.

##########
File path: 
spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
##########
@@ -187,4 +202,45 @@ protected void withTableProperties(Map<String, String> 
props, Action action) {
       restoreProperties.commit();
     }
   }
+
+  protected void writePosDeletes(CharSequence path, long numRows, double 
percentage) throws IOException {
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = 
SparkFileWriterFactory.builderFor(table())
+            .dataFileFormat(fileFormat())

Review comment:
       This method isn't needed any more. They are moved to class 
`IcebergSourceDeleteBenchmark`. I will delete them.

##########
File path: spark/v3.0/build.gradle
##########
@@ -80,6 +80,8 @@ project(':iceberg-spark:iceberg-spark3') {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
 
+    implementation "org.roaringbitmap:RoaringBitmap"

Review comment:
       The compilation used to fail if I don't do this. But it is OK now, I 
will remove it.

##########
File path: 
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
##########
@@ -61,6 +61,10 @@ public VectorizedReaderBuilder(
     this.readerFactory = readerFactory;
   }
 
+  public Function<List<VectorizedReader<?>>, VectorizedReader<?>> 
readerFactory() {

Review comment:
       Nice suggestion!

##########
File path: spark/v3.0/build.gradle
##########
@@ -80,6 +80,8 @@ project(':iceberg-spark:iceberg-spark3') {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
 
+    implementation "org.roaringbitmap:RoaringBitmap"

Review comment:
       Looks like they still fail. My local one just don't compile them, only 
3.2. I will add it to all spark 3.x versions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to