[ 
https://issues.apache.org/jira/browse/FLINK-34132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph updated FLINK-34132:
----------------------------------
    Description: 
Batch WordCount job fails when run with AdaptiveBatch scheduler.

*Repro Steps*
{code:java}
flink-yarn-session -Djobmanager.scheduler=adaptive -d

 flink run -d /usr/lib/flink/examples/batch/WordCount.jar --input 
s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
{code}
*Error logs*
{code:java}
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
        at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
        at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
        at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
        at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 12 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
        ... 20 more
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not start the JobMaster.
        at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: At the moment, adaptive batch scheduler 
requires batch workloads to be executed with types of all edges being BLOCKING 
or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 
'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
        ... 3 more
Caused by: java.lang.IllegalStateException: At the moment, adaptive batch 
scheduler requires batch workloads to be executed with types of all edges being 
BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 
'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
        at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
        at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
        at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
        at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        ... 3 more

{code}
*Analysis*

I have configured the execution.batch-shuffle-mode to use either 
ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, or 
ALL_EXCHANGES_HYBRID_SELECTIVE, but all attempts resulted in the same error 
message.

The Wordcount program runs fine when setting below in the code
{code:java}
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
{code}
Need to investigate why the execution.batch-shuffle-mode is not being 
recognized and, if this behavior is intentional, correct the reported 
misleading error message. Additionally, we need to address the Wordcount job to 
ensure it runs seamlessly with both batch and adaptive scheduler.

  was:
Batch WordCount job fails when run with AdaptiveBatch scheduler.

*Repro Steps*

{code}
flink-yarn-session -Djobmanager.scheduler=adaptive -d

 flink run -d /usr/lib/flink/examples/batch/WordCount.jar --input 
s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
{code}

*Error logs*

{code}
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
        at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
        at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
        at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
        at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 12 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
        ... 20 more
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not start the JobMaster.
        at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: At the moment, adaptive batch scheduler 
requires batch workloads to be executed with types of all edges being BLOCKING 
or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 
'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
        ... 3 more
Caused by: java.lang.IllegalStateException: At the moment, adaptive batch 
scheduler requires batch workloads to be executed with types of all edges being 
BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 
'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
        at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
        at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
        at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
        at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        ... 3 more

{code}

*Analysis*

Have set execution.batch-shuffle-mode to 
ALL_EXCHANGES_BLOCKING/ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE 
but all of them failed with same error message.

The Wordcount program runs fine when setting below in the code

{code}
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
{code}


Need to investigate why the execution.batch-shuffle-mode is not being 
recognized and, if this behavior is intentional, correct the reported 
misleading error message. Additionally, we need to address the Wordcount job to 
ensure it runs seamlessly with both batch and adaptive scheduler.








> Batch WordCount job fails when run with AdaptiveBatch scheduler
> ---------------------------------------------------------------
>
>                 Key: FLINK-34132
>                 URL: https://issues.apache.org/jira/browse/FLINK-34132
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.17.1, 1.18.1
>            Reporter: Prabhu Joseph
>            Priority: Major
>
> Batch WordCount job fails when run with AdaptiveBatch scheduler.
> *Repro Steps*
> {code:java}
> flink-yarn-session -Djobmanager.scheduler=adaptive -d
>  flink run -d /usr/lib/flink/examples/batch/WordCount.jar --input 
> s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
> {code}
> *Error logs*
> {code:java}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>       at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
>       at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
>       at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
>       at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at 
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
>       at 
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
>       at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
>       at 
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>       ... 12 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
>       ... 20 more
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>       at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>       at 
> java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>       at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not start the JobMaster.
>       at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.IllegalStateException: At the moment, adaptive batch scheduler 
> requires batch workloads to be executed with types of all edges being 
> BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 
> 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
> 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
>       at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>       at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>       ... 3 more
> Caused by: java.lang.IllegalStateException: At the moment, adaptive batch 
> scheduler requires batch workloads to be executed with types of all edges 
> being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to 
> configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
> 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
>       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>       at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
>       at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
>       at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>       ... 3 more
> {code}
> *Analysis*
> I have configured the execution.batch-shuffle-mode to use either 
> ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, or 
> ALL_EXCHANGES_HYBRID_SELECTIVE, but all attempts resulted in the same error 
> message.
> The Wordcount program runs fine when setting below in the code
> {code:java}
> env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
> {code}
> Need to investigate why the execution.batch-shuffle-mode is not being 
> recognized and, if this behavior is intentional, correct the reported 
> misleading error message. Additionally, we need to address the Wordcount job 
> to ensure it runs seamlessly with both batch and adaptive scheduler.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to