juiceyang opened a new issue, #4044:
URL: https://github.com/apache/amoro/issues/4044

   ### What happened?
   
   We have a Flink job that performs CDC synchronization from an upstream 
database into Iceberg. This job generates many equality delete files, many of 
which end up as dangling delete files. When Amoro runs the 
clean-dangling-delete-files operation on this table, it fails with the 
following error.
   ```
   2026-01-14 02:49:39,401 WARN 
[async-dangling-delete-files-cleaning-executor-7] 
[org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer] [] - 
Failed to commit dangling delete file for table $catalog.$database.$table, but 
ignore it
   org.apache.iceberg.exceptions.ValidationException: Missing required files to 
delete: 
oss://masked-for-security-concern/data/_id_bucket_20=12/00000-0-5a087250-6137-4365-b5f9-9b04ceae2e47-305883.parquet,...dangling-delete-file-list-omitted
 
        at 
org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
 ~[iceberg-api-lilith-trino-1.6.1.1.jar:?]
        at 
org.apache.iceberg.ManifestFilterManager.validateRequiredDeletes(ManifestFilterManager.java:237)
 ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at 
org.apache.iceberg.ManifestFilterManager.filterManifests(ManifestFilterManager.java:199)
 ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at 
org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:837)
 ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:236) 
~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at 
org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:386) 
~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at 
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) 
~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at 
org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) 
~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) 
~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) 
~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at 
org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:384) 
~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
        at org.apache.amoro.op.MixedUpdate.commit(MixedUpdate.java:143) 
~[amoro-format-iceberg-0.8.0-incubating.jar:0.8.0-incubating]
        at 
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.clearInternalTableDanglingDeleteFiles(IcebergTableMaintainer.java:477)
 ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
        at 
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.cleanDanglingDeleteFiles(IcebergTableMaintainer.java:336)
 ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
        at 
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.cleanDanglingDeleteFiles(IcebergTableMaintainer.java:162)
 ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
        at 
org.apache.amoro.server.table.executor.DanglingDeleteFilesCleaningExecutor.execute(DanglingDeleteFilesCleaningExecutor.java:62)
 ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
        at 
org.apache.amoro.server.table.executor.BaseTableExecutor.executeTask(BaseTableExecutor.java:82)
 ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
        at 
org.apache.amoro.server.table.executor.BaseTableExecutor.lambda$null$0(BaseTableExecutor.java:72)
 ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_452]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_452]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_452]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_452]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_452]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_452]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_452]
   ```
   
   ### Affects Versions
   
   0.8.1-incubating
   
   ### What table formats are you seeing the problem on?
   
   Iceberg
   
   ### What engines are you seeing the problem on?
   
   Spark, Flink, AMS
   
   ### How to reproduce
   
   1. Create an Iceberg table using Flink (mainly to enable upsert operations 
later by leveraging the primary key).
   ```sql
   CREATE TABLE `iceberg`.`db`.`t` (
     `id` BIGINT COMMENT 'unique id',
     `data` STRING NOT NULL,
     PRIMARY KEY(`id`) NOT ENFORCED
   ) WITH (
     'format-version'='2',
     'write.upsert.enabled' = 'true',
     'identifier-fields' = '[id]'
   );
   ```
   
   2. Add a new partition column using Spark.
   ```sql
   ALTER TABLE iceberg.db.t ADD PARTITION FIELD bucket(5, id);
   ```
   
   3. Upsert a row in Flink. 
   At this point, Flink will create an equality delete file and a data file 
containing the same id. Both files are written under the partition path 
id_bucket_5=<some_value>.
   ```sql
   INSERT INTO iceberg.db.t /*+ OPTIONS('upsert-enabled'='true') */ values (5, 
'5');
   ```
   
   4. Modify the partition column using Spark.
   ```sql
   ALTER TABLE iceberg.db.t REPLACE PARTITION FIELD bucket(5, id) WITH 
bucket(10, id);
   ```
   
   5. Upsert a row in Flink.
   ```sql
   INSERT INTO iceberg.db.t /*+ OPTIONS('upsert-enabled'='true') */ values (5, 
'55');
   ```
   
   6. Run clean-dangling-delete-files on this table in Amoro. 
   (Restarting Amoro is one way to trigger the clean-dangling-delete-files 
operation.) As soon as it runs, it fails with:
   ```
   org.apache.iceberg.exceptions.ValidationException: Missing required files to 
delete: 
$warehouse_location/$database_name.db/t/data/id_bucket_10=3/00000-0-2be57c08-cf6a-46c8-a93f-529f953138be-00002.parquet
           at 
org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
           at 
org.apache.iceberg.ManifestFilterManager.validateRequiredDeletes(ManifestFilterManager.java:248)
           at 
org.apache.iceberg.ManifestFilterManager.filterManifests(ManifestFilterManager.java:200)
           at 
org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:838)
           at 
org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:226)
           at 
org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:376)
           at 
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
           at 
org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
           at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
           at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
           at 
org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:374)
           ...
   ```
   
   
   ### Anything else
   
   The root cause is that in 
`org.apache.amoro.server.utils.IcebergTableUtil#getDanglingDeleteFiles`, the 
call to `org.apache.amoro.scan.TableEntriesScan#entries` returns 
`IcebergFileEntry` objects whose `file().partition()` values are incorrect.
   
   The underlying bug is in 
`org.apache.amoro.scan.TableEntriesScan#buildDeleteFile`:
   `fileRecord.get(dataFileFieldIndex(DataFile.PARTITION_NAME), 
StructLike.class)` returns a struct containing one value per partition field 
across all historical `PartitionSpec`s, but the code assumes it only contains 
the fields of the current/latest `PartitionSpec`. After the spec evolution in 
the reproduction steps above, the two layouts no longer line up, so the 
partition attached to each delete file is wrong and the subsequent commit 
references files the table does not recognize.
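   The mismatch can be sketched with a minimal, self-contained example. This is 
a hypothetical model, not the real Amoro/Iceberg classes: the field names 
(`id_bucket_5`, `id_bucket_10`) and values are illustrative, and the "unified 
struct" stands in for how the entries metadata lays out `data_file.partition` 
with one slot per partition field across all historical specs.
   ```java
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical sketch: models the "unified" partition struct that the
   // entries metadata exposes, with one slot per partition field across
   // ALL historical PartitionSpecs (not just the current one).
   public class UnifiedPartitionSketch {

       // After REPLACE PARTITION FIELD bucket(5, id) WITH bucket(10, id):
       //   slot 0 -> id_bucket_5  (old spec; null for files written under the new spec)
       //   slot 1 -> id_bucket_10 (current spec; null for files written under the old spec)
       static final List<String> UNIFIED_FIELDS =
           Arrays.asList("id_bucket_5", "id_bucket_10");

       // Buggy read: assumes the struct only holds the current spec's single
       // field, so it takes slot 0 unconditionally.
       static Object buggyRead(Object[] unifiedPartition) {
           return unifiedPartition[0];
       }

       // Correct read: locates the current spec's field by name in the
       // unified struct before reading its slot.
       static Object correctRead(Object[] unifiedPartition, String currentField) {
           return unifiedPartition[UNIFIED_FIELDS.indexOf(currentField)];
       }

       public static void main(String[] args) {
           // A file written under the current spec, partition id_bucket_10=3:
           Object[] entryPartition = {null, 3};

           System.out.println("buggy   = " + buggyRead(entryPartition));
           System.out.println("correct = " + correctRead(entryPartition, "id_bucket_10"));
       }
   }
   ```
   With the buggy read, the delete file's partition resolves to null (or to a 
stale old-spec value), so the cleanup commit references file/partition pairs 
the table no longer recognizes, which surfaces as the `Missing required files 
to delete` ValidationException above.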
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's Code of Conduct

