Reo-LEI commented on code in PR #4539:
URL: https://github.com/apache/iceberg/pull/4539#discussion_r856860633
##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -73,6 +74,16 @@ public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Functio
return filter.filter(rows);
}
+ public static <T> CloseableIterable<T> marker(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
+ PositionDeleteIndex deleteSet, Consumer<T> markDeleted) {
+ if (deleteSet.isEmpty()) {
+ return rows;
+ }
+
+ PositionSetDeleteMarker<T> deleteMarker = new PositionSetDeleteMarker<>(rowToPosition, deleteSet, markDeleted);
+ return deleteMarker.filter(rows);
Review Comment:
Or maybe we can add a new interface called `Marker` to mark these rows. I
think that would be easier to understand than using `CloseableIterable`.
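
To make the suggestion concrete, here is a rough sketch of what such an interface could look like. Everything in it is hypothetical: `Marker`, `PositionSetMarker`, and `SketchRow` are illustrative names, a plain `Set<Long>` stands in for `PositionDeleteIndex`, and the rows are marked eagerly for brevity, whereas a real version would presumably stay lazy over `CloseableIterable`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical interface, not part of Iceberg: it marks rows and never drops them.
interface Marker<T> {
  Iterable<T> mark(Iterable<T> rows);
}

// Hypothetical row type used only for this sketch.
class SketchRow {
  final long pos;
  boolean deleted;

  SketchRow(long pos) {
    this.pos = pos;
  }
}

// Hypothetical position-based marker; a Set<Long> stands in for PositionDeleteIndex.
class PositionSetMarker<T> implements Marker<T> {
  private final Function<T, Long> rowToPosition;
  private final Set<Long> deletedPositions;
  private final Consumer<T> markDeleted;

  PositionSetMarker(Function<T, Long> rowToPosition, Set<Long> deletedPositions, Consumer<T> markDeleted) {
    this.rowToPosition = rowToPosition;
    this.deletedPositions = deletedPositions;
    this.markDeleted = markDeleted;
  }

  @Override
  public Iterable<T> mark(Iterable<T> rows) {
    List<T> result = new ArrayList<>();
    for (T row : rows) {
      if (deletedPositions.contains(rowToPosition.apply(row))) {
        markDeleted.accept(row); // flag the row as deleted
      }
      result.add(row); // every row is kept, deleted or not
    }
    return result;
  }
}
```

With a separate type like this, the call site reads as "mark" rather than "filter", which is the readability gain I have in mind.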
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -185,14 +203,35 @@ private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
.reduce(Predicate::and)
.orElse(t -> true);
- Filter<T> remainingRowsFilter = new Filter<T>() {
+ Filter<T> remainingRowsFilter = hasMetadataColumnIsDeleted ? getMarker(remainingRows) : getFilter(remainingRows);
+
+ return remainingRowsFilter.filter(records);
+ }
+
+ private Filter<T> getFilter(Predicate<T> remainingRows) {
+ return new Filter<T>() {
@Override
protected boolean shouldKeep(T item) {
return remainingRows.test(item);
}
};
+ }
- return remainingRowsFilter.filter(records);
+ private Filter<T> getMarker(Predicate<T> remainingRows) {
+ return new Filter<T>() {
+ @Override
+ protected boolean shouldKeep(T item) {
+ if (!remainingRows.test(item)) {
+ // if the row is deleted, set the metadata column to true
+ markRowDeleted(item);
+ }
+ return true; // keep the row even if it is deleted by equality deletions
+ }
+ };
+ }
+
+ protected void markRowDeleted(T item) {
Review Comment:
+1 for using an abstract method and forcing subclasses to implement it. I also
think the changes to `Deletes` and `DeleteFilter` should go into a separate PR,
because implementing CDC reads for Flink will need these changes too.
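
As a rough illustration of the abstract-method approach, assuming a much simplified stand-in for `DeleteFilter`: `SketchDeleteFilter`, `MapRowDeleteFilter`, `markEqDeletes`, and the `"_deleted"` map key are all made up for this sketch, and plain `List`/`Map` types replace the real record and iterable types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for DeleteFilter.
abstract class SketchDeleteFilter<T> {
  // Abstract, so each engine-specific subclass must say how a row gets flagged.
  protected abstract void markRowDeleted(T item);

  // Keeps every record; records that fail the predicate are flagged as deleted.
  List<T> markEqDeletes(List<T> records, Predicate<T> remainingRows) {
    List<T> result = new ArrayList<>(records.size());
    for (T record : records) {
      if (!remainingRows.test(record)) {
        markRowDeleted(record);
      }
      result.add(record);
    }
    return result;
  }
}

// Hypothetical subclass for map-backed rows; "_deleted" is just a key chosen for the sketch.
class MapRowDeleteFilter extends SketchDeleteFilter<Map<String, Object>> {
  static Map<String, Object> row(int id) {
    Map<String, Object> row = new HashMap<>();
    row.put("id", id);
    return row;
  }

  @Override
  protected void markRowDeleted(Map<String, Object> item) {
    item.put("_deleted", true);
  }
}
```

A subclass that forgets to handle the deleted flag then fails to compile instead of silently doing nothing, and a Flink reader could supply its own `markRowDeleted` in the same way.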
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -185,14 +204,35 @@ private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
.reduce(Predicate::and)
.orElse(t -> true);
- Filter<T> remainingRowsFilter = new Filter<T>() {
+ Filter<T> remainingRowsFilter = hasMetadataColumnIsDeleted ? getMarker(remainingRows) : getFilter(remainingRows);
Review Comment:
Maybe we shouldn't decide implicitly whether to keep deleted records. We could
add the `_delete` column by default and provide separate methods, one that
keeps deleted records and one that drops them, so that each caller gets the
data it needs for its situation.
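
A minimal sketch of what I mean, under the assumption that every row always carries the flag. The names `FlaggedRow`, `ExplicitDeleteReader`, `readWithDeleted`, and `readLive` are hypothetical and exist only to show the shape of the API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical row that always carries the delete flag.
class FlaggedRow {
  final int id;
  boolean deleted;

  FlaggedRow(int id) {
    this.id = id;
  }
}

// Hypothetical reader: the caller picks the behavior by method name, not by projected schema.
class ExplicitDeleteReader {
  private final Predicate<FlaggedRow> isDeleted;

  ExplicitDeleteReader(Predicate<FlaggedRow> isDeleted) {
    this.isDeleted = isDeleted;
  }

  // Returns every row, with the flag set according to the delete predicate.
  List<FlaggedRow> readWithDeleted(List<FlaggedRow> records) {
    List<FlaggedRow> out = new ArrayList<>(records.size());
    for (FlaggedRow row : records) {
      row.deleted = isDeleted.test(row);
      out.add(row);
    }
    return out;
  }

  // Returns only the rows that are not deleted.
  List<FlaggedRow> readLive(List<FlaggedRow> records) {
    List<FlaggedRow> out = new ArrayList<>();
    for (FlaggedRow row : readWithDeleted(records)) {
      if (!row.deleted) {
        out.add(row);
      }
    }
    return out;
  }
}
```

A CDC-style reader would call the first method and a normal scan the second, so the choice is visible at the call site.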
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]