Repository: flink Updated Branches: refs/heads/master de573cf5c -> 64607433d
[FLINK-924] Add automatic dependency retrieval for JarFileCreator Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c527a2b8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c527a2b8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c527a2b8 Branch: refs/heads/master Commit: c527a2b8f42aa0d57cdda4b0b5fb35dcba1a2538 Parents: de573cf Author: mingliang <qmlm...@gmail.com> Authored: Sun Jun 22 21:20:55 2014 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Apr 28 09:31:23 2015 +0200 ---------------------------------------------------------------------- flink-runtime/pom.xml | 6 + .../flink/runtime/util/DependencyVisitor.java | 351 +++++++++++++++++++ .../flink/runtime/util/JarFileCreator.java | 103 +++++- .../flink/runtime/util/JarFileCreatorTest.java | 153 ++++++++ .../util/jartestprogram/ExternalTokenizer.java | 39 +++ .../util/jartestprogram/ExternalTokenizer2.java | 24 ++ .../runtime/util/jartestprogram/StaticData.java | 35 ++ .../WordCountWithAnonymousClass.java | 62 ++++ .../WordCountWithExternalClass.java | 47 +++ .../WordCountWithExternalClass2.java | 47 +++ .../jartestprogram/WordCountWithInnerClass.java | 66 ++++ 11 files changed, 930 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 5990c32..cc03eab 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -117,6 +117,12 @@ under the License. 
</dependency> <dependency> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-all</artifactId> + <version>5.0.3</version> + </dependency> + + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java new file mode 100644 index 0000000..5119a38 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.util; + +import org.objectweb.asm.AnnotationVisitor; +import org.objectweb.asm.ClassVisitor; +import org.objectweb.asm.Opcodes; +import org.objectweb.asm.FieldVisitor; +import org.objectweb.asm.MethodVisitor; +import org.objectweb.asm.ClassReader; +import org.objectweb.asm.Type; +import org.objectweb.asm.TypePath; +import org.objectweb.asm.Label; +import org.objectweb.asm.signature.SignatureReader; +import org.objectweb.asm.signature.SignatureVisitor; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashSet; +import java.util.Set; + + +/** + * This class tracks class dependency with ASM visitors. + * The tutorial could be found here http://asm.ow2.org/doc/tutorial-asm-2.0.html + * + */ +public class DependencyVisitor extends ClassVisitor { + + private static final String CLASS_EXTENSION = ".class"; + + private Set<String> packages = new HashSet<String>(); + + private Set<String> innerClasses = new HashSet<String>(); + + private Set<String> nameSpace = new HashSet<String>(); + + public Set<String> getPackages() { + return packages; + } + + + public DependencyVisitor(int api) { + super(api); + } + + @Override + public void visit(int version, int access, String name, String signature, String superName, String[] interfaces) { + if (signature == null) { + addInternalName(superName); + addInternalNames(interfaces); + } else { + addSignature(signature); + } + } + + @Override + public AnnotationVisitor visitAnnotation(String desc, boolean visible) { + addDesc(desc); + return new AnnotationVisitorImpl(Opcodes.ASM5); + } + + @Override + public FieldVisitor visitField(int access, String name, String desc, String signature, Object value) { + if (signature == null) { + addDesc(desc); + } else { + addTypeSignature(signature); + } + if (value instanceof Type) { + addType((Type) value); + } + return new FieldVisitorImpl(Opcodes.ASM5); + } + + @Override + public MethodVisitor visitMethod(int access, String name, 
String desc, String signature, String[] exceptions) { + if (signature == null) { + addMethodDesc(desc); + } else { + addSignature(signature); + } + addInternalNames(exceptions); + return new MethodVisitorImpl(Opcodes.ASM5); + } + + @Override + public void visitInnerClass(String name, String outerName, String innerName, int access) { + if (!innerClasses.contains(name)) { + try { + innerClasses.add(name); + Class clazz = Class.forName(name.replace('/', '.')); + int n = name.lastIndexOf('/'); + String className = null; + if (n > -1) { + className = name.substring(n + 1, name.length()); + } + + InputStream classInputStream = clazz.getResourceAsStream(className + CLASS_EXTENSION); + new ClassReader(classInputStream).accept(this, 0); + classInputStream.close(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + // --------------------------------------------------------------------------------------------- + + public void addNameSpace(Set<String> names) { + for (String name : names) { + this.nameSpace.add(name.replace('.', '/')); + } + } + + private boolean checkUserDefine(String name) { + String[] ns = {}; + ns = nameSpace.toArray(ns); + + for (String s : ns) { + if (name.startsWith(s)) { + return true; + } + } + return false; + } + + private void addName(final String name) { + if (checkUserDefine(name)) { + packages.add(name); + } + } + + private void addInternalName(String name) { + addType(Type.getObjectType(name)); + } + + private void addInternalNames(String[] names) { + for (int i = 0; names != null && i < names.length; i++) { + addInternalName(names[i]); + } + } + + private void addDesc(String desc) { + addType(Type.getType(desc)); + } + + private void addMethodDesc(String desc) { + addType(Type.getReturnType(desc)); + Type[] types = Type.getArgumentTypes(desc); + for (int i = 0; i < types.length; i++) { + addType(types[i]); + } + } + + private void addType(Type t) { + switch 
(t.getSort()) { + case Type.ARRAY: + addType(t.getElementType()); + break; + case Type.OBJECT: + addName(t.getInternalName()); + break; + } + } + + private void addSignature(String signature) { + if (signature != null) { + new SignatureReader(signature).accept(new SignatureVisitorImpl(Opcodes.ASM5)); + } + } + + private void addTypeSignature(String signature) { + if (signature != null) { + new SignatureReader(signature).acceptType(new SignatureVisitorImpl(Opcodes.ASM5)); + } + } + + public class MethodVisitorImpl extends MethodVisitor { + + public MethodVisitorImpl(int api) { + super(api); + } + + @Override + public AnnotationVisitor visitParameterAnnotation(int parameter, String desc, boolean visible) { + addDesc(desc); + return new AnnotationVisitorImpl(Opcodes.ASM5); + } + + @Override + public void visitTypeInsn(int opcode, String type) { + addType(Type.getObjectType(type)); + } + + @Override + public void visitFieldInsn(int opcode, String owner, String name, String desc) { + addInternalName(owner); + addDesc(desc); + } + + @Override + public void visitMethodInsn(int opcode, String owner, String name, String desc) { + addInternalName(owner); + addMethodDesc(desc); + } + + @Override + public void visitMethodInsn(int opcode, String owner, String name, String desc, boolean itf) { + addInternalName(owner); + addMethodDesc(desc); + } + + @Override + public void visitLdcInsn(Object cst) { + if (cst instanceof Type) { + addType((Type) cst); + } + } + + @Override + public void visitMultiANewArrayInsn(String desc, int dims) { + addDesc(desc); + } + + @Override + public void visitLocalVariable(String name, String desc, String signature, Label start, Label end, int index) { + if (signature == null) { + addDesc(desc); + } else { + addTypeSignature(signature); + } + } + + @Override + public AnnotationVisitor visitAnnotationDefault() { + return new AnnotationVisitorImpl(Opcodes.ASM5); + } + + @Override + public AnnotationVisitor visitAnnotation(String desc, boolean visible) { 
+ addDesc(desc); + return new AnnotationVisitorImpl(Opcodes.ASM5); + } + + @Override + public void visitTryCatchBlock(Label start, Label end, Label handler, String type) { + if (type != null) { + addInternalName(type); + } + } + } + + public class AnnotationVisitorImpl extends AnnotationVisitor { + public AnnotationVisitorImpl(int api) { + super(api); + } + + @Override + public void visit(String name, Object value) { + if (value instanceof Type) { + addType((Type) value); + } + } + + @Override + public void visitEnum(String name, String desc, String value) { + addDesc(desc); + } + + @Override + public AnnotationVisitor visitAnnotation(String name, String desc) { + addDesc(desc); + return this; + } + + @Override + public AnnotationVisitor visitArray(String name) { + return this; + } + } + + public class FieldVisitorImpl extends FieldVisitor { + + public FieldVisitorImpl(int api) { + super(api); + } + + @Override + public AnnotationVisitor visitTypeAnnotation(int typeRef, TypePath typePath, String desc, boolean visible) { + addDesc(desc); + return new AnnotationVisitorImpl(Opcodes.ASM5); + } + } + + public class SignatureVisitorImpl extends SignatureVisitor { + + private String signatureClassName; + + private boolean newParameter = false; + + public SignatureVisitorImpl(int api) { + super(api); + } + + @Override + public SignatureVisitor visitParameterType() { + this.newParameter = true; + return this; + } + + @Override + public SignatureVisitor visitReturnType() { + this.newParameter = true; + return this; + } + + @Override + public void visitClassType(String name) { + if (signatureClassName == null || this.newParameter == true) { + signatureClassName = name; + newParameter = false; + } + addInternalName(name); + } + + @Override + public void visitInnerClassType(String name) { + signatureClassName = signatureClassName + "$" + name; + addInternalName(signatureClassName); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java index b145d36..c55a9dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java @@ -19,11 +19,15 @@ package org.apache.flink.runtime.util; +import org.objectweb.asm.ClassReader; +import org.objectweb.asm.Opcodes; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.HashSet; +import java.util.List; +import java.util.ArrayList; import java.util.Iterator; import java.util.Set; import java.util.jar.JarEntry; @@ -54,6 +58,11 @@ public class JarFileCreator { private final File outputFile; /** + * The namespace of the dependencies to be packaged. + */ + private final Set<String> packages = new HashSet<String>(); + + /** * Constructs a new jar file creator. * * @param outputFile @@ -70,9 +79,89 @@ public class JarFileCreator { * @param clazz * the class to be added to the jar file. */ - public synchronized void addClass(final Class<?> clazz) { + public synchronized JarFileCreator addClass(final Class<?> clazz) { this.classSet.add(clazz); + String name = clazz.getName(); + int n = name.lastIndexOf('.'); + if (n > -1) { + name = name.substring(0, n); + } + return addPackage(name); + } + + /** + * Manually specify the package of the dependencies. + * + * @param p + * the package to be included. + */ + public synchronized JarFileCreator addPackage(String p) { + this.packages.add(p); + return this; + } + + /** + * Manually specify the packages of the dependencies. 
+ * + * @param packages + * the packages to be included. + */ + public synchronized JarFileCreator addPackages(String[] packages) { + for (String p : packages) { + addPackage(p); + } + return this; + } + + /** + * Add the dependencies within the given packages automatically. + * @throws IOException + * thrown if an error occurs while reading the class file. + */ + private synchronized void addDependencies() throws IOException { + List<String> dependencies = new ArrayList<String>(); + for (Class clazz : classSet) { + dependencies.add(clazz.getName()); + } + //Traverse the dependency tree using BFS. + int head = 0; + while (head != dependencies.size()) { + DependencyVisitor v = new DependencyVisitor(Opcodes.ASM5); + v.addNameSpace(this.packages); + InputStream classInputStream = null; + String name = dependencies.get(head); + try { + Class clazz = Class.forName(name); + int n = name.lastIndexOf('.'); + String className = null; + if (n > -1) { + className = name.substring(n + 1, name.length()); + } + classInputStream = clazz.getResourceAsStream(className + CLASS_EXTENSION); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.getMessage()); + } + new ClassReader(classInputStream).accept(v, 0); + classInputStream.close(); + + //Update the BFS queue. 
+ Set<String> classPackages = v.getPackages(); + for (String s : classPackages) { + if (!dependencies.contains(s.replace('/','.'))) { + dependencies.add(s.replace('/','.')); + } + } + head++; + } + + for (String dependency : dependencies) { + try { + this.classSet.add(Class.forName(dependency)); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.getMessage()); + } + } } /** @@ -84,6 +173,8 @@ public class JarFileCreator { * thrown if an error occurs while writing to the output file */ public synchronized void createJarFile() throws IOException { + //Retrieve dependencies automatically + addDependencies(); // Temporary buffer for the stream copy final byte[] buf = new byte[128]; @@ -107,7 +198,14 @@ public class JarFileCreator { jos.putNextEntry(new JarEntry(entry)); - final InputStream classInputStream = clazz.getResourceAsStream(clazz.getSimpleName() + CLASS_EXTENSION); + String name = clazz.getName(); + int n = name.lastIndexOf('.'); + String className = null; + if (n > -1) { + className = name.substring(n + 1, name.length()); + } + //Use the part after the last dot instead of Class.getSimpleName(), because the class file of an inner class is named Outer$Inner.class. 
+ final InputStream classInputStream = clazz.getResourceAsStream(className + CLASS_EXTENSION); int num = classInputStream.read(buf); while (num != -1) { @@ -118,7 +216,6 @@ public class JarFileCreator { classInputStream.close(); jos.closeEntry(); } - jos.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java new file mode 100644 index 0000000..a5f9de9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.util; + +import org.apache.flink.runtime.util.jartestprogram.WordCountWithAnonymousClass; +import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass; +import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass2; +import org.apache.flink.runtime.util.jartestprogram.WordCountWithInnerClass; +import org.junit.Assert; +import org.junit.Test; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.jar.JarInputStream; +import java.util.zip.ZipEntry; + + +public class JarFileCreatorTest { + + @Test + public void TestExternalClass() throws IOException { + File out = new File("/tmp/jarcreatortest1.jar"); + JarFileCreator jfc = new JarFileCreator(out); + jfc.addClass(WordCountWithExternalClass.class).createJarFile(); + + Set<String> ans = new HashSet<String>(); + ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.class"); + JarInputStream jis = new JarInputStream(new FileInputStream(out)); + ZipEntry ze; + int count = 3; + while ((ze = jis.getNextEntry()) != null) { + count--; + ans.remove(ze.getName()); + } + Assert.assertTrue("Jar file for External Class is not correct", count == 0 && ans.size() == 0); + + out.delete(); + } + + @Test + public void TestInnerClass() throws IOException { + File out = new File("/tmp/jarcreatortest2.jar"); + JarFileCreator jfc = new JarFileCreator(out); + jfc.addClass(WordCountWithInnerClass.class).createJarFile(); + + Set<String> ans = new HashSet<String>(); + ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class"); + 
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class"); + JarInputStream jis = new JarInputStream(new FileInputStream(out)); + ZipEntry ze; + int count = 3; + while ((ze = jis.getNextEntry()) != null) { + count--; + ans.remove(ze.getName()); + } + Assert.assertTrue("Jar file for Inner Class is not correct", count == 0 && ans.size() == 0); + + out.delete(); + } + + @Test + public void TestAnonymousClass() throws IOException { + File out = new File("/tmp/jarcreatortest3.jar"); + JarFileCreator jfc = new JarFileCreator(out); + jfc.addClass(WordCountWithAnonymousClass.class).createJarFile(); + + Set<String> ans = new HashSet<String>(); + ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass$1.class"); + JarInputStream jis = new JarInputStream(new FileInputStream(out)); + ZipEntry ze; + int count = 3; + while ((ze = jis.getNextEntry()) != null) { + count--; + ans.remove(ze.getName()); + } + Assert.assertTrue("Jar file for Anonymous Class is not correct", count == 0 && ans.size() == 0); + + out.delete(); + } + + @Test + public void TestExtendIdentifier() throws IOException { + File out = new File("/tmp/jarcreatortest4.jar"); + JarFileCreator jfc = new JarFileCreator(out); + jfc.addClass(WordCountWithExternalClass2.class).createJarFile(); + + Set<String> ans = new HashSet<String>(); + ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.class"); + JarInputStream jis = new JarInputStream(new FileInputStream(out)); + ZipEntry ze; + int count = 4; + while ((ze = 
jis.getNextEntry()) != null) { + count--; + ans.remove(ze.getName()); + } + Assert.assertTrue("Jar file for Extend Identifier is not correct", count == 0 && ans.size() == 0); + + out.delete(); + } + + @Test + public void TestUDFPackage() throws IOException { + File out = new File("/tmp/jarcreatortest5.jar"); + JarFileCreator jfc = new JarFileCreator(out); + jfc.addClass(WordCountWithInnerClass.class) + .addPackage("org.apache.flink.util").createJarFile(); + + Set<String> ans = new HashSet<String>(); + ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class"); + ans.add("org/apache/flink/util/Collector.class"); + JarInputStream jis = new JarInputStream(new FileInputStream(out)); + ZipEntry ze; + int count = 4; + while ((ze = jis.getNextEntry()) != null) { + count--; + ans.remove(ze.getName()); + } + Assert.assertTrue("Jar file for UDF package is not correct", count == 0 && ans.size() == 0); + + out.delete(); + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.java new file mode 100644 index 0000000..3c94684 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; + +public class ExternalTokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.java new file mode 100644 index 0000000..f52c537 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + + +public class ExternalTokenizer2 extends ExternalTokenizer { + public void dummyMethod() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/StaticData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/StaticData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/StaticData.java new file mode 100644 index 0000000..543c62e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/StaticData.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + + +public class StaticData { + public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) { + + return env.fromElements( + "To be, or not to be,--that is the question:--", + "Whether 'tis nobler in the mind to suffer", + "The slings and arrows of outrageous fortune", + "Or to take arms against a sea of troubles," + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.java new file mode 100644 index 0000000..b18cd5a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; + +public class WordCountWithAnonymousClass { + + public static void main(String[] args) throws Exception { + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet<String> text = StaticData.getDefaultTextLineDataSet(env); + + DataSet<Tuple2<String, Integer>> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + }) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + // emit result + counts.print(); + + // execute program + env.execute("WordCount Example"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.java new file mode 100644 index 0000000..87ce0cb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; + +public class WordCountWithExternalClass { + + public static void main(String[] args) throws Exception { + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet<String> text = StaticData.getDefaultTextLineDataSet(env); + + DataSet<Tuple2<String, Integer>> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new ExternalTokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + // emit result + counts.print(); + + // execute program + env.execute("WordCount Example"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.java new file mode 100644 index 0000000..8568a7e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; + +public class WordCountWithExternalClass2 { + + public static void main(String[] args) throws Exception { + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet<String> text = StaticData.getDefaultTextLineDataSet(env); + + DataSet<Tuple2<String, Integer>> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new ExternalTokenizer2()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + // emit result + counts.print(); + + // execute program + env.execute("WordCount Example"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c527a2b8/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.java new file mode 100644 index 0000000..362fe33 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; + +public class WordCountWithInnerClass { + + public static void main(String[] args) throws Exception { + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet<String> text = StaticData.getDefaultTextLineDataSet(env); + + DataSet<Tuple2<String, Integer>> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + // emit result + counts.print(); + + // execute program + env.execute("WordCount Example"); + } + + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the 
pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } +}