flyrain commented on code in PR #4588:
URL: https://github.com/apache/iceberg/pull/4588#discussion_r945957184
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -170,6 +173,7 @@ Pair<int[], Integer> posDelRowIdMapping() {
 * @return the mapping array and the new num of rows in a batch, null if no row is deleted
 */
Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
+ LOG.debug("Building row id mapping from positional deletes");
Review Comment:
Nit: is the log necessary? With debug level enabled, a real use case would produce a lot of these messages, and I doubt it is very useful.
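If some visibility is still wanted without per-batch noise, one hypothetical alternative (a sketch, not part of this PR) is to accumulate a count cheaply and emit a single summary line when the reader finishes:
```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BatchLogging {
  private static final Logger LOG = LoggerFactory.getLogger(BatchLogging.class);
  private long batches = 0;

  void onBatch() {
    batches++; // cheap accumulation instead of one log line per batch
  }

  void onClose() {
    // one summary message per reader instead of one per batch
    LOG.debug("Built row id mappings for {} batches", batches);
  }
}
```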
##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -130,7 +140,8 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
ops.commit(meta, meta.upgradeToFormatVersion(2));
- if (vectorized) {
+ table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+ if (format.equals("parquet") && vectorized) {
Review Comment:
This is all about parquet format. We may do this
```
if (format.equals("parquet")) {
  if (vectorized) {
    table
        .updateProperties()
        .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
        .set(
            TableProperties.PARQUET_BATCH_SIZE,
            "4") // split 7 records to two batches to cover more code paths
        .commit();
  } else {
    table
        .updateProperties()
        .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false")
        .commit();
  }
}
```
Also, since we added ORC and Avro, we may want to set the vectorization property explicitly, as we do for `parquet`, rather than relying on the default value. The default may change in the future, which would break these tests.
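For example (a sketch, assuming `TableProperties.ORC_VECTORIZATION_ENABLED` is the corresponding knob for ORC; Avro has no vectorized read path in Spark, so only the format property applies there):
```
if (format.equals("orc")) {
  table
      .updateProperties()
      .set(TableProperties.ORC_VECTORIZATION_ENABLED, vectorized ? "true" : "false")
      .commit();
}
```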
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -183,6 +187,8 @@ Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit
        currentRowId++;
      } else if (hasIsDeletedColumn) {
        isDeleted[originalRowId] = true;
+     } else {
+       deletes.incrementDeleteCount();
      }
Review Comment:
We still need the metrics in the case of the `_deleted` column, right? With the `else if`, the count is skipped whenever that column is projected. Something like:
```
else {
  if (hasIsDeletedColumn) {
    isDeleted[originalRowId] = true;
  }
  deletes.incrementDeleteCount();
}
```
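A toy illustration of the difference (standalone Java, not Iceberg code): with the `else if`, rows that are only marked in `isDeleted` never reach the counter:
```
class DeleteCountDemo {
  public static void main(String[] args) {
    boolean hasIsDeletedColumn = true;
    boolean[] isDeleted = new boolean[3];
    long elseIfCount = 0;
    long nestedIfCount = 0;

    for (int rowId = 0; rowId < 3; rowId++) {
      boolean deleted = true; // pretend every row is deleted

      // current control flow: the count only happens when the
      // _deleted column is NOT projected
      if (!deleted) {
        // keep the row
      } else if (hasIsDeletedColumn) {
        isDeleted[rowId] = true;
      } else {
        elseIfCount++;
      }

      // suggested control flow: mark the column AND count
      if (deleted) {
        if (hasIsDeletedColumn) {
          isDeleted[rowId] = true;
        }
        nestedIfCount++;
      }
    }

    System.out.println(elseIfCount);   // 0 -- the metric under-reports
    System.out.println(nestedIfCount); // 3
  }
}
```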
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -217,6 +223,7 @@ int[] initEqDeleteRowIdMapping() {
 * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
*/
void applyEqDelete(ColumnarBatch columnarBatch) {
+ LOG.debug("Applying equality deletes to row id mapping");
Review Comment:
Same comment as above about the debug log.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -229,6 +236,8 @@ void applyEqDelete(ColumnarBatch columnarBatch) {
        currentRowId++;
      } else if (hasIsDeletedColumn) {
        isDeleted[rowIdMapping[rowId]] = true;
+     } else {
+       deletes.incrementDeleteCount();
Review Comment:
Same comment as above about the metrics: we should still count when the `_deleted` column is projected.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -77,6 +78,7 @@
private final NameMapping nameMapping;
private final ScanTaskGroup<TaskT> taskGroup;
private final Iterator<TaskT> tasks;
+ private final DeleteCounter counter;
Review Comment:
+1 to put it here
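For context, the appeal of reader-level placement is that a single counter survives across all tasks in the group instead of resetting on each task boundary. A simplified sketch of the idea (hypothetical names, not the PR's exact code):
```
import java.util.Iterator;

abstract class SketchReader<T> {
  private final Iterator<T> tasks;
  private long deletedRowCount = 0; // lives as long as the reader

  SketchReader(Iterator<T> tasks) {
    this.tasks = tasks;
  }

  protected void incrementDeleteCount() {
    deletedRowCount++; // called from every task's delete handling
  }

  long deletedRowCount() {
    return deletedRowCount; // aggregate across the whole task group
  }
}
```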