[ 
https://issues.apache.org/jira/browse/FLINK-34132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-34132:
----------------------------
    Component/s: Documentation

> Batch WordCount job fails when run with AdaptiveBatch scheduler
> ---------------------------------------------------------------
>
>                 Key: FLINK-34132
>                 URL: https://issues.apache.org/jira/browse/FLINK-34132
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>    Affects Versions: 1.17.1, 1.18.1
>            Reporter: Prabhu Joseph
>            Assignee: Junrui Li
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0
>
>
> Batch WordCount job fails when run with AdaptiveBatch scheduler.
> *Repro Steps*
> {code:java}
> flink-yarn-session -Djobmanager.scheduler=adaptive -d
>  flink run -d /usr/lib/flink/examples/batch/WordCount.jar --input 
> s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
> {code}
> *Error logs*
> {code:java}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>       at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
>       at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
>       at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
>       at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at 
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
>       at 
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
>       at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
>       at 
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>       ... 12 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
>       ... 20 more
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>       at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>       at 
> java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>       at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not start the JobMaster.
>       at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.IllegalStateException: At the moment, adaptive batch scheduler 
> requires batch workloads to be executed with types of all edges being 
> BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 
> 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
> 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
>       at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>       at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>       ... 3 more
> Caused by: java.lang.IllegalStateException: At the moment, adaptive batch 
> scheduler requires batch workloads to be executed with types of all edges 
> being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to 
> configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
> 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
>       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>       at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
>       at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
>       at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>       ... 3 more
> {code}
> *Analysis*
> I have configured the execution.batch-shuffle-mode to use either 
> ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, or 
> ALL_EXCHANGES_HYBRID_SELECTIVE, but all attempts resulted in the same error 
> message.
> The Wordcount program runs fine when setting below in the code
> {code:java}
> env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
> {code}
> Need to investigate why the execution.batch-shuffle-mode is not being 
> recognized and, if this behavior is intentional, correct the reported 
> misleading error message. Additionally, we need to address the Wordcount job 
> to ensure it runs seamlessly with both batch and adaptive scheduler.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to