Repository: flink
Updated Branches:
  refs/heads/release-0.8 5b420d847 -> 79da5a920


[FLINK-1438] [jobmanager] Fix class loading issue for messages with custom 
input splits


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/689e26f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/689e26f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/689e26f7

Branch: refs/heads/release-0.8
Commit: 689e26f7a6536fa6944184078388ff029874604b
Parents: 5b420d8
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 9 14:01:10 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 20:56:42 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/RemoteExecutor.java |   2 +-
 .../runtime/jobmanager/InputSplitWrapper.java   |  71 ++++++++
 .../flink/runtime/jobmanager/JobManager.java    |  16 +-
 .../protocols/InputSplitProviderProtocol.java   |   4 +-
 .../taskmanager/TaskInputSplitProvider.java     |  20 ++-
 .../flink/runtime/taskmanager/TaskManager.java  |   2 +-
 flink-tests/pom.xml                             |  45 ++++-
 flink-tests/src/test/assembly/test-assembly.xml |  37 ----
 .../test/assembly/test-custominput-assembly.xml |  37 ++++
 .../src/test/assembly/test-kmeans-assembly.xml  |  37 ++++
 .../InputSplitClassLoaderITCase.java            |  56 ++++++
 .../jar/CustomInputSpitProgram.java             | 172 +++++++++++++++++++
 .../PackagedProgramEndToEndITCase.java          |   8 +-
 .../scala/functions/ClosureCleanerITCase.scala  |   2 +-
 14 files changed, 452 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index c90d334..6c52667 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -94,7 +94,7 @@ public class RemoteExecutor extends PlanExecutor {
                return c.run(p, -1, true);
        }
 
-       public JobExecutionResult executeJar(String jarPath, String 
assemblerClass, String[] args) throws Exception {
+       public JobExecutionResult executeJar(String jarPath, String 
assemblerClass, String... args) throws Exception {
                File jarFile = new File(jarPath);
                PackagedProgram program = new PackagedProgram(jarFile, 
assemblerClass, args);
                

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java
new file mode 100644
index 0000000..741e4ad
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+public class InputSplitWrapper implements IOReadableWritable {
+
+       private InputSplit split;
+       
+       private byte[] splitData;
+       
+       // 
------------------------------------------------------------------------
+       
+       public InputSplitWrapper() {}
+       
+       public InputSplitWrapper(InputSplit split) throws Exception {
+               this.split = split;
+               this.splitData = InstantiationUtil.serializeObject(split);
+       }
+
+       public InputSplit getSplit(ClassLoader userCodeClassLoader) throws 
ClassNotFoundException, IOException {
+               if (split == null) {
+                       if (splitData == null) {
+                               throw new IllegalStateException("No split or 
split data available");
+                       }
+                       
+                       split = (InputSplit) 
InstantiationUtil.deserializeObject(splitData, userCodeClassLoader);
+               }
+               
+               return split;
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public void read(DataInputView in) throws IOException {
+               int len = in.readInt();
+               splitData = new byte[len];
+               in.readFully(splitData);
+               split = null;
+       }
+       
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(splitData.length);
+               out.write(splitData);           
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 80607c2..4c51c09 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -480,7 +480,7 @@ public class JobManager implements 
ExtendedManagementProtocol, InputSplitProvide
        }
        
        @Override
-       public InputSplit requestNextInputSplit(JobID jobID, JobVertexID 
vertexId, ExecutionAttemptID executionAttempt) throws IOException {
+       public InputSplitWrapper requestNextInputSplit(JobID jobID, JobVertexID 
vertexId, ExecutionAttemptID executionAttempt) throws IOException {
 
                final ExecutionGraph graph = this.currentJobs.get(jobID);
                if (graph == null) {
@@ -505,6 +505,7 @@ public class JobManager implements 
ExtendedManagementProtocol, InputSplitProvide
                Execution execution = 
graph.getRegisteredExecutions().get(executionAttempt);
                if(execution == null) {
                        LOG.error("Can not find Execution for attempt " + 
executionAttempt);
+                       return null;
                } else {
                        SimpleSlot slot = execution.getAssignedResource();
                        if(slot != null) {
@@ -512,7 +513,18 @@ public class JobManager implements 
ExtendedManagementProtocol, InputSplitProvide
                        }
                }
                
-               return splitAssigner.getNextInputSplit(host);
+               InputSplit split = splitAssigner.getNextInputSplit(host);
+               if (split == null) {
+                       return null;
+               }
+               
+               try {
+                       return new InputSplitWrapper(split);
+               }
+               catch (Throwable t) {
+                       graph.fail(new Exception("Error serializing input 
split: " + t.getMessage(), t));
+                       return null;
+               }
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
index be1846a..2d6c862 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
@@ -20,16 +20,16 @@ package org.apache.flink.runtime.protocols;
 
 import java.io.IOException;
 
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.InputSplitWrapper;
 
 /**
  * The input split provider protocol is used to facilitate RPC calls related 
to the lazy split assignment.
  */
 public interface InputSplitProviderProtocol extends VersionedProtocol {
 
-       InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertex, 
ExecutionAttemptID executionAttempt) throws IOException;
+       InputSplitWrapper requestNextInputSplit(JobID jobID, JobVertexID 
vertex, ExecutionAttemptID executionAttempt) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index d4e1b7d..813d6d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobmanager.InputSplitWrapper;
 import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
 
 public class TaskInputSplitProvider implements InputSplitProvider {
@@ -37,19 +36,30 @@ public class TaskInputSplitProvider implements 
InputSplitProvider {
        
        private final ExecutionAttemptID executionAttempt;
        
-       public TaskInputSplitProvider(InputSplitProviderProtocol protocol, 
JobID jobId, JobVertexID vertexId, ExecutionAttemptID executionAttempt) {
+       private final ClassLoader userCodeClassLoader;
+       
+       public TaskInputSplitProvider(InputSplitProviderProtocol protocol, 
JobID jobId, JobVertexID vertexId,
+                       ExecutionAttemptID executionAttempt, ClassLoader 
userCodeClassLoader)
+       {
                this.protocol = protocol;
                this.jobId = jobId;
                this.vertexId = vertexId;
                this.executionAttempt = executionAttempt;
+               this.userCodeClassLoader = userCodeClassLoader;
        }
 
        @Override
        public InputSplit getNextInputSplit() {
                try {
-                       return protocol.requestNextInputSplit(jobId, vertexId, 
executionAttempt);
+                       InputSplitWrapper wrapper = 
protocol.requestNextInputSplit(jobId, vertexId, executionAttempt);
+                       if (wrapper == null) {
+                               return null;
+                       }
+                       else {
+                               return wrapper.getSplit(userCodeClassLoader);
+                       }
                }
-               catch (IOException e) {
+               catch (Exception e) {
                        throw new RuntimeException("Requesting the next 
InputSplit failed.", e);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index e6e5287..02cf710 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -607,7 +607,7 @@ public class TaskManager implements TaskOperationProtocol {
                                throw new Exception("TaskManager contains 
already a task with executionId " + executionId);
                        }
                        
-                       final InputSplitProvider splitProvider = new 
TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, 
executionId);
+                       final InputSplitProvider splitProvider = new 
TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, 
executionId, userCodeClassLoader);
                        final RuntimeEnvironment env = new 
RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, 
this.ioManager, splitProvider, this.accumulatorProtocolProxy, 
this.bcVarManager);
                        task.setEnvironment(env);
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index c773a21..cf7fb6b 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -297,7 +297,7 @@ under the License.
                                <version>2.4</version><!--$NO-MVN-MAN-VER$-->
                                <executions>
                                        <execution>
-                                               <id>create-test-dependency</id>
+                                               <id>create-kmeans-jar</id>
                                                
<phase>process-test-classes</phase>
                                                <goals>
                                                        <goal>single</goal>
@@ -308,10 +308,29 @@ under the License.
                                                                        
<mainClass>org.apache.flink.test.util.testjar.KMeansForTest</mainClass>
                                                                </manifest>
                                                        </archive>
-                                                       
<finalName>maven</finalName>
+                                                       
<finalName>kmeans</finalName>
                                                        <attach>false</attach>
                                                        <descriptors>
-                                                               
<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+                                                               
<descriptor>src/test/assembly/test-kmeans-assembly.xml</descriptor>
+                                                       </descriptors>
+                                               </configuration>
+                                       </execution>
+                                       <execution>
+                                               
<id>create-custominputsplit-jar</id>
+                                               
<phase>process-test-classes</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <archive>
+                                                               <manifest>
+                                                                       
<mainClass>org.apache.flink.test.classloading.jar.CustomInputSpitProgram</mainClass>
+                                                               </manifest>
+                                                       </archive>
+                                                       
<finalName>customsplit</finalName>
+                                                       <attach>false</attach>
+                                                       <descriptors>
+                                                               
<descriptor>src/test/assembly/test-custominput-assembly.xml</descriptor>
                                                        </descriptors>
                                                </configuration>
                                        </execution>
@@ -326,7 +345,7 @@ under the License.
                                <version>2.5</version><!--$NO-MVN-MAN-VER$-->
                                <executions>
                                        <execution>
-                                               <id>remove-kmeansfortest</id>
+                                               
<id>remove-kmeans-test-dependencies</id>
                                                
<phase>process-test-classes</phase>
                                                <goals>
                                                        <goal>clean</goal>
@@ -343,6 +362,24 @@ under the License.
                                                        </filesets>
                                                </configuration>
                                        </execution>
+                                       <execution>
+                                               
<id>remove-custominputformat-test-dependencies</id>
+                                               
<phase>process-test-classes</phase>
+                                               <goals>
+                                                       <goal>clean</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<excludeDefaultDirectories>true</excludeDefaultDirectories>
+                                                       <filesets>
+                                                               <fileset>
+                                                                       
<directory>${project.build.testOutputDirectory}</directory>
+                                                                       
<includes>
+                                                                               
<include>**/classloading/jar/*.class</include>
+                                                                       
</includes>
+                                                               </fileset>
+                                                       </filesets>
+                                               </configuration>
+                                       </execution>
                                </executions>
                        </plugin>
                </plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-assembly.xml 
b/flink-tests/src/test/assembly/test-assembly.xml
deleted file mode 100644
index bad0e38..0000000
--- a/flink-tests/src/test/assembly/test-assembly.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
--->
-
-<assembly>
-       <id>test-jar</id>
-       <formats>
-               <format>jar</format>
-       </formats>
-       <includeBaseDirectory>false</includeBaseDirectory>
-       <fileSets>
-               <fileSet>
-                       
<directory>${project.build.testOutputDirectory}</directory>
-                       <outputDirectory>/</outputDirectory>
-                       <!--modify/add include to match your package(s) -->
-                       <includes>
-                               
<include>org/apache/flink/test/util/testjar/**</include>
-                       </includes>
-               </fileSet>
-       </fileSets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/assembly/test-custominput-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-custominput-assembly.xml 
b/flink-tests/src/test/assembly/test-custominput-assembly.xml
new file mode 100644
index 0000000..9d4800b
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-custominput-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+       <id>test-jar</id>
+       <formats>
+               <format>jar</format>
+       </formats>
+       <includeBaseDirectory>false</includeBaseDirectory>
+       <fileSets>
+               <fileSet>
+                       
<directory>${project.build.testOutputDirectory}</directory>
+                       <outputDirectory>/</outputDirectory>
+                       <!--modify/add include to match your package(s) -->
+                       <includes>
+                               
<include>org/apache/flink/test/classloading/jar/**</include>
+                       </includes>
+               </fileSet>
+       </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/assembly/test-kmeans-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-kmeans-assembly.xml 
b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
new file mode 100644
index 0000000..bad0e38
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+       <id>test-jar</id>
+       <formats>
+               <format>jar</format>
+       </formats>
+       <includeBaseDirectory>false</includeBaseDirectory>
+       <fileSets>
+               <fileSet>
+                       
<directory>${project.build.testOutputDirectory}</directory>
+                       <outputDirectory>/</outputDirectory>
+                       <!--modify/add include to match your package(s) -->
+                       <includes>
+                               
<include>org/apache/flink/test/util/testjar/**</include>
+                       </includes>
+               </fileSet>
+       </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
new file mode 100644
index 0000000..705dbf3
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading;
+
+import java.io.File;
+
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.client.program.PackagedProgram;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InputSplitClassLoaderITCase {
+       
+       private static final String JAR_FILE = 
"target/customsplit-test-jar.jar";
+       
+       @Test
+       public void testJobWithCustomInputFormat() {    
+               try {
+                       NepheleMiniCluster cluster = new NepheleMiniCluster();
+                       cluster.setNumTaskManager(2);
+                       cluster.setTaskManagerNumSlots(2);
+                       cluster.start();
+                       
+                       try {
+                               int port = cluster.getJobManagerRpcPort();
+                               
+                               PackagedProgram prog = new PackagedProgram(new 
File(JAR_FILE),
+                                               new String[] { JAR_FILE, 
"localhost", String.valueOf(port) } );
+                               prog.invokeInteractiveModeForExecution();
+                       }
+                       finally {
+                               cluster.stop();
+                       }
+               }
+               catch (Throwable t) {
+                       t.printStackTrace();
+                       Assert.fail(t.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
new file mode 100644
index 0000000..31aaaf2
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+@SuppressWarnings("serial")
+public class CustomInputSpitProgram {
+       
+       public static void main(String[] args) throws Exception {
+               
+               final String jarFile = args[0];
+               final String host = args[1];
+               final int port = Integer.parseInt(args[2]);
+               
+               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+
+               DataSet<Integer> data = env.createInput(new 
CustomInputFormat());
+
+               data
+                       .map(new MapFunction<Integer, Tuple2<Integer, 
Double>>() {
+                               @Override
+                               public Tuple2<Integer, Double> map(Integer 
value) {
+                                       return new Tuple2<Integer, 
Double>(value, value * 0.5);
+                               }
+                       })
+                       .output(new 
DiscardingOutputFormat<Tuple2<Integer,Double>>());
+
+               env.execute();
+       }
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static final class CustomInputFormat implements 
InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               private Integer value;
+
+               @Override
+               public void configure(Configuration parameters) {}
+
+               @Override
+               public BaseStatistics getStatistics(BaseStatistics 
cachedStatistics) {
+                       return null;
+               }
+
+               @Override
+               public CustomInputSplit[] createInputSplits(int minNumSplits) {
+                       CustomInputSplit[] splits = new 
CustomInputSplit[minNumSplits];
+                       for (int i = 0; i < minNumSplits; i++) {
+                               splits[i] = new CustomInputSplit(i);
+                       }
+                       return splits;
+               }
+
+               @Override
+               public InputSplitAssigner 
getInputSplitAssigner(CustomInputSplit[] inputSplits) {
+                       return new CustomSplitAssigner(inputSplits);
+               }
+
+               @Override
+               public void open(CustomInputSplit split) {
+                       this.value = split.getSplitNumber();
+               }
+
+               @Override
+               public boolean reachedEnd() {
+                       return this.value == null;
+               }
+
+               @Override
+               public Integer nextRecord(Integer reuse) {
+                       Integer val = this.value;
+                       this.value = null;
+                       return val;
+               }
+
+               @Override
+               public void close() {}
+
+               @Override
+               public TypeInformation<Integer> getProducedType() {
+                       return BasicTypeInfo.INT_TYPE_INFO;
+               }
+       }
+
+       public static final class CustomInputSplit implements InputSplit {
+
+               private static final long serialVersionUID = 1L;
+
+               private int splitNumber;
+
+               public CustomInputSplit() {
+                       this(-1);
+               }
+
+               public CustomInputSplit(int splitNumber) {
+                       this.splitNumber = splitNumber;
+               }
+
+               @Override
+               public int getSplitNumber() {
+                       return this.splitNumber;
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       out.writeInt(splitNumber);
+               }
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       splitNumber = in.readInt();
+               }
+       }
+
+       public static final class CustomSplitAssigner implements 
InputSplitAssigner {
+
+               private final List<CustomInputSplit> remainingSplits;
+
+               public CustomSplitAssigner(CustomInputSplit[] splits) {
+                       this.remainingSplits = new 
ArrayList<CustomInputSplit>(Arrays.asList(splits));
+               }
+
+               @Override
+               public InputSplit getNextInputSplit(String host) {
+                       synchronized (this) {
+                               int size = remainingSplits.size();
+                               if (size > 0) {
+                                       return remainingSplits.remove(size - 1);
+                               } else {
+                                       return null;
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index e7e3f95..430d817 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -27,10 +27,10 @@ import org.apache.flink.test.testdata.KMeansData;
 import org.junit.Assert;
 import org.junit.Test;
 
-// When the API changes KMeansForTest needs to be rebuilt and the 
KMeansForTest.jar in resources needs
-// to be replaced with the new one.
 
 public class PackagedProgramEndToEndITCase {
+       
+       private static final String JAR_PATH = "target/kmeans-test-jar.jar";
 
        @Test
        public void testEverything() {
@@ -56,7 +56,7 @@ public class PackagedProgramEndToEndITCase {
                        fwClusters.write(KMeansData.INITIAL_CENTERS);
                        fwClusters.close();
 
-                       String jarPath = "target/maven-test-jar.jar";
+                       
 
                        // run KMeans
                        cluster.setNumTaskManager(2);
@@ -65,7 +65,7 @@ public class PackagedProgramEndToEndITCase {
 
                        RemoteExecutor ex = new RemoteExecutor("localhost", 
cluster.getJobManagerRpcPort());
 
-                       ex.executeJar(jarPath,
+                       ex.executeJar(JAR_PATH,
                                        
"org.apache.flink.test.util.testjar.KMeansForTest",
                                        new String[] {
                                                        
points.toURI().toString(),

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index a063957..2cae061 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -190,7 +190,7 @@ object TestObjectWithBogusReturns {
       nums.map { x => return 1; x * 2}.print()
     } catch {
       case inv: InvalidProgramException => // all good
-      case _ => fail("Bogus return statement not detected.")
+      case _: Throwable => fail("Bogus return statement not detected.")
     }
 
     nums.writeAsText(resultPath)

Reply via email to