Updated Branches: refs/heads/master dea3fd93e -> 4983a0ca8
CRUNCH-232: Ensure that all nodes are cleaned up during joins (or any Crunch job involving unions) Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/4983a0ca Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/4983a0ca Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/4983a0ca Branch: refs/heads/master Commit: 4983a0ca85d814e29cf60d3588dc2a38ce3b5aa0 Parents: dea3fd9 Author: Josh Wills <[email protected]> Authored: Tue Jul 2 18:01:38 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Jul 2 18:01:38 2013 -0700 ---------------------------------------------------------------------- .../crunch/lib/join/MapsideJoinStrategyIT.java | 22 ++++++++--------- .../apache/crunch/impl/mr/run/CrunchMapper.java | 16 +++++-------- .../crunch/impl/mr/run/CrunchReducer.java | 17 ++++--------- .../crunch/impl/mr/run/CrunchTaskContext.java | 25 ++++++++++---------- .../org/apache/crunch/impl/mr/run/RTNode.java | 8 +++++-- 5 files changed, 41 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java index c459ad8..09723d2 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java @@ -30,7 +30,6 @@ import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.PipelineResult; import org.apache.crunch.fn.FilterFns; -import org.apache.crunch.fn.MapValuesFn; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; @@ -72,14 +71,14 @@ public class MapsideJoinStrategyIT { } } - private static class CapOrdersFn extends MapValuesFn<Integer, String, String> { + private static class CapOrdersFn extends MapFn<String, String> { @Override public String map(String v) { return v.toUpperCase(); } } - private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>, String> { + private static class ConcatValuesFn extends MapFn<Pair<String, String>, String> { @Override public String map(Pair<String, String> v) { return v.toString(); @@ -109,7 +108,7 @@ public class MapsideJoinStrategyIT { .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType()); - MapsideJoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(); + JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(); PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(customerTable, filteredOrderTable, JoinType.INNER_JOIN); List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize()); @@ -131,11 +130,11 @@ public class MapsideJoinStrategyIT { PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); - MapsideJoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(); + JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(); PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.INNER_JOIN) - .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings())); + .mapValues("concat", new ConcatValuesFn(), Writables.strings()); - PTable<Integer, String> ORDER_TABLE = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType()); + PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType()); PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.INNER_JOIN); @@ -163,11 +162,11 @@ public class MapsideJoinStrategyIT { PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); - MapsideJoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(); + JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(); PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.LEFT_OUTER_JOIN) - .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings())); + .mapValues("concat", new ConcatValuesFn(), Writables.strings()); - PTable<Integer, String> ORDER_TABLE = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType()); + PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType()); PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.LEFT_OUTER_JOIN); @@ -194,7 +193,8 @@ public class MapsideJoinStrategyIT { private PTable<Integer, String> readTable(Pipeline pipeline, String filename) { try { - return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable", new LineSplitter(), + return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable", + new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings())); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java index 70f0b01..0e2ef38 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java @@ -17,12 +17,10 @@ */ package org.apache.crunch.impl.mr.run; -import java.io.IOException; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.crunch.CrunchRuntimeException; import org.apache.hadoop.mapreduce.Mapper; public class CrunchMapper extends Mapper<Object, Object, Object, Object> { @@ -35,21 +33,19 @@ public class CrunchMapper extends Mapper<Object, Object, Object, Object> { @Override protected void setup(Mapper<Object, Object, Object, Object>.Context context) { - List<RTNode> nodes; - this.ctxt = new CrunchTaskContext(context, NodeContext.MAP); - try { - nodes = ctxt.getNodes(); - } catch (IOException e) { - LOG.info("Crunch deserialization error", e); - throw new CrunchRuntimeException(e); + if (ctxt == null) { + ctxt = new CrunchTaskContext(context, NodeContext.MAP); + this.debug = ctxt.isDebugRun(); } + + List<RTNode> nodes = ctxt.getNodes(); if (nodes.size() == 1) { this.node = nodes.get(0); } else { CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit(); this.node = nodes.get(split.getNodeIndex()); } - this.debug = ctxt.isDebugRun(); + this.node.initialize(ctxt); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java index e5ddbd2..c3e3e3e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java @@ -17,12 +17,8 @@ */ package org.apache.crunch.impl.mr.run; -import java.io.IOException; -import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.impl.SingleUseIterable; import org.apache.hadoop.mapreduce.Reducer; @@ -40,15 +36,12 @@ public class CrunchReducer extends Reducer<Object, Object, Object, Object> { @Override protected void setup(Reducer<Object, Object, Object, Object>.Context context) { - this.ctxt = new CrunchTaskContext(context, getNodeContext()); - try { - List<RTNode> nodes = ctxt.getNodes(); - this.node = nodes.get(0); - } catch (IOException e) { - LOG.info("Crunch deserialization error", e); - throw new CrunchRuntimeException(e); + if (ctxt == null) { + this.ctxt = new CrunchTaskContext(context, getNodeContext()); + this.debug = ctxt.isDebugRun(); } - this.debug = ctxt.isDebugRun(); + this.node = ctxt.getNodes().get(0); + this.node.initialize(ctxt); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java index c4f2873..b81df05 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java @@ -32,11 +32,21 @@ class CrunchTaskContext { private final TaskInputOutputContext<Object, Object, Object, Object> taskContext; private final NodeContext nodeContext; + private final List<RTNode> nodes; private CrunchOutputs<Object, Object> multipleOutputs; - - public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext, NodeContext nodeContext) { + + public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext, + NodeContext nodeContext) { this.taskContext = taskContext; this.nodeContext = nodeContext; + Configuration conf = taskContext.getConfiguration(); + Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), + nodeContext.toString()); + try { + this.nodes = (List<RTNode>) DistCache.read(conf, path); + } catch (IOException e) { + throw new CrunchRuntimeException("Could not read runtime node information", e); + } } public TaskInputOutputContext<Object, Object, Object, Object> getContext() { @@ -47,16 +57,7 @@ class CrunchTaskContext { return nodeContext; } - public List<RTNode> getNodes() throws IOException { - Configuration conf = taskContext.getConfiguration(); - Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString()); - @SuppressWarnings("unchecked") - List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path); - if (nodes != null) { - for (RTNode node : nodes) { - node.initialize(this); - } - } + public List<RTNode> getNodes() { return nodes; } http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java index ce7b795..fd7697c 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java @@ -45,9 +45,13 @@ public class RTNode implements Serializable { private transient Emitter<Object> emitter; - public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name, List<RTNode> children, + public RTNode(DoFn<Object, Object> fn, + PType<Object> outputPType, + String name, + List<RTNode> children, Converter inputConverter, - Converter outputConverter, String outputName) { + Converter outputConverter, + String outputName) { this.fn = fn; this.outputPType = outputPType; this.nodeName = name;
