Shane-Yu opened a new issue #4312:
URL: https://github.com/apache/iceberg/issues/4312
I rewrote small files (whole files) with the Flink API on an un-partitioned table.
The job failed with the error log shown below.
The environment is as follows:
> TM's memory 16G
numFiles 717
numRows 176450547
As we can see from the source code, it seems that the Eq-Delete files are
loaded into memory when applyEqDeletes is applied to the records. Could too
many Eq-Delete files be causing the TaskManager OOM?

- error log:
` ERROR org.apache.flink.client.cli.CliFrontend [] -
Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Rewrite data file error.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_201]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_201]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.lang.RuntimeException: Rewrite data file error.
at
org.apache.iceberg.flink.actions.RewriteDataFilesAction.rewriteDataForTasks(RewriteDataFilesAction.java:58)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.actions.BaseRewriteDataFilesAction.execute(BaseRewriteDataFilesAction.java:248)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
com.hdspacex.flink.CompactFileAndExpireSnapshot.main(CompactFileAndExpireSnapshot.java:94)
~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_201]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_201]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_201]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 11 more
Caused by: java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap
space
at
org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:150)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:91)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.stream.ReferencePipeline.map(ReferencePipeline.java:186)
~[?:1.8.0_201]
at
org.apache.iceberg.types.Comparators$StructLikeComparator.<init>(Comparators.java:108)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.types.Comparators$StructLikeComparator.<init>(Comparators.java:102)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at org.apache.iceberg.types.Comparators.forType(Comparators.java:53)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.util.StructLikeWrapper.<init>(StructLikeWrapper.java:43)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.util.StructLikeWrapper.forType(StructLikeWrapper.java:34)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at org.apache.iceberg.util.StructLikeSet.add(StructLikeSet.java:103)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at org.apache.iceberg.util.StructLikeSet.add(StructLikeSet.java:33)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:356)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at org.apache.iceberg.deletes.Deletes.toEqualitySet(Deletes.java:79)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:156)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:185)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:126)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.flink.source.RowDataFileScanTaskReader.open(RowDataFileScanTaskReader.java:75)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.flink.source.DataIterator.openTaskIterator(DataIterator.java:84)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:76)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:58)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:130)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:91)
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$184/1365369498.runDefaultAction(Unknown
Source) ~[?:?]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]