Shane-Yu opened a new issue #4312:
URL: https://github.com/apache/iceberg/issues/4312


    I rewrote small files (whole files) with the Flink API on an unpartitioned table.
The job failed with the error log shown below.
   
    The environment is as follows:

   > TM's memory    16G
   > numFiles       717
   > numRows        176450547
    
   
   Judging from the source code, it seems that the Eq-Delete files are loaded 
into memory when applyEqDeletes is applied to the records. Could too many 
Eq-Delete files be causing the TaskManager OOM?
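
   To make the concern concrete, here is a minimal, self-contained sketch of the pattern the stack trace suggests (Deletes.toEqualitySet feeding DeleteFilter.applyEqDeletes). It is not Iceberg's actual code: the class EqDeleteSketch and the helper row are hypothetical, and plain Java lists stand in for Iceberg's StructLike records. The point it illustrates is that every delete key is materialized in a hash set before any data row is filtered, so heap usage grows with the total number of equality-delete rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; names do not come from Iceberg.
public class EqDeleteSketch {

    // Helper (assumption): build a row or a delete key from values.
    public static List<Object> row(Object... values) {
        return new ArrayList<>(Arrays.asList(values));
    }

    // Load the keys of ALL equality-delete files into one in-memory set.
    // Memory use is proportional to the number of distinct delete keys.
    public static Set<List<Object>> toEqualitySet(List<List<List<Object>>> deleteFiles) {
        Set<List<Object>> deleted = new HashSet<>();
        for (List<List<Object>> file : deleteFiles) {
            deleted.addAll(file);
        }
        return deleted;
    }

    // Keep only rows whose key columns are not present in the delete set.
    public static List<List<Object>> applyEqDeletes(
            List<List<Object>> rows, int[] keyColumns, Set<List<Object>> deleted) {
        List<List<Object>> kept = new ArrayList<>();
        for (List<Object> r : rows) {
            List<Object> key = new ArrayList<>();
            for (int c : keyColumns) {
                key.add(r.get(c));
            }
            if (!deleted.contains(key)) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<List<Object>> rows = Arrays.asList(row(1, "a"), row(2, "b"), row(3, "c"));
        // Two delete files, each deleting by the first column (the id).
        List<List<List<Object>>> deleteFiles = Arrays.asList(
                Arrays.asList(row(2)),
                Arrays.asList(row(3)));
        Set<List<Object>> deleted = toEqualitySet(deleteFiles);
        System.out.println(applyEqDeletes(rows, new int[] {0}, deleted)); // prints [[1, a]]
    }
}
```

   If the real code path behaves like this sketch, the heap needed per task would depend on how many Eq-Delete rows apply to the data files being rewritten, not only on the size of the data files themselves.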
   
   
![image](https://user-images.githubusercontent.com/26053387/157817146-335d4391-75bf-4f7a-8ed1-28bb46f2cc1c.png)
   
   - error log:
   
   `  ERROR org.apache.flink.client.cli.CliFrontend                      [] - 
Error while running the command.
   org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Rewrite data file error.
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_201]
        at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_201]
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 [flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
        at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) 
[flink-dist_2.11-1.12.0.jar:1.12.0]
   Caused by: java.lang.RuntimeException: Rewrite data file error.
        at 
org.apache.iceberg.flink.actions.RewriteDataFilesAction.rewriteDataForTasks(RewriteDataFilesAction.java:58)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.actions.BaseRewriteDataFilesAction.execute(BaseRewriteDataFilesAction.java:248)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
com.hdspacex.flink.CompactFileAndExpireSnapshot.main(CompactFileAndExpireSnapshot.java:94)
 ~[?:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_201]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_201]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_201]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 11 more
   Caused by: java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap 
space
        at 
org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:150)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:91)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
   Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.util.stream.ReferencePipeline.map(ReferencePipeline.java:186) 
~[?:1.8.0_201]
        at 
org.apache.iceberg.types.Comparators$StructLikeComparator.<init>(Comparators.java:108)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.types.Comparators$StructLikeComparator.<init>(Comparators.java:102)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at org.apache.iceberg.types.Comparators.forType(Comparators.java:53) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.util.StructLikeWrapper.<init>(StructLikeWrapper.java:43) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.util.StructLikeWrapper.forType(StructLikeWrapper.java:34) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at org.apache.iceberg.util.StructLikeSet.add(StructLikeSet.java:103) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at org.apache.iceberg.util.StructLikeSet.add(StructLikeSet.java:33) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:356)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at org.apache.iceberg.deletes.Deletes.toEqualitySet(Deletes.java:79) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:156) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:185) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:126) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.flink.source.RowDataFileScanTaskReader.open(RowDataFileScanTaskReader.java:75)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.flink.source.DataIterator.openTaskIterator(DataIterator.java:84)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:76)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:58) 
~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:130)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.iceberg.flink.source.RowDataRewriter$RewriteMap.map(RowDataRewriter.java:91)
 ~[iceberg-flink-runtime-1.12-0.13.1.jar:?]
        at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$184/1365369498.runDefaultAction(Unknown
 Source) ~[?:?]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]