singhpk234 commented on code in PR #4683:
URL: https://github.com/apache/iceberg/pull/4683#discussion_r863504754
##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java:
##########
@@ -104,6 +104,10 @@ public static <T> VectorHolder constantHolder(int numRows, T constantValue) {
return new ConstantVectorHolder(numRows, constantValue);
}
+ public static <T> VectorHolder deleteMetaColumnHolder(int numRows) {
+ return new DeletedVectorHolder(numRows);
+ }
Review Comment:
[question] Should we rename these to `IsDeletedMetaColumnHolder` and
`IsDeletedVectorHolder`, considering the metadata column being held is named
`IsDeleted`?
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -94,6 +96,23 @@ protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSc
this.eqDeletes = eqDeleteBuilder.build();
this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+ this.hasColumnIsDeleted = requestedSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
+ this.columnIsDeletedIndex = getIndexOfIsDeletedMetadataColumn();
+ }
+
+ private int getIndexOfIsDeletedMetadataColumn() {
+ List<Types.NestedField> icebergFields = requiredSchema.asStruct().fields();
+ for (int i = 0; i < icebergFields.size(); i++) {
+ if (icebergFields.get(i).fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
+ return i;
+ }
+ }
+
+ return -1;
Review Comment:
[question] Should we add this helper (which takes a fieldId and returns
its column position) to
[Schema](https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/Schema.java)?
Your thoughts?
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java:
##########
@@ -195,5 +195,10 @@ protected StructLike asStructLike(InternalRow row) {
protected InputFile getInputFile(String location) {
return RowDataReader.this.getInputFile(location);
}
+
+ @Override
+ protected void markRowDeleted(InternalRow row) {
+ row.setBoolean(columnIsDeletedIndex(), true);
+ }
Review Comment:
[question] Should we add a precondition here that `columnIsDeletedIndex`
is not -1? Your thoughts?
At present we only invoke `markRowDeleted` when `hasColumnIsDeleted` is true,
which implies `columnIsDeletedIndex` should never be -1.
##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -227,6 +251,37 @@ public void close() {
}
}
+ private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteFilter<T> {
+ private final Consumer<T> markDeleted;
+
+ private PositionStreamDeleteMarker(CloseableIterable<T> rows, Function<T, Long> extractPos,
+ CloseableIterable<Long> deletePositions, Consumer<T> markDeleted) {
+ super(rows, extractPos, deletePositions);
+ this.markDeleted = markDeleted;
+ }
+
+ @Override
+ protected PositionFilterIterator createPosDeleteIterator(CloseableIterator<T> items,
+ CloseableIterator<Long> deletePosIterator) {
+ return new PositionDeleteMarkerIterator(items, deletePosIterator);
+ }
+
+ private class PositionDeleteMarkerIterator extends PositionFilterIterator {
+ private PositionDeleteMarkerIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
+ super(items, deletePositions);
+ }
+
+ @Override
+ protected boolean shouldKeep(T row) {
+ boolean isDeleted = !super.shouldKeep(row);
+ if (isDeleted) {
+ markDeleted.accept(row);
+ }
+ return true;
Review Comment:
[question] We return `true` from `shouldKeep` even when `isDeleted` is true;
as I understand it, the intention is to always pass every row through
([CP](https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/io/FilterIterator.java#L67-L70)).
Should we add a comment, or is it self-explanatory? Your thoughts?
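To make the keep-everything contract concrete, here is a self-contained sketch of the marker semantics. The class, the position set, and the consumer are hypothetical stand-ins for the streaming iterator above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: rows at deleted positions are flagged via the callback,
// but every row is kept, unlike a plain position-delete filter which drops them.
class DeleteMarkerSketch {
    static <T> List<T> markAndKeepAll(List<T> rows, Set<Integer> deletedPositions, Consumer<T> markDeleted) {
        List<T> out = new ArrayList<>();
        for (int pos = 0; pos < rows.size(); pos++) {
            T row = rows.get(pos);
            if (deletedPositions.contains(pos)) {
                markDeleted.accept(row);  // mark it, but do not drop it
            }
            out.add(row);  // the equivalent of shouldKeep(...) always returning true
        }
        return out;
    }
}
```

The downstream reader then surfaces the mark through the `IS_DELETED` metadata column rather than by filtering rows out.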
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -94,6 +96,23 @@ protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSc
this.eqDeletes = eqDeleteBuilder.build();
this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+ this.hasColumnIsDeleted = requestedSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
+ this.columnIsDeletedIndex = getIndexOfIsDeletedMetadataColumn();
+ }
+
+ private int getIndexOfIsDeletedMetadataColumn() {
+ List<Types.NestedField> icebergFields = requiredSchema.asStruct().fields();
Review Comment:
[minor]
```suggestion
List<Types.NestedField> icebergFields = requiredSchema.columns();
```
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -80,9 +88,9 @@ public TestSparkReaderDeletes(boolean vectorized) {
@Parameterized.Parameters(name = "vectorized = {0}")
public static Object[][] parameters() {
- return new Object[][] {
- new Object[] {false},
- new Object[] {true}
+ return new Object[][]{
+ new Object[]{false},
+ new Object[]{true}
Review Comment:
Can revert this if it's unintentional.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]