flyrain commented on code in PR #4870: URL: https://github.com/apache/iceberg/pull/4870#discussion_r894014725
##########
api/src/main/java/org/apache/iceberg/DeletedRowsScanTask.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * A scan task for deleted data records generated by adding delete files to the table.
+ */
+public interface DeletedRowsScanTask extends ChangelogScanTask {

Review Comment:
I thought about this a bit more. We have two options.

Option 1. Reader-side change, based on what Anton proposed.
1. `deletes()` in `FileScanTask` needs to return the historical delete files as well as the current snapshot's delete files.
2. Output a row only if it is deleted by the current deletes and not by the previous ones.
3. The readers need to change. In fact, this needs a special reader just for this purpose, one that returns the rows deleted in this snapshot but not in previous snapshots. This logic cannot be reused by the `_deleted` metadata column.
For example, a change like this:
```
public static <T> CloseableIterable<T> markDeleted(
    CloseableIterable<T> rows,
    Predicate<T> isDeleted,
    Predicate<T> isNotInHistoricalDelete,
    Consumer<T> deleteMarker) {
  return CloseableIterable.transform(rows, row -> {
    // Mark only rows deleted by the current deletes and not by earlier ones.
    if (isDeleted.test(row) && isNotInHistoricalDelete.test(row)) {
      deleteMarker.accept(row);
    }
    return row;
  });
}
```

Option 2. Use Spark DataFrames to deduplicate.
1. Read the previously deleted rows into a DataFrame `df1`, and read the currently deleted rows into a DataFrame `df2`.
2. Use `df2.exceptAll(df1)` to deduplicate.
3. We need a new scan task for previous deletes, maybe just a minor change to the current `DeletedRowsScanTask`. For example, for data file f1, s1 added delete file d1, s2 added delete file d2, and s3 added delete file d3. Assuming s3 is the current snapshot, the scan task for previous deletes should have f1, d1, and d2.
4. We need to output the position and data file name to identify duplicates.

Option 1 needs changes to the readers, but it should perform better than option 2. Option 2 needs fewer code changes to the scan task and reader, but it may not perform well and cannot be used across engines.
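
Both options rest on the same rule: a row is reported as deleted only if the current snapshot's deletes cover it and no earlier delete file already did. The following is a minimal plain-Java sketch of that rule, not Iceberg code. The class `NewlyDeletedRows`, the method `newlyDeleted`, and the use of a `(data file name, position)` pair as row identity are all assumptions made for illustration. The sample data follows the f1/d1/d2/d3 example above, with made-up positions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class NewlyDeletedRows {

  // Hypothetical helper: keeps rows that the current deletes cover
  // and that no historical delete file already covered.
  static <T> List<T> newlyDeleted(
      Iterable<T> rows, Predicate<T> isDeleted, Predicate<T> isInHistoricalDelete) {
    List<T> result = new ArrayList<>();
    for (T row : rows) {
      if (isDeleted.test(row) && !isInHistoricalDelete.test(row)) {
        result.add(row);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Row identity is assumed to be (data file name, position in that file).
    // Data file f1 has rows at positions 0..3.
    List<Map.Entry<String, Long>> rows = List.of(
        Map.entry("f1", 0L), Map.entry("f1", 1L),
        Map.entry("f1", 2L), Map.entry("f1", 3L));

    // d1 (added in s1) and d2 (added in s2) together delete positions 0 and 1.
    Set<Map.Entry<String, Long>> historical =
        Set.of(Map.entry("f1", 0L), Map.entry("f1", 1L));

    // d3 (added in s3, the current snapshot) deletes positions 1 and 3.
    Set<Map.Entry<String, Long>> current =
        Set.of(Map.entry("f1", 1L), Map.entry("f1", 3L));

    // Position 1 was already deleted earlier, so only position 3 is new.
    List<Map.Entry<String, Long>> out =
        newlyDeleted(rows, current::contains, historical::contains);
    System.out.println(out); // prints [f1=3]
  }
}
```

Keying on the data file name plus position is what item 4 of option 2 asks for. The same pair would let a reader in option 1 check historical deletes without comparing full row contents.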
