Repository: crunch Updated Branches: refs/heads/master b4da23b26 -> 387068d4e
CRUNCH-571: Scrunch functions fail serialization check in the REPL Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/387068d4 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/387068d4 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/387068d4 Branch: refs/heads/master Commit: 387068d4e68b4b22116af8392418353effd1bb5b Parents: b4da23b Author: Tom White <[email protected]> Authored: Mon Oct 19 10:21:36 2015 +0100 Committer: Tom White <[email protected]> Committed: Mon Oct 19 10:21:36 2015 +0100 ---------------------------------------------------------------------- .../crunch/impl/mem/collect/MemCollection.java | 54 +++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/387068d4/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 05bff3f..e5f04d5 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -17,6 +17,11 @@ */ package org.apache.crunch.impl.mem.collect; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; import java.lang.reflect.Method; import java.util.Collection; import java.util.Set; @@ -103,7 +108,7 @@ public class MemCollection<S> implements PCollection<S> { private <S, T> DoFn<S, T> verifySerializable(String name, DoFn<S, T> doFn) { try { - return (DoFn<S, T>) SerializationUtils.deserialize(SerializationUtils.serialize(doFn)); + return (DoFn<S, T>) deserialize(SerializationUtils.serialize(doFn)); } catch (SerializationException e) { throw new IllegalStateException( doFn.getClass().getSimpleName() + " named '" + name + "' cannot be serialized", @@ -111,6 +116,53 @@ public class MemCollection<S> implements PCollection<S> { } } + // Use a custom deserialize implementation (not SerializationUtils) so we can fall back + // to using the thread context classloader, which is needed when running Scrunch in + // the Scala REPL + private static Object deserialize(InputStream inputStream) { + if (inputStream == null) { + throw new IllegalArgumentException("The InputStream must not be null"); + } + ObjectInputStream in = null; + try { + // stream closed in the finally + in = new ObjectInputStream(inputStream) { + @Override + protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, + ClassNotFoundException { + try { + return super.resolveClass(desc); + } catch (ClassNotFoundException e) { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + return Class.forName(desc.getName(), false, cl); + } + } + }; + return in.readObject(); + + } catch (ClassNotFoundException ex) { + throw new SerializationException(ex); + } catch (IOException ex) { + throw new SerializationException(ex); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (IOException ex) { + // ignore close exception + } + } + } + + private static Object deserialize(byte[] objectData) { + if (objectData == null) { + throw new IllegalArgumentException("The byte[] must not be null"); + } + ByteArrayInputStream bais = new ByteArrayInputStream(objectData); + return deserialize(bais); + } + @Override public <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type) { return parallelDo(null, doFn, type);
