SHuixo commented on issue #6104:
URL: https://github.com/apache/iceberg/issues/6104#issuecomment-1313404740
> > the repeated compression process of full data
>
> Do you mean that each compaction compacts all data? Compaction is not
> repeated: during one compaction, some files are compacted into one file, and
> the next compaction will not compact those files again. So I don't think it
> is a repeated compression process; from my side it is still an incremental
> process.
>
> Back to your problem: actually, there is a PR #2680 that uses disk to avoid
> OOM, and I think it will also fix your problem. You can try to 1) increase
> the memory of the TM; 2) run compaction more frequently, so that each
> compaction does not compact as much data, which may well relieve the OOM.
I could not find the configuration items for the **rocksdb**-based feature
mentioned in PR #2680 in the latest 1.0.0 release. Do we have to wait for a
subsequent version for them to be added?
When compacting data, it is indeed possible to avoid the **OOM** problem by
increasing the TaskManager memory and running compaction more frequently.
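
For reference, a minimal sketch of the standalone compaction job (using the
Iceberg 0.14 Flink `Actions` API that appears in the stack trace below; the
class name and table loading are illustrative, not verified):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.actions.Actions;

public class CompactSketch {
    // Run one rewrite pass over an already-loaded table. Scheduling this
    // more often means each pass rewrites fewer files, which is the
    // "compact more frequently" advice quoted above.
    public static void compact(Table table) {
        Actions.forTable(table)
            .rewriteDataFiles()
            // target output file size; 128 MB here is an example value
            .targetSizeInBytes(128L * 1024 * 1024)
            .execute();
    }
}
```

This is the same `BaseRewriteDataFilesAction.execute()` path shown in the
trace, just invoked on a shorter interval.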
However, when a Flink CDC real-time stream is written to an Iceberg table with
a checkpoint interval of 5 minutes and a compaction interval of 30 minutes,
the compaction task hits a **commit exception**. The restarted compaction task
then stays in the Map phase until it exits with a **timeout exception**.
> commit exception:
```
2022-11-11 17:06:57,583 WARN  org.apache.iceberg.actions.BaseRewriteDataFilesAction [] - Failed to commit rewrite, cleaning up rewritten files
org.apache.iceberg.exceptions.ValidationException: Cannot commit, found new delete for replaced data file: GenericDataFile{content=data, file_path=hdfs://nameservice1/user/hive/warehouse/dhome_db.db/ods_d_base_inf_229_iceberg/data/00001-0-70f034b7-9725-4d90-b1ad-95907d30ed19-00001.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{}, record_count=951380, file_size_in_bytes=38224976, column_sizes=null, value_counts=org.apache.iceberg.util.SerializableMap@187f476, null_value_counts=org.apache.iceberg.util.SerializableMap@17a, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@f05dccea, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@ffcfbef8, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=null}
    at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:50) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.MergingSnapshotProducer.validateNoNewDeletesForDataFiles(MergingSnapshotProducer.java:418) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.MergingSnapshotProducer.validateNoNewDeletesForDataFiles(MergingSnapshotProducer.java:367) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.BaseRewriteFiles.validate(BaseRewriteFiles.java:108) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:175) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:296) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:295) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.actions.BaseSnapshotUpdateAction.commit(BaseSnapshotUpdateAction.java:41) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.actions.BaseRewriteDataFilesAction.doReplace(BaseRewriteDataFilesAction.java:298) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.actions.BaseRewriteDataFilesAction.replaceDataFiles(BaseRewriteDataFilesAction.java:277) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.apache.iceberg.actions.BaseRewriteDataFilesAction.execute(BaseRewriteDataFilesAction.java:252) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at org.XXXX.FlinkCompactIcebergData.main(FlinkCompactIcebergData.java:73) ~[DevXXXX-0.14.1-1.13.5.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_74]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_74]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_74]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_74]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.5.jar:1.13.5]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.5.jar:1.13.5]
```
[compact-iceberg-file-commit-exception.log](https://github.com/apache/iceberg/files/10001421/compact-iceberg-file-commit-exception.log)
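
As the `validateNoNewDeletesForDataFiles` frames in the trace indicate, the
rewrite commit validates that no new delete files were committed against the
data files it is replacing; with a CDC stream writing equality deletes at
every 5-minute checkpoint, a 30-minute compaction window is likely to overlap
with fresh deletes. A hedged sketch of raising the standard Iceberg commit
retry properties (note: retries help with transient metadata conflicts, but
they do not resolve a genuine new-delete validation failure, which requires
the rewrite window to avoid overlapping CDC deletes):

```java
// Sketch only: assumes `table` is an org.apache.iceberg.Table already loaded
// from the catalog. Property names are standard Iceberg table properties.
table.updateProperties()
    .set("commit.retry.num-retries", "10")    // default is 4
    .set("commit.retry.min-wait-ms", "1000")  // back off between attempts
    .commit();
```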
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]