Fabian Hueske created FLINK-1438:
------------------------------------

             Summary: ClassCastException for Custom InputSplit in local mode
                 Key: FLINK-1438
                 URL: https://issues.apache.org/jira/browse/FLINK-1438
             Project: Flink
          Issue Type: Bug
          Components: JobManager
    Affects Versions: 0.8
            Reporter: Fabian Hueske
            Priority: Minor


Jobs with custom InputSplits fail with a ClassCastException such as 
{{org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit 
cannot be cast to 
org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit}} if 
executed on a local setup. 

This issue is probably related to different ClassLoaders used by the JobManager 
when InputSplits are generated and when they are handed to the InputFormat by 
the TaskManager. Moving the class of the custom InputSplit into the {{./lib}} 
folder and removing it from the job's makes the job work.

To reproduce the bug, run the following job on a local setup. 

{code}
public class CustomSplitTestJob {

        public static void main(String[] args) throws Exception {

                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

                DataSet<String> x = env.createInput(new TestFileInputFormat());
                x.print();

                env.execute();
        }

        public static class TestFileInputFormat implements 
InputFormat<String,TestFileInputSplit> {

                @Override
                public void configure(Configuration parameters) {

                }

                @Override
                public BaseStatistics getStatistics(BaseStatistics 
cachedStatistics) throws IOException {
                        return null;
                }

                @Override
                public TestFileInputSplit[] createInputSplits(int minNumSplits) 
throws IOException {
                        return new TestFileInputSplit[]{new 
TestFileInputSplit()};
                }

                @Override
                public InputSplitAssigner 
getInputSplitAssigner(TestFileInputSplit[] inputSplits) {
                        return new LocatableInputSplitAssigner(inputSplits);
                }

                @Override
                public void open(TestFileInputSplit split) throws IOException {

                }

                @Override
                public boolean reachedEnd() throws IOException {
                        return false;
                }

                @Override
                public String nextRecord(String reuse) throws IOException {
                        return null;
                }

                @Override
                public void close() throws IOException {

                }
        }

        public static class TestFileInputSplit extends FileInputSplit {

        }

}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to