Till Rohrmann created FLINK-3633:
------------------------------------
Summary: Job submission silently fails when using user code types
Key: FLINK-3633
URL: https://issues.apache.org/jira/browse/FLINK-3633
Project: Flink
Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Till Rohrmann
Priority: Blocker
With the changes introduced by FLINK-3327, it is no longer possible to run
remote Flink jobs which work on user code types. The reason is that the
{{ExecutionConfig}} is now stored directly in the {{JobGraph}}, which is sent as
an Akka message to the {{JobManager}}. By default, user code types are
automatically detected and registered in the {{ExecutionConfig}}. Deserializing
a {{JobGraph}} whose {{ExecutionConfig}} contains user code classes therefore
requires the user code class loader. However, Akka does not have access to it
and uses the system class loader instead. As a result, deserialization fails
with a {{ClassNotFoundException}} and Akka silently discards the {{SubmitJob}}
message.
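The failure mode can be illustrated outside of Flink and Akka with plain Java
serialization. The following is only a sketch under stated assumptions: all
names ({{ClassLoaderSketch}}, {{UserType}}, {{LoaderWithoutUserCode}},
{{LoaderAwareInputStream}}) are hypothetical, and the restricted loader merely
simulates a class loader that cannot see the user jar.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical stand-alone sketch; not Flink or Akka code.
final class ClassLoaderSketch {

    /** Stand-in for a type that exists only in the user's job jar. */
    static final class UserType implements Serializable {
        private static final long serialVersionUID = 1L;
        final int value;

        UserType(int value) {
            this.value = value;
        }
    }

    /** Simulates a loader without the user jar: it refuses to load UserType. */
    static final class LoaderWithoutUserCode extends ClassLoader {
        LoaderWithoutUserCode(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve)
                throws ClassNotFoundException {
            if (name.equals(UserType.class.getName())) {
                throw new ClassNotFoundException(name);
            }
            return super.loadClass(name, resolve);
        }
    }

    /** ObjectInputStream that resolves classes through an explicitly given loader. */
    static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new UserType(42));
        ClassLoader userLoader = ClassLoaderSketch.class.getClassLoader();

        try {
            // The receiving side uses a loader that does not know UserType.
            deserialize(data, new LoaderWithoutUserCode(userLoader));
        } catch (ClassNotFoundException e) {
            System.out.println("Deserialization failed: " + e);
        }

        // With the loader that knows UserType, the same bytes can be read.
        UserType restored = (UserType) deserialize(data, userLoader);
        System.out.println("Restored value: " + restored.value);
    }
}
```

In this sketch the exception is visible to the caller. In the scenario
described above, it is raised while the message is being deserialized, so the
sender does not see it.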
I propose not to send the {{ExecutionConfig}} explicitly with the {{JobGraph}}
and, thus, to partially revert to the behavior before FLINK-3327. Previously,
the {{ExecutionConfig}} was serialized into the job configuration and
deserialized on the {{TaskManager}} using the proper user code class loader.
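The general idea of shipping user code objects as opaque bytes and
deserializing them later with the right class loader could look roughly like
the sketch below. It is not the actual Flink implementation: the names
{{SerializedPayloadSketch}}, {{SerializedPayload}}, {{SubmitMessage}}, and
{{LoaderWithoutUserCode}} are hypothetical, and plain Java serialization
stands in for the real transport.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical stand-alone sketch; not Flink or Akka code.
final class SerializedPayloadSketch {

    /** Stand-in for a type that exists only in the user's job jar. */
    static final class UserType implements Serializable {
        private static final long serialVersionUID = 1L;
        final int value;

        UserType(int value) {
            this.value = value;
        }
    }

    /** Carries an object as bytes, so the enclosing message references no user classes. */
    static final class SerializedPayload implements Serializable {
        private static final long serialVersionUID = 1L;
        private final byte[] bytes;

        SerializedPayload(Object value) throws IOException {
            this.bytes = toBytes(value);
        }

        /** Deserializes the wrapped object with the given (user code) class loader. */
        Object get(ClassLoader loader) throws IOException, ClassNotFoundException {
            return fromBytes(bytes, loader);
        }
    }

    /** Hypothetical message that stands in for a job submission. */
    static final class SubmitMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName;
        final SerializedPayload config;

        SubmitMessage(String jobName, SerializedPayload config) {
            this.jobName = jobName;
            this.config = config;
        }
    }

    /** Simulates a loader without the user jar: it refuses to load UserType. */
    static final class LoaderWithoutUserCode extends ClassLoader {
        LoaderWithoutUserCode(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve)
                throws ClassNotFoundException {
            if (name.equals(UserType.class.getName())) {
                throw new ClassNotFoundException(name);
            }
            return super.loadClass(name, resolve);
        }
    }

    static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object fromBytes(byte[] data, final ClassLoader loader)
            throws IOException, ClassNotFoundException {
        // Resolve every class through the explicitly given loader.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws ClassNotFoundException {
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader userLoader = SerializedPayloadSketch.class.getClassLoader();
        ClassLoader withoutUserCode = new LoaderWithoutUserCode(userLoader);

        byte[] wire = toBytes(new SubmitMessage("job", new SerializedPayload(new UserType(7))));

        // The outer message only contains a String and a byte array,
        // so it can be read without access to UserType.
        SubmitMessage received = (SubmitMessage) fromBytes(wire, withoutUserCode);

        // The payload is unpacked later, where the user code loader is available.
        UserType config = (UserType) received.config.get(userLoader);
        System.out.println(received.jobName + " carries value " + config.value);
    }
}
```

The design choice here is to defer deserialization of user types to a point
where the user code class loader is known, which matches the behavior
described for the {{TaskManager}} before FLINK-3327.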
To reproduce the problem, submit the following job to a remote cluster.
{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class Job {

    // User code type that is only available in the job jar
    public static class CustomType {
        private final int value;

        public CustomType(int value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "CustomType(" + value + ")";
        }
    }

    public static void main(String[] args) throws Exception {
        // Submit to a remote cluster so that the JobGraph is sent via Akka
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
            "localhost",
            6123,
            "/Users/till/work/flink/workspace/javaFlinkBench/target/javaFlinkBench-1.0-SNAPSHOT.jar");

        env.getConfig().disableAutoTypeRegistration();

        DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5);

        DataSet<CustomType> customTypes = input.map(new MapFunction<Integer, CustomType>() {
            @Override
            public CustomType map(Integer integer) throws Exception {
                return new CustomType(integer);
            }
        });

        customTypes.print();
    }
}
{code}