[ https://issues.apache.org/jira/browse/FLINK-6875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Francisco Rosa closed FLINK-6875.
---------------------------------
    Resolution: Invalid

I believe this was a versioning issue: the Flink version on the server did not match the client version. Closing.

> Remote DataSet API job submission timing out
> --------------------------------------------
>
>                 Key: FLINK-6875
>                 URL: https://issues.apache.org/jira/browse/FLINK-6875
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.3.0
>            Reporter: Francisco Rosa
>             Fix For: 1.3.1
>
>
> When trying to submit a DataSet API job from a remote environment, Flink times out. This works well in 1.2.1 and seems to be broken in 1.3.0.
> The following program reproduces the issue:
> {code:title=Example|borderStyle=solid}
> package com.test;
>
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
>
> import java.util.Date;
>
> public class FlinkRemoteIssue {
>
>     public static void main(String[] args) throws Exception {
>         // JobManager host and RPC port of the remote cluster
>         String host = "192.168.1.235";
>         int port = 6123;
>         // Fat jar containing this class; it is shipped to the cluster with the job
>         String[] jars = {
>             "c:\\tmp\\FlinkRemoteIssue-all-1.0-SNAPSHOT.jar"
>         };
>
>         ExecutionEnvironment env =
>             ExecutionEnvironment.createRemoteEnvironment(host, port, jars);
>
>         // Trivial pipeline: one element, a map that logs on the TaskManager, text output
>         DataSet<String> pipe = env.fromElements("1");
>         pipe.map((oneString) -> {
>             System.err.println("Map executing: " + new Date());
>             return "Map result: " + new Date();
>         }).writeAsText("/tmp/lixo-" + System.currentTimeMillis());
>
>         env.execute("Flink Remote Issue");
>     }
> }
> {code}
> Result from running program (running inside IntelliJ):
> {code}
> Submitting job with JobID: 9f96638f014a87783cecd54b61c55d9a. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@10.97.120.139:6123/user/jobmanager#1432447220] with leader session id 00000000-0000-0000-0000-000000000000.
> Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>     at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
>     at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
>     at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
>     at com.test.FlinkRemoteIssue.main(FlinkRemoteIssue.java:25)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>     at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>     ... 13 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>     at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Process finished with exit code 1
> {code}
> Message in JobManager log:
> {code}
> 2017-06-08 10:57:03,310 WARN  org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 4d414efd050a871863f3319a8c56781c),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID None did not equal the received leader session ID Some(00000000-0000-0000-0000-000000000000).
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
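Read together with the "Invalid" resolution, the JobManager warning suggests why the client only reported a timeout: the SubmitJob message was discarded because the leader session IDs did not match, so presumably no submission acknowledgement ever came back and the client gave up after 'akka.client.timeout'. A plausible explanation, consistent with the version mismatch, is that the client sent the all-zero UUID while the JobManager expected no leader session ID at all. The sketch below models only that comparison in plain Java; it is an illustration, not Flink's actual implementation. `java.util.Optional<UUID>` stands in for the Scala `Option[UUID]` values printed in the log, and the class and method names are hypothetical.

```java
import java.util.Optional;
import java.util.UUID;

/**
 * Hypothetical model (not Flink code) of the check behind the JobManager warning
 * "expected leader session ID None did not equal the received leader session ID Some(...)".
 */
public class LeaderSessionCheck {

    // Optional<UUID> stands in for Scala's Option[UUID]; a message is handled
    // only when both IDs are equal (including both being absent).
    static boolean accepts(Optional<UUID> expected, Optional<UUID> received) {
        return expected.equals(received);
    }

    public static void main(String[] args) {
        // The all-zero UUID, printed as 00000000-0000-0000-0000-000000000000.
        UUID zero = new UUID(0L, 0L);

        // Combination reported in the log: JobManager expects None, client sends Some(zero).
        System.out.println(accepts(Optional.empty(), Optional.of(zero)) ? "handled" : "discarded");

        // If both sides agree on the ID, the message would be handled.
        System.out.println(accepts(Optional.of(zero), Optional.of(zero)) ? "handled" : "discarded");
    }
}
```

Running `main` prints `discarded` for the combination from the log and `handled` once both sides agree, which is also why raising 'akka.client.timeout' would likely not have helped here.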
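Since the root cause was a client/cluster version mismatch, a cheap safeguard is to compare the two versions before submitting a job. The sketch below is a hypothetical helper, not a Flink API. It assumes both versions are already available as plain numeric strings such as "1.3.0" (for example, from the project's Flink dependency and from the version the cluster reports); how you obtain them is left open. It conservatively flags any difference, including a patch-level one; that strictness is an assumption, not a documented Flink compatibility rule.

```java
/**
 * Hypothetical pre-submission check (not part of Flink): compares the client's
 * Flink version with the cluster's and refuses to proceed on any mismatch.
 */
public class FlinkVersionCheck {

    // Parses "major.minor.patch" into three ints; missing parts default to 0.
    // Assumes plain numeric versions (a suffix like "-SNAPSHOT" would throw).
    static int[] parse(String version) {
        String[] parts = version.trim().split("\\.");
        int[] result = new int[3];
        for (int i = 0; i < result.length && i < parts.length; i++) {
            result[i] = Integer.parseInt(parts[i]);
        }
        return result;
    }

    // Names the first component that differs, or returns "match" if all agree.
    static String compare(String client, String cluster) {
        int[] c = parse(client);
        int[] s = parse(cluster);
        String[] names = {"major", "minor", "patch"};
        for (int i = 0; i < names.length; i++) {
            if (c[i] != s[i]) {
                return names[i] + " version differs";
            }
        }
        return "match";
    }

    public static void main(String[] args) {
        // Illustrative values only: a 1.3.0 client against a 1.2.1 cluster.
        String client = "1.3.0";
        String cluster = "1.2.1";
        String result = compare(client, cluster);
        if (!result.equals("match")) {
            System.err.println("Refusing to submit: client " + client
                    + " vs cluster " + cluster + ": " + result);
        }
    }
}
```

With the illustrative values in `main`, `compare` reports "minor version differs", so the job would not be submitted. Failing fast like this surfaces the mismatch directly instead of as a submission timeout.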