aokolnychyi commented on a change in pull request #3533:
URL: https://github.com/apache/iceberg/pull/3533#discussion_r747722269



##########
File path: 
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -407,26 +412,36 @@ public void setBatchSize(int batchSize) {
   }
 
   private static final class PositionVectorReader extends 
VectorizedArrowReader {
+    private final Field arrowField = 
ArrowSchemaUtil.convert(MetadataColumns.ROW_POSITION);
+    private final BufferAllocator bufferAllocator = 
ArrowAllocation.rootAllocator();
+    private final boolean setArrowValidityVector;
     private long rowStart;
+    private int batchSize;
+    private FieldVector vec;
     private NullabilityHolder nulls;
 
+    PositionVectorReader(boolean setArrowValidityVector) {
+      this.setArrowValidityVector = setArrowValidityVector;
+    }
+
     @Override
     public VectorHolder read(VectorHolder reuse, int numValsToRead) {
-      Field arrowField = ArrowSchemaUtil.convert(MetadataColumns.ROW_POSITION);
-      FieldVector vec = 
arrowField.createVector(ArrowAllocation.rootAllocator());
-
-      if (reuse != null) {
+      if (reuse == null) {
+        this.vec = newVector();
+        this.nulls = new NullabilityHolder(batchSize);

Review comment:
       Technically, we can always reuse the same nullability holder with no 
nulls as the column is not nullable. The existing code still calls 
`setNotNulls` for each batch.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
##########
@@ -158,6 +168,62 @@ public void testSpecAndPartitionMetadataColumns() {
         sql("SELECT _spec_id, _partition FROM %s ORDER BY _spec_id", 
TABLE_NAME));
   }
 
+  @Test
+  public void testPositionMetadataColumnWithMultipleRowGroups() throws 
NoSuchTableException {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+
+    table.updateProperties()
+        .set(PARQUET_ROW_GROUP_SIZE_BYTES, "100")
+        .commit();
+
+    List<Long> ids = Lists.newArrayList();
+    for (long id = 0L; id < 200L; id++) {
+      ids.add(id);
+    }
+    Dataset<Row> df = spark.createDataset(ids, Encoders.LONG())
+        .withColumnRenamed("value", "id")
+        .withColumn("category", lit("hr"))
+        .withColumn("data", lit("ABCDEF"));
+    df.coalesce(1).writeTo(TABLE_NAME).append();
+
+    Assert.assertEquals(200, spark.table(TABLE_NAME).count());
+
+    List<Object[]> expectedRows = ids.stream()
+        .map(this::row)
+        .collect(Collectors.toList());
+    assertEquals("Rows must match",
+        expectedRows,
+        sql("SELECT _pos FROM %s", TABLE_NAME));
+  }
+
+  @Test
+  public void testPositionMetadataColumnWithMultipleBatches() throws 
NoSuchTableException {

Review comment:
       This test would previously fail and output wrong results. I tested it 
triggers the bug.

##########
File path: 
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/NullabilityHolder.java
##########
@@ -52,10 +52,12 @@ public void setNotNull(int index) {
   }
 
   public void setNulls(int startIndex, int num) {
+    // TODO: do we have to modify numNulls?

Review comment:
       I'd appreciate if someone could explain why we only modify `numNulls` in 
`setNull`.

##########
File path: 
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -448,6 +469,14 @@ public String toString() {
 
     @Override
     public void setBatchSize(int batchSize) {
+      this.batchSize = (batchSize == 0) ? DEFAULT_BATCH_SIZE : batchSize;
+    }
+
+    @Override
+    public void close() {

Review comment:
       I think we did not close the vector before.

##########
File path: 
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -407,26 +412,36 @@ public void setBatchSize(int batchSize) {
   }
 
   private static final class PositionVectorReader extends 
VectorizedArrowReader {
+    private final Field arrowField = 
ArrowSchemaUtil.convert(MetadataColumns.ROW_POSITION);
+    private final BufferAllocator bufferAllocator = 
ArrowAllocation.rootAllocator();
+    private final boolean setArrowValidityVector;
     private long rowStart;
+    private int batchSize;
+    private FieldVector vec;
     private NullabilityHolder nulls;
 
+    PositionVectorReader(boolean setArrowValidityVector) {
+      this.setArrowValidityVector = setArrowValidityVector;
+    }
+
     @Override
     public VectorHolder read(VectorHolder reuse, int numValsToRead) {
-      Field arrowField = ArrowSchemaUtil.convert(MetadataColumns.ROW_POSITION);
-      FieldVector vec = 
arrowField.createVector(ArrowAllocation.rootAllocator());
-
-      if (reuse != null) {
+      if (reuse == null) {
+        this.vec = newVector();
+        this.nulls = new NullabilityHolder(batchSize);
+      } else {
         vec.setValueCount(0);
         nulls.reset();
-      } else {
-        ((BigIntVector) vec).allocateNew(numValsToRead);
-        for (int i = 0; i < numValsToRead; i += 1) {
-          vec.getDataBuffer().setLong(i * Long.BYTES, rowStart + i);
-        }
-        for (int i = 0; i < numValsToRead; i += 1) {
-          BitVectorHelper.setBit(vec.getValidityBuffer(), i);
+      }
+
+      ArrowBuf dataBuffer = vec.getDataBuffer();
+      ArrowBuf validityBuffer = vec.getValidityBuffer();
+
+      for (int i = 0; i < numValsToRead; i += 1) {

Review comment:
       I switched to a single for loop and setting the Arrow validity vector 
only if required.

##########
File path: 
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -407,26 +412,36 @@ public void setBatchSize(int batchSize) {
   }
 
   private static final class PositionVectorReader extends 
VectorizedArrowReader {

Review comment:
       We don't have to extend `VectorizedArrowReader` as this class has 
nothing in common but there are places in code that do a cast and it would fail 
otherwise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to