Repository: flink
Updated Branches:
  refs/heads/master de573cf5c -> 64607433d


[FLINK-924] Add automatic dependency retrieval for JarFileCreator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c527a2b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c527a2b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c527a2b8

Branch: refs/heads/master
Commit: c527a2b8f42aa0d57cdda4b0b5fb35dcba1a2538
Parents: de573cf
Author: mingliang <qmlm...@gmail.com>
Authored: Sun Jun 22 21:20:55 2014 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Apr 28 09:31:23 2015 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |   6 +
 .../flink/runtime/util/DependencyVisitor.java   | 351 +++++++++++++++++++
 .../flink/runtime/util/JarFileCreator.java      | 103 +++++-
 .../flink/runtime/util/JarFileCreatorTest.java  | 153 ++++++++
 .../util/jartestprogram/ExternalTokenizer.java  |  39 +++
 .../util/jartestprogram/ExternalTokenizer2.java |  24 ++
 .../runtime/util/jartestprogram/StaticData.java |  35 ++
 .../WordCountWithAnonymousClass.java            |  62 ++++
 .../WordCountWithExternalClass.java             |  47 +++
 .../WordCountWithExternalClass2.java            |  47 +++
 .../jartestprogram/WordCountWithInnerClass.java |  66 ++++
 11 files changed, 930 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 5990c32..cc03eab 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -117,6 +117,12 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.ow2.asm</groupId>
+                       <artifactId>asm-all</artifactId>
+                       <version>5.0.3</version>
+               </dependency>
+               
+               <dependency>
                        <groupId>org.scala-lang</groupId>
                        <artifactId>scala-library</artifactId>
                </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
new file mode 100644
index 0000000..5119a38
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.objectweb.asm.AnnotationVisitor;
+import org.objectweb.asm.ClassVisitor;
+import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.FieldVisitor;
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.TypePath;
+import org.objectweb.asm.Label;
+import org.objectweb.asm.signature.SignatureReader;
+import org.objectweb.asm.signature.SignatureVisitor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * This class tracks class dependency with ASM visitors.
+ * The tutorial could be found here 
http://asm.ow2.org/doc/tutorial-asm-2.0.html
+ *
+ */
+public class DependencyVisitor extends ClassVisitor {
+
+       private static final String CLASS_EXTENSION = ".class";
+
+       private Set<String> packages = new HashSet<String>();
+
+       private Set<String> innerClasses = new HashSet<String>();
+
+       private Set<String> nameSpace = new HashSet<String>();
+
+       public Set<String> getPackages() {
+               return packages;
+       }
+
+
+       public DependencyVisitor(int api) {
+               super(api);
+       }
+
+       @Override
+       public void visit(int version, int access, String name, String 
signature, String superName, String[] interfaces) {
+               if (signature == null) {
+                       addInternalName(superName);
+                       addInternalNames(interfaces);
+               } else {
+                       addSignature(signature);
+               }
+       }
+
+       @Override
+       public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+               addDesc(desc);
+               return new AnnotationVisitorImpl(Opcodes.ASM5);
+       }
+
+       @Override
+       public FieldVisitor visitField(int access, String name, String desc, 
String signature, Object value) {
+               if (signature == null) {
+                       addDesc(desc);
+               } else {
+                       addTypeSignature(signature);
+               }
+               if (value instanceof Type) {
+                       addType((Type) value);
+               }
+               return new FieldVisitorImpl(Opcodes.ASM5);
+       }
+
+       @Override
+       public MethodVisitor visitMethod(int access, String name, String desc, 
String signature, String[] exceptions) {
+               if (signature == null) {
+                       addMethodDesc(desc);
+               } else {
+                       addSignature(signature);
+               }
+               addInternalNames(exceptions);
+               return new MethodVisitorImpl(Opcodes.ASM5);
+       }
+
+       @Override
+       public void visitInnerClass(String name, String outerName, String 
innerName, int access) {
+               if (!innerClasses.contains(name)) {
+                       try {
+                               innerClasses.add(name);
+                               Class clazz = Class.forName(name.replace('/', 
'.'));
+                               int n = name.lastIndexOf('/');
+                               String className = null;
+                               if (n > -1) {
+                                       className = name.substring(n + 1, 
name.length());
+                               }
+
+                               InputStream classInputStream = 
clazz.getResourceAsStream(className + CLASS_EXTENSION);
+                               new ClassReader(classInputStream).accept(this, 
0);
+                               classInputStream.close();
+                       } catch (ClassNotFoundException e) {
+                               e.printStackTrace();
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                       }
+               }
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+
+       public void addNameSpace(Set<String> names) {
+               for (String name : names) {
+                       this.nameSpace.add(name.replace('.', '/'));
+               }
+       }
+
+       private boolean checkUserDefine(String name) {
+               String[] ns = {};
+               ns = nameSpace.toArray(ns);
+
+               for (String s : ns) {
+                       if (name.startsWith(s)) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       private void addName(final String name) {
+               if (checkUserDefine(name)) {
+                       packages.add(name);
+               }
+       }
+
+       private void addInternalName(String name) {
+               addType(Type.getObjectType(name));
+       }
+
+       private void addInternalNames(String[] names) {
+               for (int i = 0; names != null && i < names.length; i++) {
+                       addInternalName(names[i]);
+               }
+       }
+
+       private void addDesc(String desc) {
+               addType(Type.getType(desc));
+       }
+
+       private void addMethodDesc(String desc) {
+               addType(Type.getReturnType(desc));
+               Type[] types = Type.getArgumentTypes(desc);
+               for (int i = 0; i < types.length; i++) {
+                       addType(types[i]);
+               }
+       }
+
+       private void addType(Type t) {
+               switch (t.getSort()) {
+                       case Type.ARRAY:
+                               addType(t.getElementType());
+                               break;
+                       case Type.OBJECT:
+                               addName(t.getInternalName());
+                               break;
+               }
+       }
+
+       private void addSignature(String signature) {
+               if (signature != null) {
+                       new SignatureReader(signature).accept(new 
SignatureVisitorImpl(Opcodes.ASM5));
+               }
+       }
+
+       private void addTypeSignature(String signature) {
+               if (signature != null) {
+                       new SignatureReader(signature).acceptType(new 
SignatureVisitorImpl(Opcodes.ASM5));
+               }
+       }
+
+       public class MethodVisitorImpl extends MethodVisitor {
+
+               public MethodVisitorImpl(int api) {
+                       super(api);
+               }
+
+               @Override
+               public AnnotationVisitor visitParameterAnnotation(int 
parameter, String desc, boolean visible) {
+                       addDesc(desc);
+                       return new AnnotationVisitorImpl(Opcodes.ASM5);
+               }
+
+               @Override
+               public void visitTypeInsn(int opcode, String type) {
+                       addType(Type.getObjectType(type));
+               }
+
+               @Override
+               public void visitFieldInsn(int opcode, String owner, String 
name, String desc) {
+                       addInternalName(owner);
+                       addDesc(desc);
+               }
+
+               @Override
+               public void visitMethodInsn(int opcode, String owner, String 
name, String desc) {
+                       addInternalName(owner);
+                       addMethodDesc(desc);
+               }
+
+               @Override
+               public void visitMethodInsn(int opcode, String owner, String 
name, String desc, boolean itf) {
+                       addInternalName(owner);
+                       addMethodDesc(desc);
+               }
+
+               @Override
+               public void visitLdcInsn(Object cst) {
+                       if (cst instanceof Type) {
+                               addType((Type) cst);
+                       }
+               }
+
+               @Override
+               public void visitMultiANewArrayInsn(String desc, int dims) {
+                       addDesc(desc);
+               }
+
+               @Override
+               public void visitLocalVariable(String name, String desc, String 
signature, Label start, Label end, int index) {
+                       if (signature == null) {
+                               addDesc(desc);
+                       } else {
+                               addTypeSignature(signature);
+                       }
+               }
+
+               @Override
+               public AnnotationVisitor visitAnnotationDefault() {
+                       return new AnnotationVisitorImpl(Opcodes.ASM5);
+               }
+
+               @Override
+               public AnnotationVisitor visitAnnotation(String desc, boolean 
visible) {
+                       addDesc(desc);
+                       return new AnnotationVisitorImpl(Opcodes.ASM5);
+               }
+
+               @Override
+               public void visitTryCatchBlock(Label start, Label end, Label 
handler, String type) {
+                       if (type != null) {
+                               addInternalName(type);
+                       }
+               }
+       }
+
+       public class AnnotationVisitorImpl extends AnnotationVisitor {
+               public AnnotationVisitorImpl(int api) {
+                       super(api);
+               }
+
+               @Override
+               public void visit(String name, Object value) {
+                       if (value instanceof Type) {
+                               addType((Type) value);
+                       }
+               }
+
+               @Override
+               public void visitEnum(String name, String desc, String value) {
+                       addDesc(desc);
+               }
+
+               @Override
+               public AnnotationVisitor visitAnnotation(String name, String 
desc) {
+                       addDesc(desc);
+                       return this;
+               }
+
+               @Override
+               public AnnotationVisitor visitArray(String name) {
+                       return this;
+               }
+       }
+
+       public class FieldVisitorImpl extends FieldVisitor {
+
+               public FieldVisitorImpl(int api) {
+                       super(api);
+               }
+
+               @Override
+               public AnnotationVisitor visitTypeAnnotation(int typeRef, 
TypePath typePath, String desc, boolean visible) {
+                       addDesc(desc);
+                       return new AnnotationVisitorImpl(Opcodes.ASM5);
+               }
+       }
+
+       public class SignatureVisitorImpl extends SignatureVisitor {
+
+               private String signatureClassName;
+
+               private boolean newParameter = false;
+
+               public SignatureVisitorImpl(int api) {
+                       super(api);
+               }
+
+               @Override
+               public SignatureVisitor visitParameterType() {
+                       this.newParameter = true;
+                       return this;
+               }
+
+               @Override
+               public SignatureVisitor visitReturnType() {
+                       this.newParameter = true;
+                       return this;
+               }
+
+               @Override
+               public void visitClassType(String name) {
+                       if (signatureClassName == null || this.newParameter == 
true) {
+                               signatureClassName = name;
+                               newParameter = false;
+                       }
+                       addInternalName(name);
+               }
+
+               @Override
+               public void visitInnerClassType(String name) {
+                       signatureClassName = signatureClassName + "$" + name;
+                       addInternalName(signatureClassName);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
index b145d36..c55a9dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
@@ -19,11 +19,15 @@
 
 package org.apache.flink.runtime.util;
 
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.Opcodes;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.jar.JarEntry;
@@ -54,6 +58,11 @@ public class JarFileCreator {
        private final File outputFile;
 
        /**
+        * The namespace of the dependencies to be packaged.
+        */
+       private final Set<String> packages = new HashSet<String>();
+
+       /**
         * Constructs a new jar file creator.
         * 
         * @param outputFile
@@ -70,9 +79,89 @@ public class JarFileCreator {
         * @param clazz
         *        the class to be added to the jar file.
         */
-       public synchronized void addClass(final Class<?> clazz) {
+       public synchronized JarFileCreator addClass(final Class<?> clazz) {
 
                this.classSet.add(clazz);
+               String name = clazz.getName();
+               int n = name.lastIndexOf('.');
+               if (n > -1) {
+                       name = name.substring(0, n);
+               }
+               return addPackage(name);
+       }
+
+       /**
+        * Manually specify the package of the dependencies.
+        *
+        * @param p
+        *                the package to be included.
+        */
+       public synchronized JarFileCreator addPackage(String p) {
+               this.packages.add(p);
+               return this;
+       }
+
+       /**
+        * Manually specify the packages of the dependencies.
+        *
+        * @param packages
+        *        the packages to be included.
+        */
+       public synchronized JarFileCreator addPackages(String[] packages) {
+               for (String p : packages) {
+                       addPackage(p);
+               }
+               return this;
+       }
+
+       /**
+        * Add the dependencies within the given packages automatically.
+        * @throws IOException
+        *                      throw if an error occurs while read the class 
file.
+        */
+       private synchronized void addDependencies() throws IOException {
+               List<String> dependencies = new ArrayList<String>();
+               for (Class clazz : classSet) {
+                       dependencies.add(clazz.getName());
+               }
+               //Traverse the dependency tree using BFS.
+               int head = 0;
+               while (head != dependencies.size()) {
+                       DependencyVisitor v = new 
DependencyVisitor(Opcodes.ASM5);
+                       v.addNameSpace(this.packages);
+                       InputStream classInputStream = null;
+                       String name = dependencies.get(head);
+                       try {
+                               Class clazz = Class.forName(name);
+                               int n = name.lastIndexOf('.');
+                               String className = null;
+                               if (n > -1) {
+                                       className = name.substring(n + 1, 
name.length());
+                               }
+                               classInputStream = 
clazz.getResourceAsStream(className + CLASS_EXTENSION);
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException(e.getMessage());
+                       }
+                       new ClassReader(classInputStream).accept(v, 0);
+                       classInputStream.close();
+
+                       //Update the BFS queue.
+                       Set<String> classPackages = v.getPackages();
+                       for (String s : classPackages) {
+                               if (!dependencies.contains(s.replace('/','.'))) 
{
+                                       dependencies.add(s.replace('/','.'));
+                               }
+                       }
+                       head++;
+               }
+
+               for (String dependency : dependencies) {
+                       try {
+                               this.classSet.add(Class.forName(dependency));
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException(e.getMessage());
+                       }
+               }
        }
 
        /**
@@ -84,6 +173,8 @@ public class JarFileCreator {
         *         thrown if an error occurs while writing to the output file
         */
        public synchronized void createJarFile() throws IOException {
+               //Retrieve dependencies automatically
+               addDependencies();
 
                // Temporary buffer for the stream copy
                final byte[] buf = new byte[128];
@@ -107,7 +198,14 @@ public class JarFileCreator {
 
                        jos.putNextEntry(new JarEntry(entry));
 
-                       final InputStream classInputStream = 
clazz.getResourceAsStream(clazz.getSimpleName() + CLASS_EXTENSION);
+                       String name = clazz.getName();
+                       int n = name.lastIndexOf('.');
+                       String className = null;
+                       if (n > -1) {
+                               className = name.substring(n + 1, 
name.length());
+                       }
+                       //Using the part after last dot instead of 
class.getSimpleName() could resolve the problem of inner class.
+                       final InputStream classInputStream = 
clazz.getResourceAsStream(className + CLASS_EXTENSION);
 
                        int num = classInputStream.read(buf);
                        while (num != -1) {
@@ -118,7 +216,6 @@ public class JarFileCreator {
                        classInputStream.close();
                        jos.closeEntry();
                }
-
                jos.close();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
new file mode 100644
index 0000000..a5f9de9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.util;
+
+import 
org.apache.flink.runtime.util.jartestprogram.WordCountWithAnonymousClass;
+import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass;
+import 
org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass2;
+import org.apache.flink.runtime.util.jartestprogram.WordCountWithInnerClass;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.jar.JarInputStream;
+import java.util.zip.ZipEntry;
+
+
+public class JarFileCreatorTest {
+
+       @Test
+       public void TestExternalClass() throws IOException {
+               File out = new File("/tmp/jarcreatortest1.jar");
+               JarFileCreator jfc = new JarFileCreator(out);
+               jfc.addClass(WordCountWithExternalClass.class).createJarFile();
+
+               Set<String> ans = new HashSet<String>();
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.class");
+               JarInputStream jis = new JarInputStream(new 
FileInputStream(out));
+               ZipEntry ze;
+               int count = 3;
+               while ((ze = jis.getNextEntry()) != null) {
+                       count--;
+                       ans.remove(ze.getName());
+               }
+               Assert.assertTrue("Jar file for External Class is not correct", 
count == 0 && ans.size() == 0);
+
+               out.delete();
+       }
+
+       @Test
+       public void TestInnerClass() throws IOException {
+               File out = new File("/tmp/jarcreatortest2.jar");
+               JarFileCreator jfc = new JarFileCreator(out);
+               jfc.addClass(WordCountWithInnerClass.class).createJarFile();
+
+               Set<String> ans = new HashSet<String>();
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
+               JarInputStream jis = new JarInputStream(new 
FileInputStream(out));
+               ZipEntry ze;
+               int count = 3;
+               while ((ze = jis.getNextEntry()) != null) {
+                       count--;
+                       ans.remove(ze.getName());
+               }
+               Assert.assertTrue("Jar file for Inner Class is not correct", 
count == 0 && ans.size() == 0);
+
+               out.delete();
+       }
+
+       @Test
+       public void TestAnonymousClass() throws IOException {
+               File out = new File("/tmp/jarcreatortest3.jar");
+               JarFileCreator jfc = new JarFileCreator(out);
+               jfc.addClass(WordCountWithAnonymousClass.class).createJarFile();
+
+               Set<String> ans = new HashSet<String>();
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass$1.class");
+               JarInputStream jis = new JarInputStream(new 
FileInputStream(out));
+               ZipEntry ze;
+               int count = 3;
+               while ((ze = jis.getNextEntry()) != null) {
+                       count--;
+                       ans.remove(ze.getName());
+               }
+               Assert.assertTrue("Jar file for Anonymous Class is not 
correct", count == 0 && ans.size() == 0);
+
+               out.delete();
+       }
+
+       @Test
+       public void TestExtendIdentifier() throws IOException {
+               File out = new File("/tmp/jarcreatortest4.jar");
+               JarFileCreator jfc = new JarFileCreator(out);
+               jfc.addClass(WordCountWithExternalClass2.class).createJarFile();
+
+               Set<String> ans = new HashSet<String>();
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.class");
+               JarInputStream jis = new JarInputStream(new 
FileInputStream(out));
+               ZipEntry ze;
+               int count = 4;
+               while ((ze = jis.getNextEntry()) != null) {
+                       count--;
+                       ans.remove(ze.getName());
+               }
+               Assert.assertTrue("Jar file for Extend Identifier is not 
correct", count == 0 && ans.size() == 0);
+
+               out.delete();
+       }
+
+       @Test
+       public void TestUDFPackage() throws IOException {
+               File out = new File("/tmp/jarcreatortest5.jar");
+               JarFileCreator jfc = new JarFileCreator(out);
+               jfc.addClass(WordCountWithInnerClass.class)
+                       .addPackage("org.apache.flink.util").createJarFile();
+
+               Set<String> ans = new HashSet<String>();
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
+               ans.add("org/apache/flink/util/Collector.class");
+               JarInputStream jis = new JarInputStream(new 
FileInputStream(out));
+               ZipEntry ze;
+               int count = 4;
+               while ((ze = jis.getNextEntry()) != null) {
+                       count--;
+                       ans.remove(ze.getName());
+               }
+               Assert.assertTrue("Jar file for UDF package is not correct", 
count == 0 && ans.size() == 0);
+
+               out.delete();
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.java
new file mode 100644
index 0000000..3c94684
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public class ExternalTokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+
+       @Override
+       public void flatMap(String value, Collector<Tuple2<String, Integer>> 
out) {
+               // normalize and split the line
+               String[] tokens = value.toLowerCase().split("\\W+");
+
+               // emit the pairs
+               for (String token : tokens) {
+                       if (token.length() > 0) {
+                               out.collect(new Tuple2<String, Integer>(token, 
1));
+                       }
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.java
new file mode 100644
index 0000000..f52c537
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+
+public class ExternalTokenizer2 extends ExternalTokenizer {
+       public void dummyMethod() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/StaticData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/StaticData.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/StaticData.java
new file mode 100644
index 0000000..543c62e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/StaticData.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+
+public class StaticData {
+       public static DataSet<String> 
getDefaultTextLineDataSet(ExecutionEnvironment env) {
+
+               return env.fromElements(
+                       "To be, or not to be,--that is the question:--",
+                       "Whether 'tis nobler in the mind to suffer",
+                       "The slings and arrows of outrageous fortune",
+                       "Or to take arms against a sea of troubles,"
+               );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.java
new file mode 100644
index 0000000..b18cd5a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public class WordCountWithAnonymousClass {
+
+       public static void main(String[] args) throws Exception {
+               // set up the execution environment
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               DataSet<String> text = 
StaticData.getDefaultTextLineDataSet(env);
+
+               DataSet<Tuple2<String, Integer>> counts =
+                       // split up the lines in pairs (2-tuples) containing: 
(word,1)
+                       text.flatMap(new FlatMapFunction<String, Tuple2<String, 
Integer>>() {
+                               @Override
+                               public void flatMap(String value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                       // normalize and split the line
+                                       String[] tokens = 
value.toLowerCase().split("\\W+");
+
+                                       // emit the pairs
+                                       for (String token : tokens) {
+                                               if (token.length() > 0) {
+                                                       out.collect(new 
Tuple2<String, Integer>(token, 1));
+                                               }
+                                       }
+                               }
+                       })
+                               // group by the tuple field "0" and sum up 
tuple field "1"
+                               .groupBy(0)
+                               .sum(1);
+
+               // emit result
+               counts.print();
+
+               // execute program
+               env.execute("WordCount Example");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.java
new file mode 100644
index 0000000..87ce0cb
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+public class WordCountWithExternalClass {
+
+       public static void main(String[] args) throws Exception {
+               // set up the execution environment
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               DataSet<String> text = 
StaticData.getDefaultTextLineDataSet(env);
+
+               DataSet<Tuple2<String, Integer>> counts =
+                       // split up the lines in pairs (2-tuples) containing: 
(word,1)
+                       text.flatMap(new ExternalTokenizer())
+                               // group by the tuple field "0" and sum up 
tuple field "1"
+                               .groupBy(0)
+                               .sum(1);
+
+               // emit result
+               counts.print();
+
+               // execute program
+               env.execute("WordCount Example");
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.java
new file mode 100644
index 0000000..8568a7e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+public class WordCountWithExternalClass2 {
+
+       public static void main(String[] args) throws Exception {
+               // set up the execution environment
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               DataSet<String> text = 
StaticData.getDefaultTextLineDataSet(env);
+
+               DataSet<Tuple2<String, Integer>> counts =
+                       // split up the lines in pairs (2-tuples) containing: 
(word,1)
+                       text.flatMap(new ExternalTokenizer2())
+                               // group by the tuple field "0" and sum up 
tuple field "1"
+                               .groupBy(0)
+                               .sum(1);
+
+               // emit result
+               counts.print();
+
+               // execute program
+               env.execute("WordCount Example");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.java
new file mode 100644
index 0000000..362fe33
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public class WordCountWithInnerClass {
+
+       public static void main(String[] args) throws Exception {
+               // set up the execution environment
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               DataSet<String> text = 
StaticData.getDefaultTextLineDataSet(env);
+
+               DataSet<Tuple2<String, Integer>> counts =
+                       // split up the lines in pairs (2-tuples) containing: 
(word,1)
+                       text.flatMap(new Tokenizer())
+                               // group by the tuple field "0" and sum up 
tuple field "1"
+                               .groupBy(0)
+                               .sum(1);
+
+               // emit result
+               counts.print();
+
+               // execute program
+               env.execute("WordCount Example");
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out) {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+}

Reply via email to