aokolnychyi commented on code in PR #4888:
URL: https://github.com/apache/iceberg/pull/4888#discussion_r927231315
##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java:
##########
@@ -517,5 +517,32 @@ public void setBatchSize(int batchSize) {
}
}
+ /**
+ * A Dummy Vector Reader which doesn't actually read files, instead it
returns an
+ * IsDeleted Vector Holder which indicates whether a given row is deleted.
+ */
+ public static class DeletedVectorReader extends VectorizedArrowReader {
Review Comment:
nit: The naming seems inconsistent as we have `IsDeletedVectorHolder`.
##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java:
##########
@@ -104,6 +104,10 @@ public static <T> VectorHolder constantHolder(int numRows,
T constantValue) {
return new ConstantVectorHolder(numRows, constantValue);
}
+ public static <T> VectorHolder isDeletedHolder(int numRows) {
Review Comment:
Why do we have <T> here? Is it actually being used?
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -66,35 +78,50 @@ public final ColumnarBatch read(ColumnarBatch reuse, int
numRowsToRead) {
closeVectors();
}
- ColumnBatchLoader batchLoader = new ColumnBatchLoader(numRowsToRead);
+ ColumnarBatch columnarBatch = new
ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
rowStartPosInBatch += numRowsToRead;
- return batchLoader.columnarBatch;
+ return columnarBatch;
}
private class ColumnBatchLoader {
- private int[] rowIdMapping; // the rowId mapping to skip deleted rows for
all column vectors inside a batch
- private int numRows;
- private ColumnarBatch columnarBatch;
+ private final int numRowsToRead;
+ // the rowId mapping to skip deleted rows for all column vectors inside a
batch, it is null when there is no deletes
+ private int[] rowIdMapping;
+ // the array to indicate if a row is deleted or not, it is null when there
is no "_deleted" metadata column
+ private boolean[] isDeleted;
ColumnBatchLoader(int numRowsToRead) {
- initRowIdMapping(numRowsToRead);
- loadDataToColumnBatch(numRowsToRead);
+ Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows
to read: %s", numRowsToRead);
+ this.numRowsToRead = numRowsToRead;
+ if (hasIsDeletedColumn) {
+ isDeleted = new boolean[numRowsToRead];
+ }
}
- ColumnarBatch loadDataToColumnBatch(int numRowsToRead) {
- Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows
to read: %s", numRowsToRead);
- ColumnVector[] arrowColumnVectors =
readDataToColumnVectors(numRowsToRead);
+ ColumnarBatch loadDataToColumnBatch() {
+ int numRowsUndeleted = initRowIdMapping();
+
+ ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
- columnarBatch = new ColumnarBatch(arrowColumnVectors);
- columnarBatch.setNumRows(numRows);
+ ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
+ newColumnarBatch.setNumRows(numRowsUndeleted);
if (hasEqDeletes()) {
- applyEqDelete();
+ applyEqDelete(newColumnarBatch);
}
- return columnarBatch;
+
+ if (hasIsDeletedColumn && rowIdMapping != null) {
+ // reset the row id mapping array, so that it doesn't filter out the
deleted rows
+ for (int i = 0; i < numRowsToRead; i++) {
+ rowIdMapping[i] = i;
Review Comment:
Question: do we have to populate the row ID mapping initially if we know we
have `_deleted` metadata column?
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -44,9 +44,21 @@
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;
+ private final boolean hasIsDeletedColumn;
Review Comment:
nit: Final vars should be defined before mutable ones
##########
spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java:
##########
@@ -80,50 +81,79 @@ public void tearDownBenchmark() throws IOException {
@Benchmark
@Threads(1)
- public void readIceberg() {
+ public void readIceberg(Blackhole blackhole) {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 *
1024));
+ tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED,
"false");
Review Comment:
nit: Let's add a static import like we have for `SPLIT_OPEN_FILE_COST` for
consistency
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -44,9 +44,21 @@
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;
+ private final boolean hasIsDeletedColumn;
public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
super(readers);
+ this.hasIsDeletedColumn = hasDeletedVectorReader(readers);
+ }
+
+ private boolean hasDeletedVectorReader(List<VectorizedReader<?>> readers) {
Review Comment:
nit: Can this be simplified like this? I'd also prefer a direct import of
`DeletedVectorReader` to shorten the line.
```
this.hasIsDeletedColumn = readers.stream().anyMatch(reader -> reader
instanceof DeletedVectorReader);
```
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -105,35 +132,31 @@ ColumnVector[] readDataToColumnVectors(int numRowsToRead)
{
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
- arrowColumnVectors[i] = hasDeletes() ?
- ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMapping,
numRows) :
- IcebergArrowColumnVector.forHolder(vectorHolders[i],
numRowsInVector);
+ arrowColumnVectors[i] = new ColumnVectorBuilder(vectorHolders[i],
numRowsInVector)
Review Comment:
Do we have to construct a column vector builder for every column? What about
having a constructor accepting the row ID mapping and is deleted array and
making `build(VectorHolder holder, int numRows)`? That way you can init the
builder outside of the for loop and call build inside the loop for a particular
`vectorHolder`.
```
ColumnVectorBuilder columnVectorBuilder = new
ColumnVectorBuilder(rowIdMapping, isDeleted);
for (int i = 0; i < readers.length; i += 1) {
...
arrowColumnVectors[i] = columnVectorBuilder.build(vectorHolders[i],
numRowsInVector);
}
```
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.IsDeletedVectorHolder;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.vectorized.ColumnVector;
+
+public class ColumnVectorBuilder {
Review Comment:
Do this class and its constructors/methods have to be public?
##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java:
##########
@@ -70,8 +70,7 @@ public class TestSparkParquetReadMetadataColumns {
private static final Schema PROJECTION_SCHEMA = new Schema(
required(100, "id", Types.LongType.get()),
required(101, "data", Types.StringType.get()),
- MetadataColumns.ROW_POSITION,
Review Comment:
Why is this change needed?
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.IsDeletedVectorHolder;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.vectorized.ColumnVector;
+
+public class ColumnVectorBuilder {
+ private final VectorHolder holder;
+ private final int numRows;
+
+ private boolean[] isDeleted;
+ private int[] rowIdMapping;
+
+ public ColumnVectorBuilder(VectorHolder holder, int numRows) {
+ this.holder = holder;
+ this.numRows = numRows;
+ }
+
+ public ColumnVectorBuilder withDeletedRows(int[] rowIdMappingArray,
boolean[] isDeletedArray) {
+ this.rowIdMapping = rowIdMappingArray;
+ this.isDeleted = isDeletedArray;
+ return this;
+ }
+
+ public ColumnVector build() {
+ if (holder.isDummy()) {
+ if (holder instanceof IsDeletedVectorHolder) {
+ return new DeletedMetaColumnVector(Types.BooleanType.get(), isDeleted);
+ } else if (holder instanceof ConstantVectorHolder) {
+ return new ConstantColumnVector(Types.IntegerType.get(), numRows,
+ ((ConstantVectorHolder) holder).getConstant());
Review Comment:
nit: `ConstantVectorHolder` -> `ConstantVectorHolder<?>`.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedMetaColumnVector.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Type;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class DeletedMetaColumnVector extends ColumnVector {
Review Comment:
The naming in new classes is a bit inconsistent. Can we align that?
```
IsDeletedVectorHolder
DeletedMetaColumnVector
DeletedVectorReader
```
##########
spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java:
##########
@@ -80,50 +81,79 @@ public void tearDownBenchmark() throws IOException {
@Benchmark
@Threads(1)
- public void readIceberg() {
+ public void readIceberg(Blackhole blackhole) {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 *
1024));
+ tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED,
"false");
Review Comment:
Applies to all places in this class.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.IsDeletedVectorHolder;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.vectorized.ColumnVector;
+
+public class ColumnVectorBuilder {
+ private final VectorHolder holder;
+ private final int numRows;
+
+ private boolean[] isDeleted;
+ private int[] rowIdMapping;
+
+ public ColumnVectorBuilder(VectorHolder holder, int numRows) {
+ this.holder = holder;
+ this.numRows = numRows;
+ }
+
+ public ColumnVectorBuilder withDeletedRows(int[] rowIdMappingArray,
boolean[] isDeletedArray) {
+ this.rowIdMapping = rowIdMappingArray;
+ this.isDeleted = isDeletedArray;
+ return this;
+ }
+
+ public ColumnVector build() {
+ if (holder.isDummy()) {
+ if (holder instanceof IsDeletedVectorHolder) {
+ return new DeletedMetaColumnVector(Types.BooleanType.get(), isDeleted);
+ } else if (holder instanceof ConstantVectorHolder) {
+ return new ConstantColumnVector(Types.IntegerType.get(), numRows,
+ ((ConstantVectorHolder) holder).getConstant());
Review Comment:
nit: I think this should fit on a single line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]