Repository: apex-malhar Updated Branches: refs/heads/master a1c319ca7 -> a017dfaa4
APEXMALHAR-2220 Move the FunctionOperator to Malhar library Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/65488fd6 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/65488fd6 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/65488fd6 Branch: refs/heads/master Commit: 65488fd6e585ee18d084d05f5a44329eb62753f4 Parents: b43818b Author: Dongming Liang <[email protected]> Authored: Fri Nov 4 18:57:21 2016 -0700 Committer: Dongming Liang <[email protected]> Committed: Fri Nov 4 18:57:21 2016 -0700 ---------------------------------------------------------------------- .../malhar/stream/sample/MinimalWordCount.java | 2 +- .../malhar/stream/sample/WindowedWordCount.java | 2 +- .../stream/sample/complete/AutoComplete.java | 2 +- .../sample/complete/StreamingWordExtract.java | 2 +- .../sample/complete/TopWikipediaSessions.java | 2 +- .../stream/sample/complete/TrafficRoutes.java | 2 +- .../sample/complete/TwitterAutoComplete.java | 2 +- .../sample/cookbook/CombinePerKeyExamples.java | 2 +- .../stream/sample/cookbook/DeDupExample.java | 2 +- .../sample/cookbook/MaxPerKeyExamples.java | 2 +- .../stream/sample/cookbook/TriggerExample.java | 2 +- .../lib/function/AnnonymousClassModifier.java | 134 +++++++ .../apex/malhar/lib/function/Function.java | 87 +++++ .../malhar/lib/function/FunctionOperator.java | 378 +++++++++++++++++++ .../malhar/lib/utils/ByteArrayClassLoader.java | 54 +++ .../apache/apex/malhar/lib/utils/TupleUtil.java | 46 +++ .../apex/malhar/stream/api/ApexStream.java | 2 +- .../apex/malhar/stream/api/WindowedStream.java | 5 +- .../malhar/stream/api/function/Function.java | 88 ----- .../malhar/stream/api/impl/ApexStreamImpl.java | 6 +- .../stream/api/impl/ApexWindowedStreamImpl.java | 2 +- .../api/operator/AnnonymousClassModifier.java | 134 ------- .../api/operator/ByteArrayClassLoader.java | 54 --- .../stream/api/operator/FunctionOperator.java | 378 ------------------- .../apex/malhar/stream/api/util/TupleUtil.java | 46 --- .../FunctionOperator/FunctionOperatorTest.java | 4 +- .../stream/sample/ApplicationWithStreamAPI.java | 2 +- .../LocalTestWithoutStreamApplication.java | 2 +- .../apex/malhar/stream/sample/MyStream.java | 2 +- .../apex/malhar/stream/sample/MyStreamTest.java | 2 +- .../stream/sample/WordCountWithStreamAPI.java | 2 +- 31 files changed, 723 insertions(+), 727 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java index 327c882..160175f 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java @@ -22,10 +22,10 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java index 5b83bd0..6e57bfd 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java @@ -27,12 +27,12 @@ import java.util.HashMap; import java.util.Map; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java index 2db59b6..571a25f 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.Window; @@ -36,7 +37,6 @@ import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java index 07f01d0..b5e491e 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java @@ -22,9 +22,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.Option; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java index 68ec733..b2b9ae4 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.Window; @@ -34,7 +35,6 @@ import org.apache.apex.malhar.lib.window.accumulation.TopN; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java index e6a53d6..431263a 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; @@ -36,7 +37,6 @@ import org.apache.apex.malhar.lib.window.accumulation.Group; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java index 4fc80ea..f6829da 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.Window; @@ -36,7 +37,6 @@ import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java index bfdb268..937476e 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java @@ -21,6 +21,7 @@ package org.apache.apex.malhar.stream.sample.cookbook; import java.util.ArrayList; import java.util.List; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; @@ -28,7 +29,6 @@ import org.apache.apex.malhar.lib.window.accumulation.ReduceFn; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java index 4df5fe7..ab6b28b 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java @@ -23,12 +23,12 @@ import java.util.List; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates; import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java index 9fd9495..f28b96a 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java @@ -20,6 +20,7 @@ package org.apache.apex.malhar.stream.sample.cookbook; import java.util.List; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.Window; @@ -28,7 +29,6 @@ import org.apache.apex.malhar.lib.window.accumulation.Max; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java index 962faa5..2fa7619 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java @@ -23,13 +23,13 @@ import java.util.Objects; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import com.datatorrent.lib.util.KeyValPair; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/function/AnnonymousClassModifier.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/AnnonymousClassModifier.java b/library/src/main/java/org/apache/apex/malhar/lib/function/AnnonymousClassModifier.java new file mode 100644 index 0000000..c84d4bd --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/function/AnnonymousClassModifier.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.function; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor; +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Attribute; +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor; +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.FieldVisitor; +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.MethodVisitor; +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes; + +/** + * Because annonymous class serialization is not supported by default in most serialization library + * This class is used to modify the bytecode of annonymous at runtime. + * The limit for this is the annonymous class that is being modified must by stateless + * + * + * @since 3.4.0 + */ [email protected] +public class AnnonymousClassModifier extends ClassVisitor +{ + private String className; + + private boolean hasDefaultConstructor = false; + + public AnnonymousClassModifier(int i) + { + super(i); + } + + public AnnonymousClassModifier(int i, ClassVisitor classVisitor) + { + super(i, classVisitor); + } + + @Override + public void visit(int i, int i1, String s, String s1, String s2, String[] strings) + { + className = s; + super.visit(i, 33, s, s1, s2, strings); + } + + @Override + public void visitSource(String s, String s1) + { + super.visitSource(s, s1); + } + + @Override + public void visitOuterClass(String s, String s1, String s2) + { + // skip outer class, make it top level. For now only one level annonymous class + return; + } + + @Override + public AnnotationVisitor visitAnnotation(String s, boolean b) + { + return super.visitAnnotation(s, b); + } + + + @Override + public void visitAttribute(Attribute attribute) + { + super.visitAttribute(attribute); + } + + @Override + public void visitInnerClass(String s, String s1, String s2, int i) + { + if (s.equals(className)) { + return; + } + super.visitInnerClass(s, s1, s2, i); + } + + @Override + public FieldVisitor visitField(int i, String s, String s1, String s2, Object o) + { + return super.visitField(i, s, s1, s2, o); + } + + @Override + public MethodVisitor visitMethod(int i, String s, String s1, String s2, String[] strings) + { + //make the constructor public + int j = s.equals("<init>") ? i | Opcodes.ACC_PUBLIC : i; + if (s1.contains("()V")) { + hasDefaultConstructor = true; + } + + return super.visitMethod(i, s, s1, s2, strings); + } + + @Override + public void visitEnd() + { + + // If there is no default constructor, create one + if (!hasDefaultConstructor) { + MethodVisitor mv = super.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null); + mv.visitVarInsn(Opcodes.ALOAD, 0); + mv.visitMethodInsn(Opcodes.INVOKESPECIAL, + "java/lang/Object", + "<init>", + "()V"); + mv.visitInsn(Opcodes.RETURN); + mv.visitMaxs(5, 1); + mv.visitEnd(); + } + + super.visitEnd(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java new file mode 100644 index 0000000..0d43cd2 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.function; + +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * The top level function interface <br> + * The function is wrapped by {@link FunctionOperator} <br> + * It takes input from input port of {@link FunctionOperator} ex. {@link FunctionOperator.MapFunctionOperator#input} <br> + * And the output will be emitted using {@link FunctionOperator#tupleOutput} <br> + * Anonymous function is not fully supported. It must be <b>stateless</b> should not be defined in any static context<br> + * If anonymous function does not working, you can should use top level function class<br> + * Top level function class should have public non-arg constructor + * + * @since 3.4.0 + */ [email protected] +public interface Function +{ + /** + * If the {@link Function} implements this interface. + * The state of the function will be checkpointed + */ + public static interface Stateful + { + + } + + /** + * An interface defines a one input one output transformation + * @param <I> + * @param <O> + */ + public static interface MapFunction<I, O> extends Function + { + O f(I input); + } + + /** + * A special map function to convert any pojo to key value pair datastructure + * @param <T> + * @param <K> + * @param <V> + */ + public static interface ToKeyValue<T, K, V> extends MapFunction<T, Tuple<KeyValPair<K, V>>> + { + + } + + /** + * An interface that defines flatmap transformation + * @param <I> + * @param <O> + */ + public static interface FlatMapFunction<I, O> extends MapFunction<I, Iterable<O>> + { + } + + /** + * An interface that defines filter transformation + * @param <T> + */ + public static interface FilterFunction<T> extends Function + { + boolean f(T input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java new file mode 100644 index 0000000..b6190a0 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.function; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.apex.malhar.lib.utils.ByteArrayClassLoader; +import org.apache.apex.malhar.lib.utils.TupleUtil; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader; +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter; +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; + +/** + * Operators that wrap the functions + * + * @since 3.4.0 + */ [email protected] +public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator +{ + private byte[] annonymousFunctionClass; + + protected transient FUNCTION statelessF; + + protected FUNCTION statefulF; + + protected boolean stateful = false; + + protected boolean isAnnonymous = false; + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Tuple<OUT>> tupleOutput = new DefaultOutputPort<>(); + + public FunctionOperator(FUNCTION f) + { + isAnnonymous = f.getClass().isAnonymousClass(); + if (isAnnonymous) { + annonymousFunctionClass = functionClassData(f); + } else if (f instanceof Function.Stateful) { + statelessF = f; + } else { + statefulF = f; + stateful = true; + } + } + + private byte[] functionClassData(Function f) + { + Class<? extends Function> classT = f.getClass(); + + byte[] classBytes = null; + byte[] classNameBytes = null; + String className = classT.getName(); + try { + classNameBytes = className.replace('.', '/').getBytes(); + classBytes = IOUtils.toByteArray(classT.getClassLoader().getResourceAsStream(className.replace('.', '/') + ".class")); + int cursor = 0; + for (int j = 0; j < classBytes.length; j++) { + if (classBytes[j] != classNameBytes[cursor]) { + cursor = 0; + } else { + cursor++; + } + + if (cursor == classNameBytes.length) { + for (int p = 0; p < classNameBytes.length; p++) { + if (classBytes[j - p] == '$') { + classBytes[j - p] = '_'; + } + } + cursor = 0; + } + } + ClassReader cr = new ClassReader(new ByteArrayInputStream(classBytes)); + ClassWriter cw = new ClassWriter(0); + AnnonymousClassModifier annonymousClassModifier = new AnnonymousClassModifier(Opcodes.ASM4, cw); + cr.accept(annonymousClassModifier, 0); + classBytes = cw.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + int dataLength = classNameBytes.length + 4 + 4; + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(dataLength); + DataOutputStream output = new DataOutputStream(byteArrayOutputStream); + + try { + output.writeInt(classNameBytes.length); + output.write(className.replace('$', '_').getBytes()); + output.writeInt(classBytes.length); + output.write(classBytes); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + output.flush(); + output.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + return byteArrayOutputStream.toByteArray(); + + } + + /** + * Default constructor to make kryo happy + */ + public FunctionOperator() + { + + } + + @Override + public void beginWindow(long l) + { + + } + + @Override + public void endWindow() + { + + } + + @Override + public void setup(Context.OperatorContext context) + { + readFunction(); + } + + + @SuppressWarnings("unchecked") + private void readFunction() + { + try { + if (statelessF != null || statefulF != null) { + return; + } + DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass)); + byte[] classNameBytes = new byte[input.readInt()]; + input.read(classNameBytes); + String className = new String(classNameBytes); + byte[] classData = new byte[input.readInt()]; + input.read(classData); + Map<String, byte[]> classBin = new HashMap<>(); + classBin.put(className, classData); + ByteArrayClassLoader byteArrayClassLoader = new ByteArrayClassLoader(classBin, Thread.currentThread().getContextClassLoader()); + statelessF = ((Class<FUNCTION>)byteArrayClassLoader.findClass(className)).newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void teardown() + { + + } + + public FUNCTION getFunction() + { + readFunction(); + if (stateful) { + return statefulF; + } else { + return statelessF; + } + } + + public FUNCTION getStatelessF() + { + return statelessF; + } + + public void setStatelessF(FUNCTION statelessF) + { + this.statelessF = statelessF; + } + + public FUNCTION getStatefulF() + { + return statefulF; + } + + public void setStatefulF(FUNCTION statefulF) + { + this.statefulF = statefulF; + } + + public boolean isStateful() + { + return stateful; + } + + public void setStateful(boolean stateful) + { + this.stateful = stateful; + } + + public boolean isAnnonymous() + { + return isAnnonymous; + } + + public void setIsAnnonymous(boolean isAnnonymous) + { + this.isAnnonymous = isAnnonymous; + } + + public static class MapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.MapFunction<IN, OUT>> + { + + public MapFunctionOperator() + { + + } + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() + { + @Override + public void process(IN t) + { + Function.MapFunction<IN, OUT> f = getFunction(); + output.emit(f.f(t)); + } + }; + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() + { + @Override + public void process(Tuple<IN> t) + { + Function.MapFunction<IN, OUT> f = getFunction(); + if (t instanceof Tuple.PlainTuple) { + TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, f.f(t.getValue())); + } else { + output.emit(f.f(t.getValue())); + } + } + }; + + public MapFunctionOperator(Function.MapFunction<IN, OUT> f) + { + super(f); + } + } + + public static class FlatMapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FlatMapFunction<IN, OUT>> + { + + public FlatMapFunctionOperator() + { + + } + + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() + { + @Override + public void process(IN t) + { + Function.FlatMapFunction<IN, OUT> f = getFunction(); + for (OUT out : f.f(t)) { + output.emit(out); + } + } + }; + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() + { + @Override + public void process(Tuple<IN> t) + { + Function.FlatMapFunction<IN, OUT> f = getFunction(); + if (t instanceof Tuple.PlainTuple) { + for (OUT out : f.f(t.getValue())) { + tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, out)); + } + } else { + for (OUT out : f.f(t.getValue())) { + output.emit(out); + } + } + } + }; + + public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f) + { + super(f); + } + } + + + public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>> + { + + public FilterFunctionOperator() + { + + } + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() + { + @Override + public void process(IN t) + { + Function.FilterFunction<IN> f = getFunction(); + // fold the value + if (f.f(t)) { + output.emit(t); + } + } + }; + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() + { + @Override + public void process(Tuple<IN> t) + { + Function.FilterFunction<IN> f = getFunction(); + if (f.f(t.getValue())) { + tupleOutput.emit(t); + } + + } + }; + + public FilterFunctionOperator(Function.FilterFunction<IN> f) + { + super(f); + } + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/utils/ByteArrayClassLoader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/ByteArrayClassLoader.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/ByteArrayClassLoader.java new file mode 100644 index 0000000..9c1aa96 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/ByteArrayClassLoader.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils; + + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * @since 3.4.0 + */ [email protected] +public class ByteArrayClassLoader extends ClassLoader +{ + private final Map<String, byte[]> classes; + + public ByteArrayClassLoader(Map<String, byte[]> classes) + { + this.classes = classes; + } + + public ByteArrayClassLoader(Map<String, byte[]> classes, ClassLoader parent) + { + super(parent); + this.classes = classes; + } + + public Class findClass(String name) throws ClassNotFoundException + { + byte[] data = (byte[])((byte[])this.classes.get(name)); + if (data == null) { + throw new ClassNotFoundException(name); + } else { + return super.defineClass(name, data, 0, data.length); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/utils/TupleUtil.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/TupleUtil.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/TupleUtil.java new file mode 100644 index 0000000..d2c25fe --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/TupleUtil.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils; + +import org.apache.apex.malhar.lib.window.Tuple; + +/** + * The tuple util will be used to extract fields that are used as key or value<br> + * Or converting from data tuples to display tuples <br> + * Or generating watermark tuples <br> + * + * + * @since 3.4.0 + */ +public class TupleUtil +{ + + public static <T, O> Tuple.PlainTuple<O> buildOf(Tuple.PlainTuple<T> t, O newValue) + { + + if (t instanceof Tuple.WindowedTuple) { + Tuple.WindowedTuple windowedTuple = (Tuple.WindowedTuple)t; + return new Tuple.WindowedTuple<>(windowedTuple.getWindows(), windowedTuple.getTimestamp(), newValue); + } else if (t instanceof Tuple.TimestampedTuple) { + return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue); + } else { + return new Tuple.PlainTuple<>(newValue); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java index 47f358f..c09efa5 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java @@ -22,9 +22,9 @@ import java.util.concurrent.Callable; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Attribute; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java index 0f5ce1e..554f5d6 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java @@ -22,6 +22,7 @@ import java.util.List; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.Accumulation; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; @@ -29,7 +30,6 @@ import org.apache.apex.malhar.lib.window.accumulation.FoldFn; import org.apache.apex.malhar.lib.window.accumulation.ReduceFn; import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.lib.util.KeyValPair; @@ -79,7 +79,6 @@ public interface WindowedStream<T> extends ApexStream<T> /** * Count tuples by the key<br> - * @param name name of the operator * @param convertToKeyValue The function convert plain tuple to k,v pair * @return new stream of Key Value Pair */ @@ -88,7 +87,6 @@ public interface WindowedStream<T> extends ApexStream<T> /** * Return top N tuples by the selected key * @param N how many tuples you want to keep - * @param name name of the operator * @param convertToKeyVal The function convert plain tuple to k,v pair * @return new stream of Key and top N tuple of the key */ @@ -97,7 +95,6 @@ public interface WindowedStream<T> extends ApexStream<T> /** * Return top N tuples of all tuples in the window * @param N - * @param name name of the operator * @return new stream of topN */ <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int N, Option... opts); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java deleted file mode 100644 index d516064..0000000 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.api.function; - -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.stream.api.operator.FunctionOperator; -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.lib.util.KeyValPair; - -/** - * The top level function interface <br> - * The function is wrapped by {@link FunctionOperator} <br> - * It takes input from input port of {@link FunctionOperator} ex. {@link FunctionOperator.MapFunctionOperator#input} <br> - * And the output will be emitted using {@link FunctionOperator#tupleOutput} <br> - * Anonymous function is not fully supported. It must be <b>stateless</b> should not be defined in any static context<br> - * If anonymous function does not working, you can should use top level function class<br> - * Top level function class should have public non-arg constructor - * - * @since 3.4.0 - */ [email protected] -public interface Function -{ - /** - * If the {@link Function} implements this interface. - * The state of the function will be checkpointed - */ - public static interface Stateful - { - - } - - /** - * An interface defines a one input one output transformation - * @param <I> - * @param <O> - */ - public static interface MapFunction<I, O> extends Function - { - O f(I input); - } - - /** - * A special map function to convert any pojo to key value pair datastructure - * @param <T> - * @param <K> - * @param <V> - */ - public static interface ToKeyValue<T, K, V> extends MapFunction<T, Tuple<KeyValPair<K, V>>> - { - - } - - /** - * An interface that defines flatmap transformation - * @param <I> - * @param <O> - */ - public static interface FlatMapFunction<I, O> extends MapFunction<I, Iterable<O>> - { - } - - /** - * An interface that defines filter transformation - * @param <T> - */ - public static interface FilterFunction<T> extends Function - { - boolean f(T input); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java index ba399de..bb0f781 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java @@ -29,6 +29,9 @@ import java.util.concurrent.Callable; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.lib.function.Function.FlatMapFunction; +import org.apache.apex.malhar.lib.function.FunctionOperator; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.WindowOption; @@ -36,9 +39,6 @@ import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.Option; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.function.Function.FlatMapFunction; -import org.apache.apex.malhar.stream.api.operator.FunctionOperator; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java index 5866a4c..3f08d8c 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java @@ -22,6 +22,7 @@ import java.util.List; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.Accumulation; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; @@ -40,7 +41,6 @@ import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.Option; import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java deleted file mode 100644 index b0fe3c5..0000000 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.api.operator; - -import org.apache.hadoop.classification.InterfaceStability; - -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor; -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Attribute; -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor; -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.FieldVisitor; -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.MethodVisitor; -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes; - -/** - * Because annonymous class serialization is not supported by default in most serialization library - * This class is used to modify the bytecode of annonymous at runtime. - * The limit for this is the annonymous class that is being modified must by stateless - * - * - * @since 3.4.0 - */ [email protected] -public class AnnonymousClassModifier extends ClassVisitor -{ - private String className; - - private boolean hasDefaultConstructor = false; - - public AnnonymousClassModifier(int i) - { - super(i); - } - - public AnnonymousClassModifier(int i, ClassVisitor classVisitor) - { - super(i, classVisitor); - } - - @Override - public void visit(int i, int i1, String s, String s1, String s2, String[] strings) - { - className = s; - super.visit(i, 33, s, s1, s2, strings); - } - - @Override - public void visitSource(String s, String s1) - { - super.visitSource(s, s1); - } - - @Override - public void visitOuterClass(String s, String s1, String s2) - { - // skip outer class, make it top level. For now only one level annonymous class - return; - } - - @Override - public AnnotationVisitor visitAnnotation(String s, boolean b) - { - return super.visitAnnotation(s, b); - } - - - @Override - public void visitAttribute(Attribute attribute) - { - super.visitAttribute(attribute); - } - - @Override - public void visitInnerClass(String s, String s1, String s2, int i) - { - if (s.equals(className)) { - return; - } - super.visitInnerClass(s, s1, s2, i); - } - - @Override - public FieldVisitor visitField(int i, String s, String s1, String s2, Object o) - { - return super.visitField(i, s, s1, s2, o); - } - - @Override - public MethodVisitor visitMethod(int i, String s, String s1, String s2, String[] strings) - { - //make the constructor public - int j = s.equals("<init>") ? i | Opcodes.ACC_PUBLIC : i; - if (s1.contains("()V")) { - hasDefaultConstructor = true; - } - - return super.visitMethod(i, s, s1, s2, strings); - } - - @Override - public void visitEnd() - { - - // If there is no default constructor, create one - if (!hasDefaultConstructor) { - MethodVisitor mv = super.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null); - mv.visitVarInsn(Opcodes.ALOAD, 0); - mv.visitMethodInsn(Opcodes.INVOKESPECIAL, - "java/lang/Object", - "<init>", - "()V"); - mv.visitInsn(Opcodes.RETURN); - mv.visitMaxs(5, 1); - mv.visitEnd(); - } - - super.visitEnd(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java deleted file mode 100644 index 05a791c..0000000 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.api.operator; - - -import java.util.Map; - -import org.apache.hadoop.classification.InterfaceStability; - -/** - * @since 3.4.0 - */ [email protected] -public class ByteArrayClassLoader extends ClassLoader -{ - private final Map<String, byte[]> classes; - - public ByteArrayClassLoader(Map<String, byte[]> classes) - { - this.classes = classes; - } - - public ByteArrayClassLoader(Map<String, byte[]> classes, ClassLoader parent) - { - super(parent); - this.classes = classes; - } - - protected Class findClass(String name) throws ClassNotFoundException - { - byte[] data = (byte[])((byte[])this.classes.get(name)); - if (data == null) { - throw new ClassNotFoundException(name); - } else { - return super.defineClass(name, data, 0, data.length); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java deleted file mode 100644 index 1e2066c..0000000 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.api.operator; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.util.TupleUtil; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.classification.InterfaceStability; - -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader; -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter; -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; - -/** - * Operators that wrap the functions - * - * @since 3.4.0 - */ [email protected] -public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator -{ - private byte[] annonymousFunctionClass; - - protected transient FUNCTION statelessF; - - protected FUNCTION statefulF; - - protected boolean stateful = false; - - protected boolean isAnnonymous = false; - - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>(); - - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Tuple<OUT>> tupleOutput = new DefaultOutputPort<>(); - - public FunctionOperator(FUNCTION f) - { - isAnnonymous = f.getClass().isAnonymousClass(); - if (isAnnonymous) { - annonymousFunctionClass = functionClassData(f); - } else if (f instanceof Function.Stateful) { - statelessF = f; - } else { - statefulF = f; - stateful = true; - } - } - - private byte[] functionClassData(Function f) - { - Class<? extends Function> classT = f.getClass(); - - byte[] classBytes = null; - byte[] classNameBytes = null; - String className = classT.getName(); - try { - classNameBytes = className.replace('.', '/').getBytes(); - classBytes = IOUtils.toByteArray(classT.getClassLoader().getResourceAsStream(className.replace('.', '/') + ".class")); - int cursor = 0; - for (int j = 0; j < classBytes.length; j++) { - if (classBytes[j] != classNameBytes[cursor]) { - cursor = 0; - } else { - cursor++; - } - - if (cursor == classNameBytes.length) { - for (int p = 0; p < classNameBytes.length; p++) { - if (classBytes[j - p] == '$') { - classBytes[j - p] = '_'; - } - } - cursor = 0; - } - } - ClassReader cr = new ClassReader(new ByteArrayInputStream(classBytes)); - ClassWriter cw = new ClassWriter(0); - AnnonymousClassModifier annonymousClassModifier = new AnnonymousClassModifier(Opcodes.ASM4, cw); - cr.accept(annonymousClassModifier, 0); - classBytes = cw.toByteArray(); - } catch (IOException e) { - throw new RuntimeException(e); - } - int dataLength = classNameBytes.length + 4 + 4; - - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(dataLength); - DataOutputStream output = new DataOutputStream(byteArrayOutputStream); - - try { - output.writeInt(classNameBytes.length); - output.write(className.replace('$', '_').getBytes()); - output.writeInt(classBytes.length); - output.write(classBytes); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - try { - output.flush(); - output.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - return byteArrayOutputStream.toByteArray(); - - } - - /** - * Default constructor to make kryo happy - */ - public FunctionOperator() - { - - } - - @Override - public void beginWindow(long l) - { - - } - - @Override - public void endWindow() - { - - } - - @Override - public void setup(Context.OperatorContext context) - { - readFunction(); - } - - - @SuppressWarnings("unchecked") - private void readFunction() - { - try { - if (statelessF != null || statefulF != null) { - return; - } - DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass)); - byte[] classNameBytes = new byte[input.readInt()]; - input.read(classNameBytes); - String className = new String(classNameBytes); - byte[] classData = new byte[input.readInt()]; - input.read(classData); - Map<String, byte[]> classBin = new HashMap<>(); - classBin.put(className, classData); - ByteArrayClassLoader byteArrayClassLoader = new ByteArrayClassLoader(classBin, Thread.currentThread().getContextClassLoader()); - statelessF = ((Class<FUNCTION>)byteArrayClassLoader.findClass(className)).newInstance(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void teardown() - { - - } - - public FUNCTION getFunction() - { - readFunction(); - if (stateful) { - return statefulF; - } else { - return statelessF; - } - } - - public FUNCTION getStatelessF() - { - return statelessF; - } - - public void setStatelessF(FUNCTION statelessF) - { - this.statelessF = statelessF; - } - - public FUNCTION getStatefulF() - { - return statefulF; - } - - public void setStatefulF(FUNCTION statefulF) - { - this.statefulF = statefulF; - } - - public boolean isStateful() - { - return stateful; - } - - public void setStateful(boolean stateful) - { - this.stateful = stateful; - } - - public boolean isAnnonymous() - { - return isAnnonymous; - } - - public void setIsAnnonymous(boolean isAnnonymous) - { - this.isAnnonymous = isAnnonymous; - } - - public static class MapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.MapFunction<IN, OUT>> - { - - public MapFunctionOperator() - { - - } - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() - { - @Override - public void process(IN t) - { - Function.MapFunction<IN, OUT> f = getFunction(); - output.emit(f.f(t)); - } - }; - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() - { - @Override - public void process(Tuple<IN> t) - { - Function.MapFunction<IN, OUT> f = getFunction(); - if (t instanceof Tuple.PlainTuple) { - TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, f.f(t.getValue())); - } else { - output.emit(f.f(t.getValue())); - } - } - }; - - public MapFunctionOperator(Function.MapFunction<IN, OUT> f) - { - super(f); - } - } - - public static class FlatMapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FlatMapFunction<IN, OUT>> - { - - public FlatMapFunctionOperator() - { - - } - - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() - { - @Override - public void process(IN t) - { - Function.FlatMapFunction<IN, OUT> f = getFunction(); - for (OUT out : f.f(t)) { - output.emit(out); - } - } - }; - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() - { - @Override - public void process(Tuple<IN> t) - { - Function.FlatMapFunction<IN, OUT> f = getFunction(); - if (t instanceof Tuple.PlainTuple) { - for (OUT out : f.f(t.getValue())) { - tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, out)); - } - } else { - for (OUT out : f.f(t.getValue())) { - output.emit(out); - } - } - } - }; - - public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f) - { - super(f); - } - } - - - public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>> - { - - public FilterFunctionOperator() - { - - } - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() - { - @Override - public void process(IN t) - { - Function.FilterFunction<IN> f = getFunction(); - // fold the value - if (f.f(t)) { - output.emit(t); - } - } - }; - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() - { - @Override - public void process(Tuple<IN> t) - { - Function.FilterFunction<IN> f = getFunction(); - if (f.f(t.getValue())) { - tupleOutput.emit(t); - } - - } - }; - - public FilterFunctionOperator(Function.FilterFunction<IN> f) - { - super(f); - } - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java deleted file mode 100644 index f9a4ed8..0000000 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.api.util; - -import org.apache.apex.malhar.lib.window.Tuple; - -/** - * The tuple util will be used to extract fields that are used as key or value<br> - * Or converting from data tuples to display tuples <br> - * Or generating watermark tuples <br> - * - * - * @since 3.4.0 - */ -public class TupleUtil -{ - - public static <T, O> Tuple.PlainTuple<O> buildOf(Tuple.PlainTuple<T> t, O newValue) - { - - if (t instanceof Tuple.WindowedTuple) { - Tuple.WindowedTuple windowedTuple = (Tuple.WindowedTuple)t; - return new Tuple.WindowedTuple<>(windowedTuple.getWindows(), windowedTuple.getTimestamp(), newValue); - } else if (t instanceof Tuple.TimestampedTuple) { - return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue); - } else { - return new Tuple.PlainTuple<>(newValue); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java index a5da669..c33d9af 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java @@ -26,10 +26,10 @@ import java.util.concurrent.Callable; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.lib.function.FunctionOperator; import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.apex.malhar.stream.api.operator.FunctionOperator; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java index a39ff35..c64126c 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java @@ -22,11 +22,11 @@ import java.util.Arrays; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java index f46fb14..45a2363 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java @@ -27,10 +27,10 @@ import java.util.concurrent.Callable; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import com.datatorrent.lib.util.KeyValPair; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java index 20d7aed..ef6a88e 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java @@ -18,7 +18,7 @@ */ package org.apache.apex.malhar.stream.sample; -import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl; import com.datatorrent.api.DAG; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java index d912117..893f546 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java @@ -28,10 +28,10 @@ import org.joda.time.Duration; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl; import org.apache.apex.malhar.stream.api.impl.StreamFactory; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java index 11dabe4..c476055 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java @@ -22,11 +22,11 @@ import java.util.Arrays; import org.joda.time.Duration; +import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; import org.apache.hadoop.conf.Configuration;
