Till Rohrmann created FLINK-3633:
------------------------------------

             Summary: Job submission silently fails when using user code types
                 Key: FLINK-3633
                 URL: https://issues.apache.org/jira/browse/FLINK-3633
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.1.0
            Reporter: Till Rohrmann
            Priority: Blocker


With the changes introduced by FLINK-3327, it is no longer possible to run 
remote Flink jobs which work on user code types. The reason is that the 
{{ExecutionConfig}} is now stored directly in the {{JobGraph}}, which is sent 
as an Akka message to the {{JobManager}}. By default, user code types are 
automatically detected and registered in the {{ExecutionConfig}}. 
Deserializing a {{JobGraph}} whose {{ExecutionConfig}} contains user code 
classes therefore requires the user code class loader. However, Akka does 
not have access to it and uses the system class loader. As a result, Akka 
silently discards the {{SubmitJob}} message, which cannot be deserialized 
because of a {{ClassNotFoundException}}.

I propose not to send the {{ExecutionConfig}} explicitly with the {{JobGraph}} 
and, thus, to partially revert to the behavior before FLINK-3327. Previously, 
the {{ExecutionConfig}} was serialized into the job configuration and 
deserialized on the {{TaskManager}} using the proper user code class loader.
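
The class loader dependency described above can be illustrated with plain Java 
serialization, independent of Flink and Akka. The following is only a sketch 
under stated assumptions: {{ClassLoaderDemo}}, {{ClassLoaderObjectInputStream}} 
and the helper methods are hypothetical names made up for this illustration and 
are not Flink or Akka APIs, and a class loader without a parent stands in for a 
loader that cannot see the user code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical demo class; none of these names exist in Flink or Akka.
final class ClassLoaderDemo {

    /** Stand-in for a user code type that only the user code class loader can see. */
    static class CustomType implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int value;

        CustomType(int value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "CustomType(" + value + ")";
        }
    }

    /** ObjectInputStream that resolves classes only through the supplied class loader. */
    static class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // No fallback to the default lookup: the given loader alone decides.
            return Class.forName(desc.getName(), false, classLoader);
        }
    }

    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), classLoader)) {
            return in.readObject();
        }
    }

    /** Returns false if the loader cannot resolve a class contained in the data. */
    static boolean canDeserialize(byte[] data, ClassLoader classLoader) throws IOException {
        try {
            deserialize(data, classLoader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new CustomType(42));

        // Loader that knows CustomType: plays the role of the user code class loader.
        ClassLoader userCodeLoader = ClassLoaderDemo.class.getClassLoader();
        System.out.println("with user code loader: " + deserialize(data, userCodeLoader));

        // Loader without a parent sees only JDK bootstrap classes: plays the role
        // of a loader that has no access to the user code.
        ClassLoader restrictedLoader = new ClassLoader(null) {};
        System.out.println("restricted loader succeeds: " + canDeserialize(data, restrictedLoader));
    }
}
```

The same bytes deserialize with one loader and fail with a 
{{ClassNotFoundException}} with the other. This is why shipping the serialized 
form and decoding it only where the user code class loader is available avoids 
the problem.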

In order to reproduce the problem you can submit the following job to a remote 
cluster.

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class Job {
        // User code type that is only contained in the job jar.
        public static class CustomType {
                private final int value;

                public CustomType(int value) {
                        this.value = value;
                }

                @Override
                public String toString() {
                        return "CustomType(" + value + ")";
                }
        }

        public static void main(String[] args) throws Exception {
                // Submit to a remote cluster; the jar contains this class.
                ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                        "localhost",
                        6123,
                        "/Users/till/work/flink/workspace/javaFlinkBench/target/javaFlinkBench-1.0-SNAPSHOT.jar");

                env.getConfig().disableAutoTypeRegistration();

                DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5);

                // Map each integer to the user code type.
                DataSet<CustomType> customTypes = input.map(new MapFunction<Integer, CustomType>() {
                        @Override
                        public CustomType map(Integer integer) throws Exception {
                                return new CustomType(integer);
                        }
                });

                customTypes.print();
        }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
