aokolnychyi commented on a change in pull request #3287:
URL: https://github.com/apache/iceberg/pull/3287#discussion_r729937426
##########
File path:
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
##########
@@ -33,6 +33,7 @@
public abstract class BaseBatchReader<T> implements VectorizedReader<T> {
protected final VectorizedArrowReader[] readers;
protected final VectorHolder[] vectorHolders;
+ protected long rowStartPos = 0;
Review comment:
nit: can we make the name a little bit more descriptive? Is it the row
group starting position?
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +53,44 @@ public static ColumnarBatchReader buildReader(
expectedSchema, fileSchema, setArrowValidityVector,
idToConstant, ColumnarBatchReader::new));
}
+
+ public static ColumnarBatchReader buildReader(
+ Schema expectedSchema,
+ MessageType fileSchema,
+ boolean setArrowValidityVector,
+ Map<Integer, ?> idToConstant,
+ DeleteFilter deleteFilter) {
+ return (ColumnarBatchReader)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReaderBuilder(
+ expectedSchema, fileSchema, setArrowValidityVector,
+ idToConstant, ColumnarBatchReader::new, deleteFilter));
+ }
+
+ private static class ReaderBuilder extends VectorizedReaderBuilder {
+ private final DeleteFilter deleteFilter;
Review comment:
nit: raw use of a parameterized class
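
For illustration, a minimal sketch of what avoiding the raw type could look like. `Filter<T>` below is a hypothetical stand-in for a generic class such as `DeleteFilter<T>`; it is not the Iceberg class, and the wildcard is only one possible choice of type argument.

```java
class RawTypeSketch {
  // Hypothetical generic class standing in for a parameterized filter type.
  static class Filter<T> {
    private final T sample;

    Filter(T sample) {
      this.sample = sample;
    }

    T sample() {
      return sample;
    }
  }

  // A wildcard keeps the field generic-safe when the row type is irrelevant here.
  // Declaring it as plain `Filter` would compile, but with a raw-type warning.
  private final Filter<?> deleteFilter;

  RawTypeSketch(Filter<?> deleteFilter) {
    this.deleteFilter = deleteFilter;
  }

  Object sample() {
    return deleteFilter.sample();
  }
}
```

If the row type is known at the declaration site, a concrete type argument would be preferable to the wildcard.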
##########
File path:
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
##########
@@ -44,6 +45,7 @@ protected BaseBatchReader(List<VectorizedReader<?>> readers) {
@Override
public final void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData,
long rowPosition) {
+ rowStartPos = rowPosition;
Review comment:
nit: `this.rowStartPos` to indicate we are setting an instance field.
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -47,18 +57,58 @@ public final ColumnarBatch read(ColumnarBatch reuse, int
numRowsToRead) {
closeVectors();
}
+ Map<Integer, Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
int numRowsInVector = vectorHolders[i].numValues();
Preconditions.checkState(
numRowsInVector == numRowsToRead,
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
- arrowColumnVectors[i] =
- IcebergArrowColumnVector.forHolder(vectorHolders[i],
numRowsInVector);
+
+ if (rowIdMapping == null) {
+ arrowColumnVectors[i] =
IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+ } else {
+ arrowColumnVectors[i] =
ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMapping,
numRowsInVector);
+ }
}
+
+ rowStartPos += numRowsToRead;
ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
- batch.setNumRows(numRowsToRead);
+
+ if (rowIdMapping == null) {
+ batch.setNumRows(numRowsToRead);
+ } else {
+ batch.setNumRows(rowIdMapping.size());
+ }
return batch;
}
+
+ private Map<Integer, Integer> rowIdMapping(int numRows) {
+ if (deletes != null && deletes.hasPosDeletes()) {
+ Set<Long> deletedRows = deletes.posDeletedRowIds();
+ if (deletedRows != null) {
+ Set<Integer> deletedRowIds = Sets.newHashSet();
+ deletedRows.stream().forEach(x -> deletedRowIds.add((int) (x -
rowStartPos)));
+ return buildRowIdMapping(deletedRowIds, numRows);
+ }
+ }
+
+ return null;
+ }
+
+ private Map<Integer, Integer> buildRowIdMapping(Set<Integer> deletedRowIds,
int numRows) {
+ final Map<Integer, Integer> rowIdMapping = Maps.newHashMap();
+ int realRowId = 0;
Review comment:
It is a bit unclear what real and current mean in this case. Could you
add a comment to this method?
I suppose real id is the original position?
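
A sketch of the kind of comment and naming that could make this clearer, assuming the method maps each row's index after deletes are applied to its index in the batch as read. The names `filteredRowId` and `originalRowId` are illustrative, and the body is a guess at the intent since the diff above is cut off.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class RowIdMappingSketch {
  /**
   * Maps the index of each surviving row in the filtered batch to its index in the batch as read.
   *
   * <p>Example: with 5 rows and deleted indexes {1, 3}, the result is {0 -> 0, 1 -> 2, 2 -> 4}.
   *
   * @param deletedRowIds batch-relative indexes of deleted rows
   * @param numRows number of rows read in the batch
   */
  static Map<Integer, Integer> buildRowIdMapping(Set<Integer> deletedRowIds, int numRows) {
    Map<Integer, Integer> rowIdMapping = new HashMap<>();
    int filteredRowId = 0; // index exposed to the consumer after deletes are applied
    for (int originalRowId = 0; originalRowId < numRows; originalRowId += 1) {
      if (!deletedRowIds.contains(originalRowId)) {
        rowIdMapping.put(filteredRowId, originalRowId);
        filteredRowId += 1;
      }
    }
    return rowIdMapping;
  }
}
```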
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -38,24 +40,28 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;
class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final Schema expectedSchema;
private final String nameMapping;
private final boolean caseSensitive;
private final int batchSize;
+ private final Schema tableSchema;
BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema,
boolean caseSensitive, int size) {
super(table, task);
this.expectedSchema = expectedSchema;
this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = caseSensitive;
this.batchSize = size;
+ tableSchema = table.schema();
Review comment:
Or you could even expose a protected `Table table()` method from the
parent and call `table().schema()` where needed.
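
A rough sketch of that shape, using hypothetical stand-in types (`Table`, `BaseReader`, `BatchReader`) rather than the real Iceberg classes:

```java
class TableAccessorSketch {
  // Hypothetical minimal table abstraction; the schema is a String only for brevity.
  interface Table {
    String schema();
  }

  abstract static class BaseReader {
    private final Table table;

    BaseReader(Table table) {
      this.table = table;
    }

    // Exposed to subclasses so they do not need to cache table state in their own fields.
    protected Table table() {
      return table;
    }
  }

  static class BatchReader extends BaseReader {
    BatchReader(Table table) {
      super(table);
    }

    String tableSchema() {
      return table().schema();
    }
  }
}
```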
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -47,18 +57,58 @@ public final ColumnarBatch read(ColumnarBatch reuse, int
numRowsToRead) {
closeVectors();
}
+ Map<Integer, Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
int numRowsInVector = vectorHolders[i].numValues();
Preconditions.checkState(
numRowsInVector == numRowsToRead,
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
- arrowColumnVectors[i] =
- IcebergArrowColumnVector.forHolder(vectorHolders[i],
numRowsInVector);
+
+ if (rowIdMapping == null) {
+ arrowColumnVectors[i] =
IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+ } else {
+ arrowColumnVectors[i] =
ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMapping,
numRowsInVector);
+ }
}
+
+ rowStartPos += numRowsToRead;
ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
- batch.setNumRows(numRowsToRead);
+
+ if (rowIdMapping == null) {
+ batch.setNumRows(numRowsToRead);
+ } else {
+ batch.setNumRows(rowIdMapping.size());
+ }
return batch;
}
+
+ private Map<Integer, Integer> rowIdMapping(int numRows) {
+ if (deletes != null && deletes.hasPosDeletes()) {
+ Set<Long> deletedRows = deletes.posDeletedRowIds();
+ if (deletedRows != null) {
+ Set<Integer> deletedRowIds = Sets.newHashSet();
+ deletedRows.stream().forEach(x -> deletedRowIds.add((int) (x -
rowStartPos)));
Review comment:
Does this mean we will have to shift all positions for every single
batch? I think that will impact the performance greatly if we have a large
number of deletes.
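
One possible alternative, sketched under the assumption that probing the set is cheap: instead of shifting every deleted position, probe only the positions covered by the current batch, so the per-batch cost depends on the batch size rather than on the number of deletes. The class and method names are illustrative.

```java
import java.util.Set;

class BatchDeleteProbeSketch {
  /**
   * Returns, for each row of the batch, whether its file-level position is deleted.
   *
   * @param deletedPositions file-level positions of deleted rows
   * @param batchStartPos file-level position of the first row in the batch
   * @param numRows number of rows in the batch
   */
  static boolean[] deletedInBatch(Set<Long> deletedPositions, long batchStartPos, int numRows) {
    boolean[] isDeleted = new boolean[numRows];
    for (int i = 0; i < numRows; i += 1) {
      // One probe per row in the batch; deletes outside the batch are never touched.
      isDeleted[i] = deletedPositions.contains(batchStartPos + i);
    }
    return isDeleted;
  }
}
```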
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -33,11 +38,16 @@
* {@linkplain VectorizedArrowReader VectorReader(s)}.
*/
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
+ private DeleteFilter deletes = null;
public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
super(readers);
}
+ public void setDeleteFilter(DeleteFilter deleteFilter) {
+ this.deletes = deleteFilter;
Review comment:
Add some validation that the delete filter contains only position deletes for
now?
If we get a filter with equality deletes by this time, something went wrong.
I'd add an exception to be safe.
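
Something along these lines, as a sketch only. `PositionAwareFilter` and its `hasEqDeletes()` accessor are assumptions made for this example; the diff only shows `hasPosDeletes()` on the filter, and the exception type is also just one option.

```java
class DeleteFilterValidationSketch {
  // Hypothetical view of the delete filter; hasEqDeletes() is an assumed accessor.
  interface PositionAwareFilter {
    boolean hasPosDeletes();

    boolean hasEqDeletes();
  }

  private PositionAwareFilter deletes = null;

  void setDeleteFilter(PositionAwareFilter deleteFilter) {
    // Fail fast: equality deletes should have been routed to the non-vectorized path.
    if (deleteFilter != null && deleteFilter.hasEqDeletes()) {
      throw new UnsupportedOperationException(
          "Vectorized reads support only position deletes for now");
    }
    this.deletes = deleteFilter;
  }

  boolean hasDeleteFilter() {
    return deletes != null;
  }
}
```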
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -38,24 +40,28 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;
class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final Schema expectedSchema;
private final String nameMapping;
private final boolean caseSensitive;
private final int batchSize;
+ private final Schema tableSchema;
BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema,
boolean caseSensitive, int size) {
super(table, task);
this.expectedSchema = expectedSchema;
this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = caseSensitive;
this.batchSize = size;
+ tableSchema = table.schema();
Review comment:
nit: `this.tableSchema = ...`
##########
File path: core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
##########
@@ -37,6 +37,15 @@ public static boolean hasDeletes(CombinedScanTask task) {
return task.files().stream().anyMatch(TableScanUtil::hasDeletes);
}
+ /**
+ * This is temporarily introduced since we plan to support pos-delete vectorized read first, then get to the
+ * equality-delete support. We will remove this method once both are supported.
+ */
+ public static boolean hasEqDeletes(CombinedScanTask task) {
+ return task.files().stream().anyMatch(
+ t -> t.deletes().stream().anyMatch(c ->
c.content().equals(org.apache.iceberg.FileContent.EQUALITY_DELETES)));
Review comment:
nit: can we import `FileContent` instead of using a fully qualified name?
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +53,44 @@ public static ColumnarBatchReader buildReader(
expectedSchema, fileSchema, setArrowValidityVector,
idToConstant, ColumnarBatchReader::new));
}
+
+ public static ColumnarBatchReader buildReader(
+ Schema expectedSchema,
+ MessageType fileSchema,
+ boolean setArrowValidityVector,
+ Map<Integer, ?> idToConstant,
+ DeleteFilter deleteFilter) {
+ return (ColumnarBatchReader)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReaderBuilder(
+ expectedSchema, fileSchema, setArrowValidityVector,
+ idToConstant, ColumnarBatchReader::new, deleteFilter));
+ }
+
+ private static class ReaderBuilder extends VectorizedReaderBuilder {
Review comment:
How common is the new functionality? Would it make sense to modify
`VectorizedReaderBuilder` instead of extending it here?
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -33,11 +38,16 @@
* {@linkplain VectorizedArrowReader VectorReader(s)}.
*/
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
+ private DeleteFilter deletes = null;
Review comment:
nit: raw use of a parameterized class
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -71,11 +77,14 @@
InputFile location = getInputFile(task);
Preconditions.checkNotNull(location, "Could not find InputFile associated
with FileScanTask");
if (task.file().format() == FileFormat.PARQUET) {
+ DeleteFilter deleteFilter = deleteFilter(task);
Review comment:
nit: usage of raw type
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -47,18 +57,58 @@ public final ColumnarBatch read(ColumnarBatch reuse, int
numRowsToRead) {
closeVectors();
}
+ Map<Integer, Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
int numRowsInVector = vectorHolders[i].numValues();
Preconditions.checkState(
numRowsInVector == numRowsToRead,
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
- arrowColumnVectors[i] =
- IcebergArrowColumnVector.forHolder(vectorHolders[i],
numRowsInVector);
+
+ if (rowIdMapping == null) {
+ arrowColumnVectors[i] =
IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+ } else {
+ arrowColumnVectors[i] =
ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMapping,
numRowsInVector);
+ }
}
+
+ rowStartPos += numRowsToRead;
ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
- batch.setNumRows(numRowsToRead);
+
+ if (rowIdMapping == null) {
+ batch.setNumRows(numRowsToRead);
+ } else {
+ batch.setNumRows(rowIdMapping.size());
+ }
return batch;
}
+
+ private Map<Integer, Integer> rowIdMapping(int numRows) {
+ if (deletes != null && deletes.hasPosDeletes()) {
+ Set<Long> deletedRows = deletes.posDeletedRowIds();
+ if (deletedRows != null) {
+ Set<Integer> deletedRowIds = Sets.newHashSet();
+ deletedRows.stream().forEach(x -> deletedRowIds.add((int) (x -
rowStartPos)));
+ return buildRowIdMapping(deletedRowIds, numRows);
+ }
+ }
+
+ return null;
+ }
+
+ private Map<Integer, Integer> buildRowIdMapping(Set<Integer> deletedRowIds,
int numRows) {
+ final Map<Integer, Integer> rowIdMapping = Maps.newHashMap();
Review comment:
I would consider using a direct access table (i.e. an array) instead of
a map as a dictionary here. We will save both space and time as there will be
no extra wrappers for map entries, no calls to hash and equals, no boxing. We
will simply use a single JVM bytecode instruction to load a position from an
array.
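
A minimal sketch of the array-based variant, assuming the mapping goes from the index of a surviving row to its index in the batch as read. Names are illustrative, and the final `Arrays.copyOf` is just one way to carry the live-row count; returning the count separately would avoid the extra copy.

```java
import java.util.Arrays;
import java.util.Set;

class RowIdArraySketch {
  /**
   * Returns an array where element i is the original batch index of the i-th surviving row.
   * The array length equals the number of rows left after deletes are applied.
   */
  static int[] buildRowIdMapping(Set<Integer> deletedRowIds, int numRows) {
    int[] mapping = new int[numRows];
    int liveRows = 0;
    for (int originalRowId = 0; originalRowId < numRows; originalRowId += 1) {
      if (!deletedRowIds.contains(originalRowId)) {
        mapping[liveRows] = originalRowId;
        liveRows += 1;
      }
    }
    // Trim to the number of surviving rows so the length doubles as the row count.
    return Arrays.copyOf(mapping, liveRows);
  }
}
```

A lookup then becomes a plain `mapping[rowId]` array access with no boxing or hashing.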
##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -183,6 +189,18 @@ protected boolean shouldKeep(T item) {
return remainingRowsFilter.filter(records);
}
+ public Set<Long> posDeletedRowIds() {
Review comment:
As far as I can see, `ColumnarBatchReader` will be checking whether each
position has been deleted by probing this set. Using anything other than a
set would make the current implementation extremely slow.
Large deletes are a valid concern. I wonder whether we should avoid enabling
vectorization in such cases at all.
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -114,4 +123,27 @@
}
return iter.iterator();
}
+
+ private SparkDeleteFilter deleteFilter(FileScanTask task) {
+ return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task,
tableSchema, expectedSchema);
+ }
+
+ public class SparkDeleteFilter extends DeleteFilter<InternalRow> {
Review comment:
Why public?
##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -183,6 +189,18 @@ protected boolean shouldKeep(T item) {
return remainingRowsFilter.filter(records);
}
+ public Set<Long> posDeletedRowIds() {
Review comment:
Can we use a bitmap here? We just need to know whether a particular bit
is set.
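
To illustrate the idea, a small sketch using `java.util.BitSet`. This is only an assumption for the example: `BitSet` is indexed by `int`, so positions beyond `Integer.MAX_VALUE` would need a 64-bit bitmap implementation, which is not shown here.

```java
import java.util.BitSet;

class DeletedPositionsSketch {
  private final BitSet deleted = new BitSet();

  // Marks a position as deleted; throws if the position does not fit in an int.
  void delete(long position) {
    deleted.set(Math.toIntExact(position));
  }

  // A single bit test, with no boxing and no hashing.
  boolean isDeleted(long position) {
    return position >= 0 && position <= Integer.MAX_VALUE && deleted.get((int) position);
  }
}
```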
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +53,44 @@ public static ColumnarBatchReader buildReader(
expectedSchema, fileSchema, setArrowValidityVector,
idToConstant, ColumnarBatchReader::new));
}
+
+ public static ColumnarBatchReader buildReader(
+ Schema expectedSchema,
+ MessageType fileSchema,
+ boolean setArrowValidityVector,
+ Map<Integer, ?> idToConstant,
+ DeleteFilter deleteFilter) {
+ return (ColumnarBatchReader)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReaderBuilder(
+ expectedSchema, fileSchema, setArrowValidityVector,
+ idToConstant, ColumnarBatchReader::new, deleteFilter));
+ }
+
+ private static class ReaderBuilder extends VectorizedReaderBuilder {
Review comment:
If needed, we can probably extend `VectorizedReader` with
`setDeleteFilter`?
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -38,24 +40,28 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;
class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final Schema expectedSchema;
private final String nameMapping;
private final boolean caseSensitive;
private final int batchSize;
+ private final Schema tableSchema;
BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema,
boolean caseSensitive, int size) {
super(table, task);
this.expectedSchema = expectedSchema;
this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = caseSensitive;
this.batchSize = size;
+ tableSchema = table.schema();
Review comment:
I'd probably define it as the first field too, right above
`expectedSchema` so that they are together.
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.Map;
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class ColumnVectorWithFilter extends IcebergArrowColumnVector {
+ private Map<Integer, Integer> rowIdMapping;
Review comment:
I'd consider replacing this with a direct access table too for the same
reasons described above.
##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -183,6 +189,18 @@ protected boolean shouldKeep(T item) {
return remainingRowsFilter.filter(records);
}
+ public Set<Long> posDeletedRowIds() {
Review comment:
Yeah, I think a bitmap of deleted positions would work fine here and we
will be able to support vectorization in all cases.
##########
File path:
spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataDeleteBenchmark.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.IcebergSourceBenchmark;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+/**
+ * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for
+ * Iceberg. 5% of rows are deleted in each data file.
+ * <p>
+ * This class uses a dataset with a flat schema.
+ * To run this benchmark for either spark-2 or spark-3:
+ * <code>
+ * ./gradlew :iceberg-spark[2|3]:jmh -PjmhIncludeRegex=IcebergSourceFlatParquetDataDeleteBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-delete-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceFlatParquetDataDeleteBenchmark extends
IcebergSourceBenchmark {
+
+ private static final int NUM_FILES = 500;
Review comment:
I'd say we should focus on a smaller number of files with a much larger
number of records.
It would be great to see numbers for these use cases:
- Deletes vs no deletes (non-vectorized, 5%, 25% deleted records)
- Vectorized vs non-vectorized deletes (5%, 25%)
- Equality vs position delete performance (non-vectorized)
##########
File path:
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
##########
@@ -33,6 +33,7 @@
public abstract class BaseBatchReader<T> implements VectorizedReader<T> {
protected final VectorizedArrowReader[] readers;
protected final VectorHolder[] vectorHolders;
+ protected long rowStartPos = 0;
Review comment:
Aahh, it is just initialized in `setRowGroupInfo`. It is a little bit
confusing that this var is defined here but actually modified in
`ColumnarBatchReader` in Spark. I think we either need to move it to
`ColumnarBatchReader` or support it in all children of `BaseBatchReader`.
Otherwise, `ArrowBatchReader` has access to it but the value is invalid.
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +53,44 @@ public static ColumnarBatchReader buildReader(
expectedSchema, fileSchema, setArrowValidityVector,
idToConstant, ColumnarBatchReader::new));
}
+
+ public static ColumnarBatchReader buildReader(
+ Schema expectedSchema,
+ MessageType fileSchema,
+ boolean setArrowValidityVector,
+ Map<Integer, ?> idToConstant,
+ DeleteFilter deleteFilter) {
+ return (ColumnarBatchReader)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReaderBuilder(
+ expectedSchema, fileSchema, setArrowValidityVector,
+ idToConstant, ColumnarBatchReader::new, deleteFilter));
+ }
+
+ private static class ReaderBuilder extends VectorizedReaderBuilder {
Review comment:
We could move `DeleteFilter` to `core`. But it's minor. Let's not worry
about it now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]