Repository: flink
Updated Branches:
  refs/heads/release-0.8 5b420d847 -> 79da5a920
[FLINK-1438] [jobmanager] Fix class loading issue for messages with custom input splits

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/689e26f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/689e26f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/689e26f7

Branch: refs/heads/release-0.8
Commit: 689e26f7a6536fa6944184078388ff029874604b
Parents: 5b420d8
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 9 14:01:10 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 20:56:42 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/RemoteExecutor.java | 2 +-
 .../runtime/jobmanager/InputSplitWrapper.java | 71 ++++++++
 .../flink/runtime/jobmanager/JobManager.java | 16 +-
 .../protocols/InputSplitProviderProtocol.java | 4 +-
 .../taskmanager/TaskInputSplitProvider.java | 20 ++-
 .../flink/runtime/taskmanager/TaskManager.java | 2 +-
 flink-tests/pom.xml | 45 ++++-
 flink-tests/src/test/assembly/test-assembly.xml | 37 ----
 .../test/assembly/test-custominput-assembly.xml | 37 ++++
 .../src/test/assembly/test-kmeans-assembly.xml | 37 ++++
 .../InputSplitClassLoaderITCase.java | 56 ++++++
 .../jar/CustomInputSpitProgram.java | 172 +++++++++++++++++++
 .../PackagedProgramEndToEndITCase.java | 8 +-
 .../scala/functions/ClosureCleanerITCase.scala | 2 +-
 14 files changed, 452 insertions(+), 57 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index c90d334..6c52667 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -94,7 +94,7 @@ public class RemoteExecutor extends PlanExecutor { return c.run(p, -1, true); } - public JobExecutionResult executeJar(String jarPath, String assemblerClass, String[] args) throws Exception { + public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception { File jarFile = new File(jarPath); PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args); http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java new file mode 100644 index 0000000..741e4ad --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import java.io.IOException; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; + +public class InputSplitWrapper implements IOReadableWritable { + + private InputSplit split; + + private byte[] splitData; + + // ------------------------------------------------------------------------ + + public InputSplitWrapper() {} + + public InputSplitWrapper(InputSplit split) throws Exception { + this.split = split; + this.splitData = InstantiationUtil.serializeObject(split); + } + + public InputSplit getSplit(ClassLoader userCodeClassLoader) throws ClassNotFoundException, IOException { + if (split == null) { + if (splitData == null) { + throw new IllegalStateException("No split or split data available"); + } + + split = (InputSplit) InstantiationUtil.deserializeObject(splitData, userCodeClassLoader); + } + + return split; + } + + // ------------------------------------------------------------------------ + + @Override + public void read(DataInputView in) throws IOException { + int len = in.readInt(); + splitData = new byte[len]; + in.readFully(splitData); + split = null; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(splitData.length); + out.write(splitData); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java index 80607c2..4c51c09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java @@ -480,7 +480,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide } @Override - public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertexId, ExecutionAttemptID executionAttempt) throws IOException { + public InputSplitWrapper requestNextInputSplit(JobID jobID, JobVertexID vertexId, ExecutionAttemptID executionAttempt) throws IOException { final ExecutionGraph graph = this.currentJobs.get(jobID); if (graph == null) { @@ -505,6 +505,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide Execution execution = graph.getRegisteredExecutions().get(executionAttempt); if(execution == null) { LOG.error("Can not find Execution for attempt " + executionAttempt); + return null; } else { SimpleSlot slot = execution.getAssignedResource(); if(slot != null) { @@ -512,7 +513,18 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide } } - return splitAssigner.getNextInputSplit(host); + InputSplit split = splitAssigner.getNextInputSplit(host); + if (split == null) { + return null; + } + + try { + return new InputSplitWrapper(split); + } + catch (Throwable t) { + graph.fail(new Exception("Error serializing input split: " + t.getMessage(), t)); + return null; + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java index be1846a..2d6c862 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java @@ -20,16 +20,16 @@ package org.apache.flink.runtime.protocols; import java.io.IOException; -import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.protocols.VersionedProtocol; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.InputSplitWrapper; /** * The input split provider protocol is used to facilitate RPC calls related to the lazy split assignment. */ public interface InputSplitProviderProtocol extends VersionedProtocol { - InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertex, ExecutionAttemptID executionAttempt) throws IOException; + InputSplitWrapper requestNextInputSplit(JobID jobID, JobVertexID vertex, ExecutionAttemptID executionAttempt) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java index d4e1b7d..813d6d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java @@ -18,13 +18,12 @@ package org.apache.flink.runtime.taskmanager; -import java.io.IOException; - import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; 
+import org.apache.flink.runtime.jobmanager.InputSplitWrapper; import org.apache.flink.runtime.protocols.InputSplitProviderProtocol; public class TaskInputSplitProvider implements InputSplitProvider { @@ -37,19 +36,30 @@ public class TaskInputSplitProvider implements InputSplitProvider { private final ExecutionAttemptID executionAttempt; - public TaskInputSplitProvider(InputSplitProviderProtocol protocol, JobID jobId, JobVertexID vertexId, ExecutionAttemptID executionAttempt) { + private final ClassLoader userCodeClassLoader; + + public TaskInputSplitProvider(InputSplitProviderProtocol protocol, JobID jobId, JobVertexID vertexId, + ExecutionAttemptID executionAttempt, ClassLoader userCodeClassLoader) + { this.protocol = protocol; this.jobId = jobId; this.vertexId = vertexId; this.executionAttempt = executionAttempt; + this.userCodeClassLoader = userCodeClassLoader; } @Override public InputSplit getNextInputSplit() { try { - return protocol.requestNextInputSplit(jobId, vertexId, executionAttempt); + InputSplitWrapper wrapper = protocol.requestNextInputSplit(jobId, vertexId, executionAttempt); + if (wrapper == null) { + return null; + } + else { + return wrapper.getSplit(userCodeClassLoader); + } } - catch (IOException e) { + catch (Exception e) { throw new RuntimeException("Requesting the next InputSplit failed.", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java index e6e5287..02cf710 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java @@ -607,7 +607,7 @@ public class TaskManager implements 
TaskOperationProtocol { throw new Exception("TaskManager contains already a task with executionId " + executionId); } - final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, executionId); + final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, executionId, userCodeClassLoader); final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy, this.bcVarManager); task.setEnvironment(env); http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index c773a21..cf7fb6b 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -297,7 +297,7 @@ under the License. <version>2.4</version><!--$NO-MVN-MAN-VER$--> <executions> <execution> - <id>create-test-dependency</id> + <id>create-kmeans-jar</id> <phase>process-test-classes</phase> <goals> <goal>single</goal> @@ -308,10 +308,29 @@ under the License. 
<mainClass>org.apache.flink.test.util.testjar.KMeansForTest</mainClass> </manifest> </archive> - <finalName>maven</finalName> + <finalName>kmeans</finalName> <attach>false</attach> <descriptors> - <descriptor>src/test/assembly/test-assembly.xml</descriptor> + <descriptor>src/test/assembly/test-kmeans-assembly.xml</descriptor> + </descriptors> + </configuration> + </execution> + <execution> + <id>create-custominputsplit-jar</id> + <phase>process-test-classes</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.flink.test.classloading.jar.CustomInputSpitProgram</mainClass> + </manifest> + </archive> + <finalName>customsplit</finalName> + <attach>false</attach> + <descriptors> + <descriptor>src/test/assembly/test-custominput-assembly.xml</descriptor> </descriptors> </configuration> </execution> @@ -326,7 +345,7 @@ under the License. <version>2.5</version><!--$NO-MVN-MAN-VER$--> <executions> <execution> - <id>remove-kmeansfortest</id> + <id>remove-kmeans-test-dependencies</id> <phase>process-test-classes</phase> <goals> <goal>clean</goal> @@ -343,6 +362,24 @@ under the License. 
</filesets> </configuration> </execution> + <execution> + <id>remove-custominputformat-test-dependencies</id> + <phase>process-test-classes</phase> + <goals> + <goal>clean</goal> + </goals> + <configuration> + <excludeDefaultDirectories>true</excludeDefaultDirectories> + <filesets> + <fileset> + <directory>${project.build.testOutputDirectory}</directory> + <includes> + <include>**/classloading/jar/*.class</include> + </includes> + </fileset> + </filesets> + </configuration> + </execution> </executions> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/assembly/test-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/assembly/test-assembly.xml b/flink-tests/src/test/assembly/test-assembly.xml deleted file mode 100644 index bad0e38..0000000 --- a/flink-tests/src/test/assembly/test-assembly.xml +++ /dev/null @@ -1,37 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
- ---> - -<assembly> - <id>test-jar</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${project.build.testOutputDirectory}</directory> - <outputDirectory>/</outputDirectory> - <!--modify/add include to match your package(s) --> - <includes> - <include>org/apache/flink/test/util/testjar/**</include> - </includes> - </fileSet> - </fileSets> -</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/assembly/test-custominput-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/assembly/test-custominput-assembly.xml b/flink-tests/src/test/assembly/test-custominput-assembly.xml new file mode 100644 index 0000000..9d4800b --- /dev/null +++ b/flink-tests/src/test/assembly/test-custominput-assembly.xml @@ -0,0 +1,37 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+ +--> + +<assembly> + <id>test-jar</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.build.testOutputDirectory}</directory> + <outputDirectory>/</outputDirectory> + <!--modify/add include to match your package(s) --> + <includes> + <include>org/apache/flink/test/classloading/jar/**</include> + </includes> + </fileSet> + </fileSets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/assembly/test-kmeans-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/assembly/test-kmeans-assembly.xml b/flink-tests/src/test/assembly/test-kmeans-assembly.xml new file mode 100644 index 0000000..bad0e38 --- /dev/null +++ b/flink-tests/src/test/assembly/test-kmeans-assembly.xml @@ -0,0 +1,37 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+ +--> + +<assembly> + <id>test-jar</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.build.testOutputDirectory}</directory> + <outputDirectory>/</outputDirectory> + <!--modify/add include to match your package(s) --> + <includes> + <include>org/apache/flink/test/util/testjar/**</include> + </includes> + </fileSet> + </fileSets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java new file mode 100644 index 0000000..705dbf3 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.classloading; + +import java.io.File; + +import org.apache.flink.client.minicluster.NepheleMiniCluster; +import org.apache.flink.client.program.PackagedProgram; +import org.junit.Assert; +import org.junit.Test; + +public class InputSplitClassLoaderITCase { + + private static final String JAR_FILE = "target/customsplit-test-jar.jar"; + + @Test + public void testJobWithCustomInputFormat() { + try { + NepheleMiniCluster cluster = new NepheleMiniCluster(); + cluster.setNumTaskManager(2); + cluster.setTaskManagerNumSlots(2); + cluster.start(); + + try { + int port = cluster.getJobManagerRpcPort(); + + PackagedProgram prog = new PackagedProgram(new File(JAR_FILE), + new String[] { JAR_FILE, "localhost", String.valueOf(port) } ); + prog.invokeInteractiveModeForExecution(); + } + finally { + cluster.stop(); + } + } + catch (Throwable t) { + t.printStackTrace(); + Assert.fail(t.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java new file mode 100644 index 0000000..31aaaf2 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.classloading.jar; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +@SuppressWarnings("serial") +public class CustomInputSpitProgram { + + public static void main(String[] args) throws Exception { + + final String jarFile = args[0]; + final String host = args[1]; + final int port = Integer.parseInt(args[2]); + + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile); + + DataSet<Integer> data = env.createInput(new CustomInputFormat()); + + data + .map(new MapFunction<Integer, Tuple2<Integer, Double>>() { + @Override + public Tuple2<Integer, Double> map(Integer value) { + return new Tuple2<Integer, 
Double>(value, value * 0.5); + } + }) + .output(new DiscardingOutputFormat<Tuple2<Integer,Double>>()); + + env.execute(); + } + // -------------------------------------------------------------------------------------------- + + public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> { + + private static final long serialVersionUID = 1L; + + private Integer value; + + @Override + public void configure(Configuration parameters) {} + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return null; + } + + @Override + public CustomInputSplit[] createInputSplits(int minNumSplits) { + CustomInputSplit[] splits = new CustomInputSplit[minNumSplits]; + for (int i = 0; i < minNumSplits; i++) { + splits[i] = new CustomInputSplit(i); + } + return splits; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(CustomInputSplit[] inputSplits) { + return new CustomSplitAssigner(inputSplits); + } + + @Override + public void open(CustomInputSplit split) { + this.value = split.getSplitNumber(); + } + + @Override + public boolean reachedEnd() { + return this.value == null; + } + + @Override + public Integer nextRecord(Integer reuse) { + Integer val = this.value; + this.value = null; + return val; + } + + @Override + public void close() {} + + @Override + public TypeInformation<Integer> getProducedType() { + return BasicTypeInfo.INT_TYPE_INFO; + } + } + + public static final class CustomInputSplit implements InputSplit { + + private static final long serialVersionUID = 1L; + + private int splitNumber; + + public CustomInputSplit() { + this(-1); + } + + public CustomInputSplit(int splitNumber) { + this.splitNumber = splitNumber; + } + + @Override + public int getSplitNumber() { + return this.splitNumber; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(splitNumber); + } + + @Override + public void read(DataInputView in) throws 
IOException { + splitNumber = in.readInt(); + } + } + + public static final class CustomSplitAssigner implements InputSplitAssigner { + + private final List<CustomInputSplit> remainingSplits; + + public CustomSplitAssigner(CustomInputSplit[] splits) { + this.remainingSplits = new ArrayList<CustomInputSplit>(Arrays.asList(splits)); + } + + @Override + public InputSplit getNextInputSplit(String host) { + synchronized (this) { + int size = remainingSplits.size(); + if (size > 0) { + return remainingSplits.remove(size - 1); + } else { + return null; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java index e7e3f95..430d817 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java @@ -27,10 +27,10 @@ import org.apache.flink.test.testdata.KMeansData; import org.junit.Assert; import org.junit.Test; -// When the API changes KMeansForTest needs to be rebuilt and the KMeansForTest.jar in resources needs -// to be replaced with the new one. 
public class PackagedProgramEndToEndITCase { + + private static final String JAR_PATH = "target/kmeans-test-jar.jar"; @Test public void testEverything() { @@ -56,7 +56,7 @@ public class PackagedProgramEndToEndITCase { fwClusters.write(KMeansData.INITIAL_CENTERS); fwClusters.close(); - String jarPath = "target/maven-test-jar.jar"; + // run KMeans cluster.setNumTaskManager(2); @@ -65,7 +65,7 @@ public class PackagedProgramEndToEndITCase { RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRpcPort()); - ex.executeJar(jarPath, + ex.executeJar(JAR_PATH, "org.apache.flink.test.util.testjar.KMeansForTest", new String[] { points.toURI().toString(), http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala index a063957..2cae061 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala @@ -190,7 +190,7 @@ object TestObjectWithBogusReturns { nums.map { x => return 1; x * 2}.print() } catch { case inv: InvalidProgramException => // all good - case _ => fail("Bogus return statement not detected.") + case _: Throwable => fail("Bogus return statement not detected.") } nums.writeAsText(resultPath)