[
https://issues.apache.org/jira/browse/FLINK-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346263#comment-15346263
]
Juho Autio commented on FLINK-3964:
-----------------------------------
Sorry I didn't realize that {{-m yarn-cluster}} was significant here – I also
had that flag.
I'm a bit concerned that there won't be an easy way to find a good minimum
value for the timeout. I would rather set a timeout for configuring a single
vertex during submission, if that makes sense. This would mean that if
configuring a single vertex takes too long, we get a timeout, but on the other
hand a job that spawns many vertices would be able to configure all of them
without timing out (as long as configuring each vertex doesn't time out). What
do you think about that?
Also, if this happens, the error message could give a hint to increase
{{akka.client.timeout}} (or should it just suggest {{akka.ask.timeout}}?) and
print the current value. That would make this much easier to fix if it happens.
> Job submission times out with recursive.file.enumeration
> --------------------------------------------------------
>
> Key: FLINK-3964
> URL: https://issues.apache.org/jira/browse/FLINK-3964
> Project: Flink
> Issue Type: Bug
> Components: Batch Connectors and Input/Output Formats, DataSet API
> Affects Versions: 1.0.0
> Reporter: Juho Autio
>
> When using {{recursive.file.enumeration}} with a big enough folder structure
> to list, the Flink batch job fails right at the beginning because of a timeout.
> h2. Problem details
> We get this error: {{Communication with JobManager failed: Job submission to
> the JobManager timed out}}.
> The code we have is basically this:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
>
> // set the recursive enumeration parameter
> val parameters = new Configuration
> parameters.setBoolean("recursive.file.enumeration", true)
>
> val parameter = ParameterTool.fromArgs(args)
> val input_data_path: String = parameter.get("input_data_path", null)
>
> val data: DataSet[(Text, Text)] = env
>   .readSequenceFile(classOf[Text], classOf[Text], input_data_path)
>   .withParameters(parameters)
>
> data.first(10).print()
> {code}
> If we set {{input_data_path}} parameter to {{s3n://bucket/path/date=*/}} it
> times out. If we use a more restrictive pattern like
> {{s3n://bucket/path/date=20160523/}}, it doesn't time out.
> To me it seems that the time taken to list files shouldn't cause any timeouts
> at the job submission level.
> For us this was "fixed" by adding {{akka.client.timeout: 600 s}} in
> {{flink-conf.yaml}}, but I wonder whether the timeout would still occur if we
> had even more files to list.
> ----
> P.S. Is there any way to set {{akka.client.timeout}} when calling {{bin/flink
> run}} instead of editing {{flink-conf.yaml}}? I tried to add it as a {{-yD}}
> flag but couldn't get it working.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)