SHuixo commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1313404740

   > > the repeated compression process of full data
   > 
   > Do you mean each compaction will compact all data? The compaction won't be repeated. I mean that during one compaction, some files are compacted into one file; the next compaction won't compact those files again. So I don't think it's a repeated compression process — from my side, it's still an incremental process.
   > 
   > Back to your problem: actually, there's a PR, #2680, to use disk to avoid OOM, and I think it'll also fix your problem. You can try to 1) increase the memory of the TM, or 2) do compaction more frequently, so that each compaction won't compact much data, which may well relieve the OOM.
   
   I could not find configuration items for the **rocksdb**-based feature mentioned in PR #2680 in the latest 1.0.0 release. Does it have to wait for a subsequent version to be added?
   
   When compacting data, it is indeed possible to avoid the **OOM** problem by increasing the TaskManager memory and compacting more frequently.
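   For reference, increasing TaskManager memory maps onto Flink's standard memory settings; a `flink-conf.yaml` sketch (the keys are standard Flink 1.13 options, the values are illustrative, not recommendations):
   
   ```
   # Illustrative values only; tune to your cluster.
   taskmanager.memory.process.size: 4096m    # total TaskManager process memory
   taskmanager.memory.managed.fraction: 0.4  # share reserved as managed memory (default)
   ```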
   
   However, when a Flink CDC real-time stream is written to the Iceberg table with a checkpoint interval of 5 minutes and a compaction interval of 30 minutes, the compaction task fails with a **commit exception**.
   The restarted compaction task then stays in the Map phase until it exits with a task-processing **timeout exception**.
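   The stack trace below points at `MergingSnapshotProducer.validateNoNewDeletesForDataFiles`. As a rough illustration of why this setup conflicts (a simplified Python model of the conflict rule, not Iceberg's actual code): the rewrite commit inspects snapshots committed after the compaction started and rejects the commit if any of them added a delete for a data file being replaced. With 5-minute CDC checkpoints inside a 30-minute compaction run, such an intervening delete is very likely.
   
   ```python
   # Simplified model of the rewrite-commit conflict check. NOT Iceberg code;
   # the function name and data shapes here are invented for illustration.
   
   class ValidationException(Exception):
       pass
   
   def validate_no_new_deletes(snapshots, rewrite_start_snapshot_id, replaced_files):
       """snapshots: list of (snapshot_id, deleted_file_paths) in commit order.
       Raises if any snapshot AFTER the rewrite's starting snapshot deleted
       one of the data files the rewrite is replacing."""
       started = False
       for snapshot_id, deleted_paths in snapshots:
           if started:
               conflict = set(deleted_paths) & set(replaced_files)
               if conflict:
                   raise ValidationException(
                       "Cannot commit, found new delete for replaced data file: "
                       + ", ".join(sorted(conflict)))
           if snapshot_id == rewrite_start_snapshot_id:
               started = True
   
   # Timeline: the rewrite reads from snapshot 1; before it commits, a CDC
   # checkpoint (snapshot 2) commits a delete against a file being rewritten.
   history = [
       (1, []),                      # base snapshot the rewrite planned from
       (2, ["data/00001-0.parquet"]) # CDC commit deleting rows in that file
   ]
   try:
       validate_no_new_deletes(history, rewrite_start_snapshot_id=1,
                               replaced_files=["data/00001-0.parquet"])
       print("commit ok")
   except ValidationException as e:
       print("commit rejected:", e)
   ```
   
   Under this model, shortening the compaction run (or pausing deletes during it) shrinks the window in which a conflicting CDC commit can land.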
   
   > commit exception:
   
   ``` 
   2022-11-11 17:06:57,583 WARN  org.apache.iceberg.actions.BaseRewriteDataFilesAction [] - Failed to commit rewrite, cleaning up rewritten files
   org.apache.iceberg.exceptions.ValidationException: Cannot commit, found new delete for replaced data file: GenericDataFile{content=data, file_path=hdfs://nameservice1/user/hive/warehouse/dhome_db.db/ods_d_base_inf_229_iceberg/data/00001-0-70f034b7-9725-4d90-b1ad-95907d30ed19-00001.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{}, record_count=951380, file_size_in_bytes=38224976, column_sizes=null, value_counts=org.apache.iceberg.util.SerializableMap@187f476, null_value_counts=org.apache.iceberg.util.SerializableMap@17a, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@f05dccea, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@ffcfbef8, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=null}
       at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:50) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.MergingSnapshotProducer.validateNoNewDeletesForDataFiles(MergingSnapshotProducer.java:418) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.MergingSnapshotProducer.validateNoNewDeletesForDataFiles(MergingSnapshotProducer.java:367) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.BaseRewriteFiles.validate(BaseRewriteFiles.java:108) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:175) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:296) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:295) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.actions.BaseSnapshotUpdateAction.commit(BaseSnapshotUpdateAction.java:41) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.actions.BaseRewriteDataFilesAction.doReplace(BaseRewriteDataFilesAction.java:298) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.actions.BaseRewriteDataFilesAction.replaceDataFiles(BaseRewriteDataFilesAction.java:277) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.apache.iceberg.actions.BaseRewriteDataFilesAction.execute(BaseRewriteDataFilesAction.java:252) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at org.XXXX.FlinkCompactIcebergData.main(FlinkCompactIcebergData.java:73) ~[DevXXXX-0.14.1-1.13.5.jar:?]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_74]
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_74]
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_74]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
       at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
       at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
       at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
       at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_74]
       at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.11-1.13.5.jar:1.13.5]
       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.11-1.13.5.jar:1.13.5]
       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.11-1.13.5.jar:1.13.5]
       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.5.jar:1.13.5]
       at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.5.jar:1.13.5]
       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.5.jar:1.13.5]
       at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.5.jar:1.13.5]
   ```
   
   
[compact-iceberg-file-commit-exception.log](https://github.com/apache/iceberg/files/10001421/compact-iceberg-file-commit-exception.log)
   

