Repository: crunch
Updated Branches:
  refs/heads/master f8cc90a75 -> f5d3858aa
CRUNCH-462: MemPipeline should verify that DoFns are Serializable

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f5d3858a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f5d3858a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f5d3858a

Branch: refs/heads/master
Commit: f5d3858aa57c71ef38835031218163a3d4121381
Parents: f8cc90a
Author: Josh Wills <[email protected]>
Authored: Mon Aug 11 11:41:02 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Mon Aug 11 11:41:02 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/mem/collect/MemCollection.java | 13 +++++++++++++
 .../java/org/apache/crunch/impl/mem/CountersTest.java | 3 ++-
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f5d3858a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index becee88..eaaab59 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -25,6 +25,8 @@ import javassist.util.proxy.MethodFilter;
 import javassist.util.proxy.MethodHandler;
 import javassist.util.proxy.ProxyFactory;
 
+import org.apache.commons.lang.SerializationException;
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.crunch.Aggregator;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.DoFn;
@@ -101,6 +103,16 @@ public class MemCollection<S> implements PCollection<S> {
     return new MemCollection<S>(output, collections[0].getPType());
   }
 
+  private <S, T> DoFn<S, T> verifySerializable(String name, DoFn<S, T> doFn) {
+    try {
+      return (DoFn<S, T>) SerializationUtils.deserialize(SerializationUtils.serialize(doFn));
+    } catch (SerializationException e) {
+      throw new IllegalStateException(
+          doFn.getClass().getSimpleName() + " named '" + name + "' cannot be serialized",
+          e);
+    }
+  }
+
   @Override
   public <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type) {
     return parallelDo(null, doFn, type);
@@ -114,6 +126,7 @@ public class MemCollection<S> implements PCollection<S> {
   @Override
   public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
       ParallelDoOptions options) {
+    doFn = verifySerializable(name, doFn);
     InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
     Configuration conf = getPipeline().getConfiguration();
     doFn.configure(conf);

http://git-wip-us.apache.org/repos/asf/crunch/blob/f5d3858a/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
index 6b3d0fd..78acb2c 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
@@ -24,9 +24,10 @@ import org.apache.crunch.impl.mem.collect.MemCollection;
 import org.apache.crunch.types.writable.Writables;
 import org.junit.Test;
 
+import java.io.Serializable;
 import java.util.Arrays;
 
-public class CountersTest {
+public class CountersTest implements Serializable {
 
   @Test
   public void counterTest() throws Exception {
