juiceyang opened a new issue, #4044:
URL: https://github.com/apache/amoro/issues/4044
### What happened?
We have a Flink job that performs CDC synchronization from an upstream
database to Iceberg. This job generates many equality delete files, and many of
them are dangling delete files. When Amoro runs the clean-dangling-delete-files
operation on this table, it fails with the following error.
```
2026-01-14 02:49:39,401 WARN [async-dangling-delete-files-cleaning-executor-7] [org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer] [] - Failed to commit dangling delete file for table $catalog.$database.$table, but ignore it
org.apache.iceberg.exceptions.ValidationException: Missing required files to delete: oss://masked-for-security-concern/data/_id_bucket_20=12/00000-0-5a087250-6137-4365-b5f9-9b04ceae2e47-305883.parquet,...dangling-delete-file-list-omitted
    at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49) ~[iceberg-api-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.ManifestFilterManager.validateRequiredDeletes(ManifestFilterManager.java:237) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.ManifestFilterManager.filterManifests(ManifestFilterManager.java:199) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:837) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:236) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:386) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:384) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
    at org.apache.amoro.op.MixedUpdate.commit(MixedUpdate.java:143) ~[amoro-format-iceberg-0.8.0-incubating.jar:0.8.0-incubating]
    at org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.clearInternalTableDanglingDeleteFiles(IcebergTableMaintainer.java:477) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
    at org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.cleanDanglingDeleteFiles(IcebergTableMaintainer.java:336) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
    at org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.cleanDanglingDeleteFiles(IcebergTableMaintainer.java:162) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
    at org.apache.amoro.server.table.executor.DanglingDeleteFilesCleaningExecutor.execute(DanglingDeleteFilesCleaningExecutor.java:62) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
    at org.apache.amoro.server.table.executor.BaseTableExecutor.executeTask(BaseTableExecutor.java:82) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
    at org.apache.amoro.server.table.executor.BaseTableExecutor.lambda$null$0(BaseTableExecutor.java:72) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_452]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_452]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_452]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_452]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_452]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_452]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_452]
```
### Affects Versions
0.8.1-incubating
### What table formats are you seeing the problem on?
Iceberg
### What engines are you seeing the problem on?
Spark, Flink, AMS
### How to reproduce
1. Create an Iceberg table using Flink (mainly to enable upsert operations
later by leveraging the primary key).
```sql
CREATE TABLE `iceberg`.`db`.`t` (
`id` BIGINT COMMENT 'unique id',
`data` STRING NOT NULL,
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'format-version'='2',
'write.upsert.enabled' = 'true',
'identifier-fields' = '[id]'
);
```
2. Add a new partition column using Spark.
```sql
ALTER TABLE iceberg.db.t ADD PARTITION FIELD bucket(5, id);
```
3. Upsert a row in Flink.
At this point, Flink writes an equality delete file and a data file containing the same id; both files land under the partition path `id_bucket_5=<some_value>`.
```sql
INSERT INTO iceberg.db.t /*+ OPTIONS('upsert-enabled'='true') */ VALUES (5, '5');
```
4. Modify the partition column using Spark.
```sql
ALTER TABLE iceberg.db.t REPLACE PARTITION FIELD bucket(5, id) WITH bucket(10, id);
```
5. Upsert a row in Flink.
```sql
INSERT INTO iceberg.db.t /*+ OPTIONS('upsert-enabled'='true') */ VALUES (5, '55');
```
6. Run clean-dangling-delete-files on this table in Amoro.
Restarting Amoro is one way to trigger the clean-dangling-delete-files operation; as soon as it runs on this table, it throws the following error.
```
org.apache.iceberg.exceptions.ValidationException: Missing required files to delete: $warehouse_location/$database_name.db/t/data/id_bucket_10=3/00000-0-2be57c08-cf6a-46c8-a93f-529f953138be-00002.parquet
    at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
    at org.apache.iceberg.ManifestFilterManager.validateRequiredDeletes(ManifestFilterManager.java:248)
    at org.apache.iceberg.ManifestFilterManager.filterManifests(ManifestFilterManager.java:200)
    at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:838)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:226)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:376)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:374)
...
```
### Anything else
The root cause is in `org.apache.amoro.server.utils.IcebergTableUtil#getDanglingDeleteFiles`: the call to `org.apache.amoro.scan.TableEntriesScan#entries` returns `IcebergFileEntry` objects whose `file().partition()` values are incorrect.
The underlying bug is in the implementation of `org.apache.amoro.scan.TableEntriesScan#buildDeleteFile`: `fileRecord.get(dataFileFieldIndex(DataFile.PARTITION_NAME), StructLike.class)` returns values for the columns of every `PartitionSpec` the table has had, but the code assumes it contains only the columns of the current (latest) `PartitionSpec`. After partition evolution, the two no longer match.
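The analysis above suggests a fix direction: interpret each entry's partition tuple against the spec the file was written with (identified by its spec-id), not against the table's current spec. A minimal, self-contained sketch of that idea, where the class, method, and field names are purely illustrative and not Amoro's actual API:

```java
import java.util.List;
import java.util.Map;

// Hypothetical model: a table that has gone through partition evolution
// keeps every historical PartitionSpec keyed by spec-id, and each manifest
// entry records the spec-id of the spec its file was written with.
public class SpecAwarePartitionDemo {

    // spec-id -> ordered partition field names for that spec (illustrative)
    static final Map<Integer, List<String>> SPECS_BY_ID = Map.of(
            1, List.of("id_bucket_5"),    // after ADD PARTITION FIELD bucket(5, id)
            2, List.of("id_bucket_10"));  // after REPLACE ... WITH bucket(10, id)

    // Correct behavior: resolve the spec by the entry's spec-id before
    // pairing field names with partition values. Decoding a tuple written
    // under spec 1 with spec 2's fields is exactly the bug described above.
    static String partitionPath(int specId, List<?> partitionValues) {
        List<String> fields = SPECS_BY_ID.get(specId);
        if (fields == null || fields.size() != partitionValues.size()) {
            throw new IllegalStateException(
                "partition tuple does not match spec " + specId);
        }
        StringBuilder path = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                path.append('/');
            }
            path.append(fields.get(i)).append('=').append(partitionValues.get(i));
        }
        return path.toString();
    }

    public static void main(String[] args) {
        // A delete file written under spec 1 must be decoded with spec 1,
        // even though the table's current spec is 2.
        System.out.println(partitionPath(1, List.of(3)));
        System.out.println(partitionPath(2, List.of(7)));
    }
}
```

With spec-aware decoding, a dangling-delete scan would compare files by the partition layout they were actually written with, instead of silently producing paths like `id_bucket_10=3` for files that live under `id_bucket_5=...`.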
### Are you willing to submit a PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's Code of Conduct