[
https://issues.apache.org/jira/browse/FLINK-1438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14298847#comment-14298847
]
Till Rohrmann edited comment on FLINK-1438 at 1/30/15 4:48 PM:
---------------------------------------------------------------
This is not only happening in local mode but also on cloud-11 with jobs which
contain the InputSplit class information in the user code jars.
The result is that Akka cannot deserialize the object and terminates the
transmission with the following error:
WARN akka.remote.ReliableDeliverySupervisor -
Association with remote system
[akka.tcp://[email protected]:53627] has failed, address is now
gated for [5000] ms. Reason is: [invalid type code: 00].
was (Author: till.rohrmann):
This is not only happening in local mode but also on cloud-11 with jobs which
contain the InputSplit class information in the user code jars.
> ClassCastException for Custom InputSplit in local mode
> ------------------------------------------------------
>
> Key: FLINK-1438
> URL: https://issues.apache.org/jira/browse/FLINK-1438
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 0.8
> Reporter: Fabian Hueske
> Priority: Minor
>
> Jobs with custom InputSplits fail with a ClassCastException such as
> {{org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit
> cannot be cast to
> org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit}}
> if executed on a local setup.
> This issue is probably related to different ClassLoaders used by the
> JobManager when InputSplits are generated and when they are handed to the
> InputFormat by the TaskManager. Moving the class of the custom InputSplit
> into the {{./lib}} folder and removing it from the job's makes the job work.
> To reproduce the bug, run the following job on a local setup.
> {code}
> public class CustomSplitTestJob {
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> x = env.createInput(new TestFileInputFormat());
> x.print();
> env.execute();
> }
> public static class TestFileInputFormat implements
> InputFormat<String,TestFileInputSplit> {
> @Override
> public void configure(Configuration parameters) {
> }
> @Override
> public BaseStatistics getStatistics(BaseStatistics
> cachedStatistics) throws IOException {
> return null;
> }
> @Override
> public TestFileInputSplit[] createInputSplits(int minNumSplits)
> throws IOException {
> return new TestFileInputSplit[]{new
> TestFileInputSplit()};
> }
> @Override
> public InputSplitAssigner
> getInputSplitAssigner(TestFileInputSplit[] inputSplits) {
> return new LocatableInputSplitAssigner(inputSplits);
> }
> @Override
> public void open(TestFileInputSplit split) throws IOException {
> }
> @Override
> public boolean reachedEnd() throws IOException {
> return false;
> }
> @Override
> public String nextRecord(String reuse) throws IOException {
> return null;
> }
> @Override
> public void close() throws IOException {
> }
> }
> public static class TestFileInputSplit extends FileInputSplit {
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)