Updated Branches: refs/heads/master b0165faae -> 8b3cc0015
CRUNCH-90 - Handle object reuse in fused mappers Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/8b3cc001 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/8b3cc001 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/8b3cc001 Branch: refs/heads/master Commit: 8b3cc0015164e33bb0edd5f88c6b7c767a4529ae Parents: b0165fa Author: Gabriel Reid <[email protected]> Authored: Sat Oct 6 14:34:16 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Sat Oct 6 14:34:16 2012 +0200 ---------------------------------------------------------------------- .../java/org/apache/crunch/MultipleOutputIT.java | 58 ++++++++++- .../crunch/impl/mr/emit/IntermediateEmitter.java | 15 +++- .../org/apache/crunch/impl/mr/plan/DoNode.java | 2 +- .../java/org/apache/crunch/impl/mr/run/RTNode.java | 8 +- .../impl/mr/emit/IntermediateEmitterTest.java | 80 +++++++++++++++ 5 files changed, 157 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java index 0d1f83f..1a85b6a 100644 --- a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java +++ b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java @@ -27,14 +27,18 @@ import java.util.List; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.At; +import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableTypeFamily; +import org.apache.crunch.types.writable.Writables; import org.junit.Rule; import org.junit.Test; +import com.google.common.collect.Lists; import com.google.common.io.Files; public class MultipleOutputIT { @@ -85,7 +89,8 @@ public class MultipleOutputIT { @Test public void testParallelDosFused() throws IOException { - PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance()); + PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), + WritableTypeFamily.getInstance()); // Ensure our multiple outputs were fused into a single job. assertEquals("parallel Dos not fused into a single job", 1, result.getStageResults().size()); @@ -111,6 +116,57 @@ public class MultipleOutputIT { return result; } + /** + * Mutates the state of an input and then emits the mutated object. + */ + static class AppendFn extends DoFn<StringWrapper, StringWrapper> { + + private String value; + + public AppendFn(String value) { + this.value = value; + } + + @Override + public void process(StringWrapper input, Emitter<StringWrapper> emitter) { + input.setValue(input.getValue() + value); + emitter.emit(input); + } + + } + + /** + * Fusing multiple pipelines has a risk of running into object reuse bugs. + * This test verifies that mutating the state of an object that is passed + * through multiple streams of a pipeline doesn't allow one stream to affect + * another. + */ + @Test + public void testFusedMappersObjectReuseBug() throws IOException { + Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()); + PCollection<StringWrapper> stringWrappers = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")) + .parallelDo(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class)); + + PCollection<String> stringsA = stringWrappers.parallelDo(new AppendFn("A"), stringWrappers.getPType()) + .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings()); + PCollection<String> stringsB = stringWrappers.parallelDo(new AppendFn("B"), stringWrappers.getPType()) + .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings()); + + String outputA = tmpDir.getFileName("stringsA"); + String outputB = tmpDir.getFileName("stringsB"); + + pipeline.writeTextFile(stringsA, outputA); + pipeline.writeTextFile(stringsB, outputB); + PipelineResult pipelineResult = pipeline.done(); + + // Make sure fusing did actually occur + assertEquals(1, pipelineResult.getStageResults().size()); + + checkFileContents(outputA, Lists.newArrayList("cA", "dA", "aA")); + checkFileContents(outputB, Lists.newArrayList("cB", "dB", "aB")); + + } + private void checkFileContents(String filePath, List<String> expected) throws IOException { File outputFile = new File(filePath, "part-m-00000"); List<String> lines = Files.readLines(outputFile, Charset.defaultCharset()); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java index 4c97c42..d609489 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.impl.mr.run.RTNode; +import org.apache.crunch.types.PType; import com.google.common.collect.ImmutableList; @@ -33,14 +34,24 @@ import com.google.common.collect.ImmutableList; public class IntermediateEmitter implements Emitter<Object> { private final List<RTNode> children; + private final PType<Object> outputPType; + private final boolean needDetachedValues; - public IntermediateEmitter(List<RTNode> children) { + public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children) { + this.outputPType = outputPType; this.children = ImmutableList.copyOf(children); + + outputPType.initialize(); + needDetachedValues = this.children.size() > 1; } public void emit(Object emitted) { for (RTNode child : children) { - child.process(emitted); + Object value = emitted; + if (needDetachedValues) { + value = this.outputPType.getDetachedValue(emitted); + } + child.process(value); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java index 236496b..865369c 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java @@ -140,7 +140,7 @@ public class DoNode { inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter(); } } - return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter, outputName); + return new RTNode(fn, (PType<Object>) getPType(), name, childRTNodes, inputConverter, outputConverter, outputName); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java index 0eb429a..4df989b 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java @@ -28,6 +28,7 @@ import org.apache.crunch.impl.mr.emit.IntermediateEmitter; import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter; import org.apache.crunch.impl.mr.emit.OutputEmitter; import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PType; public class RTNode implements Serializable { @@ -35,6 +36,7 @@ public class RTNode implements Serializable { private final String nodeName; private DoFn<Object, Object> fn; + private PType<Object> outputPType; private final List<RTNode> children; private final Converter inputConverter; private final Converter outputConverter; @@ -42,9 +44,11 @@ public class RTNode implements Serializable { private transient Emitter<Object> emitter; - public RTNode(DoFn<Object, Object> fn, String name, List<RTNode> children, Converter inputConverter, + public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name, List<RTNode> children, + Converter inputConverter, Converter outputConverter, String outputName) { this.fn = fn; + this.outputPType = outputPType; this.nodeName = name; this.children = children; this.inputConverter = inputConverter; @@ -70,7 +74,7 @@ public class RTNode implements Serializable { this.emitter = new OutputEmitter(outputConverter, ctxt.getContext()); } } else if (!children.isEmpty()) { - this.emitter = new IntermediateEmitter(children); + this.emitter = new IntermediateEmitter(outputPType, children); } else { throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java new file mode 100644 index 0000000..998e654 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.emit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import org.apache.crunch.impl.mr.run.RTNode; +import org.apache.crunch.test.StringWrapper; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.avro.Avros; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.google.common.collect.Lists; + +public class IntermediateEmitterTest { + + private StringWrapper stringWrapper; + private PType ptype; + + @Before + public void setUp() { + stringWrapper = new StringWrapper("test"); + ptype = spy(Avros.reflects(StringWrapper.class)); + } + + @Test + public void testEmit_SingleChild() { + RTNode singleChild = mock(RTNode.class); + IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild)); + emitter.emit(stringWrapper); + + ArgumentCaptor<StringWrapper> argumentCaptor = ArgumentCaptor.forClass(StringWrapper.class); + verify(singleChild).process(argumentCaptor.capture()); + assertSame(stringWrapper, argumentCaptor.getValue()); + } + + @Test + public void testEmit_MultipleChildren() { + RTNode childA = mock(RTNode.class); + RTNode childB = mock(RTNode.class); + IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB)); + emitter.emit(stringWrapper); + + ArgumentCaptor<StringWrapper> argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class); + ArgumentCaptor<StringWrapper> argumentCaptorB = ArgumentCaptor.forClass(StringWrapper.class); + + verify(childA).process(argumentCaptorA.capture()); + verify(childB).process(argumentCaptorB.capture()); + + assertEquals(stringWrapper, argumentCaptorA.getValue()); + assertEquals(stringWrapper, argumentCaptorB.getValue()); + + // Make sure that multiple children means deep copies are performed + assertNotSame(stringWrapper, argumentCaptorA.getValue()); + assertNotSame(stringWrapper, argumentCaptorB.getValue()); + } + +}
