[ 
https://issues.apache.org/jira/browse/FLINK-1438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14298847#comment-14298847
 ] 

Till Rohrmann edited comment on FLINK-1438 at 1/30/15 4:48 PM:
---------------------------------------------------------------

This is not only happening in local mode but also on cloud-11 with jobs which 
contain the InputSplit class information in the user code jars.

The result is that Akka cannot deserialize the object and terminates the 
transmission with the following error:

WARN  akka.remote.ReliableDeliverySupervisor                        - 
Association with remote system 
[akka.tcp://fl...@cloud-22.dima.tu-berlin.de:53627] has failed, address is now 
gated for [5000] ms. Reason is: [invalid type code: 00].


was (Author: till.rohrmann):
This is not only happening in local mode but also on cloud-11 with jobs which 
contain the InputSplit class information in the user code jars.

> ClassCastException for Custom InputSplit in local mode
> ------------------------------------------------------
>
>                 Key: FLINK-1438
>                 URL: https://issues.apache.org/jira/browse/FLINK-1438
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 0.8
>            Reporter: Fabian Hueske
>            Priority: Minor
>
> Jobs with custom InputSplits fail with a ClassCastException such as 
> {{org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit 
> cannot be cast to 
> org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit}} 
> if executed on a local setup. 
> This issue is probably related to different ClassLoaders used by the 
> JobManager when InputSplits are generated and when they are handed to the 
> InputFormat by the TaskManager. Moving the class of the custom InputSplit 
> into the {{./lib}} folder and removing it from the job's makes the job work.
> To reproduce the bug, run the following job on a local setup. 
> {code}
> public class CustomSplitTestJob {
>       public static void main(String[] args) throws Exception {
>               ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>               DataSet<String> x = env.createInput(new TestFileInputFormat());
>               x.print();
>               env.execute();
>       }
>       public static class TestFileInputFormat implements 
> InputFormat<String,TestFileInputSplit> {
>               @Override
>               public void configure(Configuration parameters) {
>               }
>               @Override
>               public BaseStatistics getStatistics(BaseStatistics 
> cachedStatistics) throws IOException {
>                       return null;
>               }
>               @Override
>               public TestFileInputSplit[] createInputSplits(int minNumSplits) 
> throws IOException {
>                       return new TestFileInputSplit[]{new 
> TestFileInputSplit()};
>               }
>               @Override
>               public InputSplitAssigner 
> getInputSplitAssigner(TestFileInputSplit[] inputSplits) {
>                       return new LocatableInputSplitAssigner(inputSplits);
>               }
>               @Override
>               public void open(TestFileInputSplit split) throws IOException {
>               }
>               @Override
>               public boolean reachedEnd() throws IOException {
>                       return false;
>               }
>               @Override
>               public String nextRecord(String reuse) throws IOException {
>                       return null;
>               }
>               @Override
>               public void close() throws IOException {
>               }
>       }
>       public static class TestFileInputSplit extends FileInputSplit {
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to