Updated Branches: refs/heads/master 013f2e19a -> 14f0c16b5
CRUNCH-269: Add option for disabling deep copies on intermediate outputs from DoFns. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/14f0c16b Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/14f0c16b Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/14f0c16b Branch: refs/heads/master Commit: 14f0c16b5bcf6496bedcf184d22c10462ababe7e Parents: 013f2e1 Author: Josh Wills <[email protected]> Authored: Fri Sep 20 16:33:08 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Fri Sep 20 16:33:08 2013 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/DoFn.java | 13 ++++++++++ .../impl/mr/emit/IntermediateEmitter.java | 5 ++-- .../org/apache/crunch/impl/mr/run/RTNode.java | 8 ++++--- .../crunch/impl/mr/run/RuntimeParameters.java | 2 ++ .../impl/mr/emit/IntermediateEmitterTest.java | 25 ++++++++++++++++++-- 5 files changed, 46 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/DoFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java index 6da89ef..6ae89a4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java @@ -119,6 +119,19 @@ public abstract class DoFn<S, T> implements Serializable { return 1.2f; } + /** + * By default, Crunch will do a defensive deep copy of the outputs of a + * DoFn when there are multiple downstream consumers of that item, in order to + * prevent the downstream functions from making concurrent modifications to + * data objects. 
This copying adds overhead; in cases where you know + * that the downstream code only reads the objects and never modifies them, + * you can disable this feature by overriding this method to + * return {@code true}. + */ + public boolean disableDeepCopy() { + return false; + } + protected TaskInputOutputContext<?, ?, ?, ?> getContext() { return context; } http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java index b6df98b..955aed8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java @@ -39,13 +39,14 @@ public class IntermediateEmitter implements Emitter<Object> { private final PType<Object> outputPType; private final boolean needDetachedValues; - public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children, Configuration conf) { + public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children, Configuration conf, + boolean disableDeepCopy) { this.outputPType = outputPType; this.children = ImmutableList.copyOf(children); this.conf = conf; outputPType.initialize(conf); - needDetachedValues = this.children.size() > 1; + needDetachedValues = !disableDeepCopy && this.children.size() > 1; } public void emit(Object emitted) { http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index fd7697c..f17beb0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java @@ -30,6 +30,8 @@ import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter; import org.apache.crunch.impl.mr.emit.OutputEmitter; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; public class RTNode implements Serializable { @@ -66,7 +68,6 @@ public class RTNode implements Serializable { // Already initialized return; } - fn.setContext(ctxt.getContext()); fn.initialize(); for (RTNode child : children) { @@ -81,8 +82,9 @@ public class RTNode implements Serializable { this.emitter = new OutputEmitter(outputConverter, ctxt.getContext()); } } else if (!children.isEmpty()) { - this.emitter = new IntermediateEmitter(outputPType, children, - ctxt.getContext().getConfiguration()); + Configuration conf = ctxt.getContext().getConfiguration(); + boolean disableDeepCopy = conf.getBoolean(RuntimeParameters.DISABLE_DEEP_COPY, false); + this.emitter = new IntermediateEmitter(outputPType, children, conf, disableDeepCopy || fn.disableDeepCopy()); } else { throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName); } http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index 7dc8521..a8e8aff 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -34,6 +34,8 @@ public class RuntimeParameters { 
public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist"; + public static final String DISABLE_DEEP_COPY = "crunch.disable.deep.copy"; + // Not instantiated private RuntimeParameters() { } http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java index dd72364..0971211 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java @@ -50,7 +50,7 @@ public class IntermediateEmitterTest { public void testEmit_SingleChild() { RTNode singleChild = mock(RTNode.class); IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild), - new Configuration()); + new Configuration(), false); emitter.emit(stringWrapper); ArgumentCaptor<StringWrapper> argumentCaptor = ArgumentCaptor.forClass(StringWrapper.class); @@ -63,7 +63,7 @@ public class IntermediateEmitterTest { RTNode childA = mock(RTNode.class); RTNode childB = mock(RTNode.class); IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB), - new Configuration()); + new Configuration(), false); emitter.emit(stringWrapper); ArgumentCaptor<StringWrapper> argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class); @@ -80,4 +80,25 @@ public class IntermediateEmitterTest { assertNotSame(stringWrapper, argumentCaptorB.getValue()); } + @Test + public void testEmit_MultipleChildrenDisableDeepCopy() { + RTNode childA = mock(RTNode.class); + RTNode childB = mock(RTNode.class); + IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, 
childB), + new Configuration(), true); + emitter.emit(stringWrapper); + + ArgumentCaptor<StringWrapper> argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class); + ArgumentCaptor<StringWrapper> argumentCaptorB = ArgumentCaptor.forClass(StringWrapper.class); + + verify(childA).process(argumentCaptorA.capture()); + verify(childB).process(argumentCaptorB.capture()); + + assertEquals(stringWrapper, argumentCaptorA.getValue()); + assertEquals(stringWrapper, argumentCaptorB.getValue()); + + // With deep copies disabled, every child must receive the same instance + assertSame(stringWrapper, argumentCaptorA.getValue()); + assertSame(stringWrapper, argumentCaptorB.getValue()); + } }
