Repository: apex-malhar
Updated Branches:
  refs/heads/master 09a65c2f9 -> 8eb750053

APEXMALHAR-2481 support lambda expressions with high level API

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8eb75005
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8eb75005
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8eb75005

Branch: refs/heads/master
Commit: 8eb7500537f1491ce35e5a647046c73519948803
Parents: 09a65c2
Author: Thomas Weise <[email protected]>
Authored: Sat May 6 23:24:44 2017 -0700
Committer: Thomas Weise <[email protected]>
Committed: Sat May 6 23:24:44 2017 -0700

----------------------------------------------------------------------
 .../apex/malhar/lib/function/Function.java      |  2 +-
 .../malhar/lib/function/FunctionOperator.java   | 20 ++++++++++++++------
 2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8eb75005/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
index 0d43cd2..6d2226c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
@@ -35,7 +35,7 @@ import com.datatorrent.lib.util.KeyValPair;
  * @since 3.4.0
  */
 @InterfaceStability.Evolving
-public interface Function
+public interface Function extends java.io.Serializable
 {
   /**
    * If the {@link Function} implements this interface.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8eb75005/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
index b6190a0..6f06302 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
@@ -30,8 +30,11 @@ import org.apache.apex.malhar.lib.utils.ByteArrayClassLoader;
 import org.apache.apex.malhar.lib.utils.TupleUtil;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
@@ -55,7 +58,12 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
 
   protected transient FUNCTION statelessF;
 
-  protected FUNCTION statefulF;
+  /**
+   * Kryo cannot handle fields that reference ({@link java.io.Serializable}) lambda classes.
+   * Wrap the reference to keep Kryo happy and delegate to Java serialization.
+   */
+  @Bind(JavaSerializer.class)
+  protected final MutableObject<FUNCTION> statefulF = new MutableObject<>();
 
   protected boolean stateful = false;
 
@@ -75,7 +83,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
     } else if (f instanceof Function.Stateful) {
       statelessF = f;
     } else {
-      statefulF = f;
+      statefulF.setValue(f);
       stateful = true;
     }
   }
@@ -171,7 +179,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
   private void readFunction()
   {
     try {
-      if (statelessF != null || statefulF != null) {
+      if (statelessF != null || statefulF.getValue() != null) {
         return;
       }
       DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass));
@@ -199,7 +207,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
   {
     readFunction();
     if (stateful) {
-      return statefulF;
+      return statefulF.getValue();
     } else {
       return statelessF;
     }
@@ -217,12 +225,12 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
 
   public FUNCTION getStatefulF()
   {
-    return statefulF;
+    return statefulF.getValue();
   }
 
   public void setStatefulF(FUNCTION statefulF)
   {
-    this.statefulF = statefulF;
+    this.statefulF.setValue(statefulF);
   }
 
   public boolean isStateful()
