Hi Till,

Thanks for your support on the task manager question. In continuation of the
earlier email in this thread: we increased the TM and JM memory to run the job
with higher parallelism, but the job now fails within *one day* with the
exception below.

Logs: *org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.*

The new session configuration is below, and we passed parallelism *-p 28* to
the job to get more TMs to process the heavy load.

*sudo flink-yarn-session -Djobmanager.memory.process.size=16000m
-Dtaskmanager.memory.process.size=40000m -s 14 -d*
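
The job itself is then submitted against this session roughly as before
(session ID, class name, and jar path elided here), just with the parallelism
flag added:

*flink run -m yarn-cluster -yid <Flink sessionId> -p 28 -c <ClassName> <Jar Path>*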

*Note:*
We have the retry logic shown below:
environment.setRestartStrategy(RestartStrategies.failureRateRestart(
        5,                              // max failures per interval
        Time.of(30, TimeUnit.MINUTES),  // time interval for measuring failure rate
        Time.of(60, TimeUnit.SECONDS)   // delay
));
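
For reference, I believe this corresponds to the following flink-conf.yaml
settings (assuming the standard failure-rate restart-strategy keys), which
matches the failuresIntervalMS=1800000, backoffTimeMS=60000 and
maxFailuresPerInterval=5 values printed in the exception below:

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 5
restart-strategy.failure-rate.failure-rate-interval: 30 min
restart-strategy.failure-rate.delay: 60 s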

*Below is the exception I am getting:*

org.apache.flink.runtime.JobException: Recovery is suppressed by
FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=1800000,backoffTimeMS=60000,maxFailuresPerInterval=5)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
        at 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
        at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1710)
        at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1287)
        at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1255)
        at 
org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:954)
        at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
        at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
        at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
        at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
        at 
org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
        at 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.tryFailingAllocatedSlot(SlotPoolImpl.java:733)
        at 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.failAllocation(SlotPoolImpl.java:713)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.internalFailAllocation(JobMaster.java:562)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.notifyAllocationFailure(JobMaster.java:700)
        at sun.reflect.GeneratedMethodAccessor99.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Container released on a *lost* node
        at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
        ... 22 more
2021-01-10 20:07:48,974 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Stopping checkpoint coordinator for job
a7cffc31c4aeb01356c5132c908be314.
2021-01-10 20:07:48,974 INFO
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore
 - Shutting down
2021-01-10 20:07:48,978 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Job
a7cffc31c4aeb01356c5132c908be314 reached globally terminal state
FAILED.
2021-01-10 20:07:49,006 INFO
org.apache.flink.runtime.jobmaster.JobMaster                  -
Stopping the JobMaster for job Gas Job Runner
V2(a7cffc31c4aeb01356c5132c908be314).
2021-01-10 20:07:49,006 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      -
Suspending SlotPool.
2021-01-10 20:07:49,007 INFO
org.apache.flink.runtime.jobmaster.JobMaster                  - Close
ResourceManager connection 39a33f865ba12bac16dd21b834527750:
JobManager is shutting down..
2021-01-10 20:07:49,007 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      -
Stopping SlotPool.
2021-01-10 20:07:49,007 INFO
org.apache.flink.yarn.YarnResourceManager                     -
Disconnect job manager
00000000000000000000000000000...@akka.tcp://flink@ip-10-6-0-231.ec2.internal:39039/user/rpc/jobmanager_2
for job a7cffc31c4aeb01356c5132c908be314 from the resource manager.


Could you please help us understand why the job is failing now that we have
increased the “parallelism”?


Thanks & Regards,

-Deep




On Mon, Jan 4, 2021 at 8:12 PM DEEP NARAYAN Singh <about.d...@gmail.com>
wrote:

> Thanks, Till, for the detailed explanation. I tried it and it is working fine.
>
> Once again thanks for your quick response.
>
> Regards,
> -Deep
>
> On Mon, 4 Jan, 2021, 2:20 PM Till Rohrmann, <trohrm...@apache.org> wrote:
>
>> Hi Deep,
>>
>> Flink has dropped support for specifying the number of TMs via -n since
>> the
>> introduction of FLIP-6. Since then, Flink will automatically start TMs
>> depending on the required resources. Hence, there is no need to specify
>> the
>> -n parameter anymore. Instead, you should specify the parallelism with
>> which you would like to run your job via the -p option.
>>
>> Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max to
>> set an upper limit on the number of slots a cluster is allowed to allocate [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16605
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh <about.d...@gmail.com>
>> wrote:
>>
>> > Hi Guys,
>> >
>> > I’m struggling to start the task managers with Flink 1.11.0 on AWS EMR,
>> > whereas with the older versions it was not an issue. Let me put the full
>> > context here.
>> >
>> > *When using Flink 1.9.1 and EMR 5.29.0*
>> >
>> > To create a long-running session, we used the command below.
>> >
>> > *sudo flink-yarn-session -n <Number of TM> -s <Number of slot> -jm <memory>
>> > -tm <memory> -d*
>> >
>> > followed by the command below to run the final job.
>> >
>> > *flink run -m yarn-cluster -yid <flink sessionId> -yn <Number of TM> -ys
>> > <Number of slot> -yjm <memory> -ytm <memory> -c <ClassName> <Jar Path>*
>> >
>> > If “n” is 6, then 6 task managers are created to start the job; in other
>> > words, whatever “n” is configured to, the job starts with that many TMs.
>> >
>> > But now, when we scaled up to the new configuration (*i.e. Flink 1.11.0 and
>> > EMR 6.1.0*), we are unable to get the desired number of TMs.
>> >
>> > Please find the session command for the new configuration below:
>> >
>> > *sudo flink-yarn-session -Djobmanager.memory.process.size=<Memory in GB>
>> > -Dtaskmanager.memory.process.size=<Memory in GB> -n <no of TM> -s <No of
>> > slot/core> -d*
>> >
>> > And the final Job command
>> >
>> > *flink run -m yarn-cluster -yid <Flink sessionId> -c <ClassName> <Jar
>> > Path>*
>> >
>> > I have tried a lot of combinations, but nothing has worked out so far. I
>> > request your help in this regard, as we plan to move this configuration to
>> > *PRODUCTION* soon.
>> >
>> > Thanks in advance.
>> >
>> >
>> > Regards,
>> >
>> > -Deep
>> >
>>
>
