Repository: crunch Updated Branches: refs/heads/master a32bd8fc9 -> 1b1e523e5
Patched the MSCRPlanner to correctly add dependencies between jobs planned in different stages. Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1b1e523e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1b1e523e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1b1e523e Branch: refs/heads/master Commit: 1b1e523e5b8be2e094e98609aad97278dca6fdd9 Parents: a32bd8f Author: Marius Curelariu <[email protected]> Authored: Wed May 7 09:42:30 2014 +0300 Committer: Josh Wills <[email protected]> Committed: Wed May 7 20:47:27 2014 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/MultiStagePlanningIT.java | 100 +++++++++++++++++++ .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 20 +++- crunch-test/src/main/resources/addresses.txt | 4 + 3 files changed, 119 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/1b1e523e/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java new file mode 100644 index 0000000..a7b7d48 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import com.google.common.base.Splitter; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.lib.join.BloomFilterJoinStrategy; +import org.apache.crunch.lib.join.JoinType; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.junit.Rule; +import org.junit.Test; + +import java.io.Serializable; +import java.util.Iterator; + +import static org.apache.crunch.types.avro.Avros.strings; +import static org.apache.crunch.types.avro.Avros.tableOf; +import static org.junit.Assert.assertTrue; + +public class MultiStagePlanningIT implements Serializable { + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testMultiStagePlanning() throws Exception { + Pipeline pipeline = new MRPipeline(MultiStagePlanningIT.class, tmpDir.getDefaultConfiguration()); + + String customersFile = tmpDir.copyResourceFileName("customers.txt"); + String ordersFile = tmpDir.copyResourceFileName("orders.txt"); + String addressesFile = tmpDir.copyResourceFileName("addresses.txt"); + PTable<String, String> customersTable = pipeline.readTextFile(customersFile) + .parallelDo("Split customers", new StringToPairMapFn(), tableOf(strings(), strings())); + PTable<String, String> ordersTable = pipeline.readTextFile(ordersFile) + .parallelDo("Split orders", new StringToPairMapFn(), tableOf(strings(), strings())); + + PTable<String, String> assignedOrders = new BloomFilterJoinStrategy<String, String, String>(5) + .join(customersTable, ordersTable, 
JoinType.INNER_JOIN) + .parallelDo(new MapFn<Pair<String, Pair<String, String>>, Pair<String, String>>() { + @Override + public Pair<String, String> map(Pair<String, Pair<String, String>> input) { + return Pair.of(input.first(), input.second().second()); + } + }, tableOf(strings(), strings())); + + PTable<String, String> addressesTable = pipeline.readTextFile(addressesFile) + .parallelDo("Split addresses", new StringToPairMapFn(), tableOf(strings(), strings())) + .filter(new FilterFn<Pair<String, String>>() { + @Override + public boolean accept(Pair<String, String> input) { + // Sleeping is an odd but simple way to make this branch take longer than + // the other branch, which runs the Bloom filter join. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return true; + } + }); + addressesTable.materialize(); + + PTable<String, Pair<String, String>> orderAddresses = assignedOrders.join(addressesTable); + orderAddresses.materialize(); + + PipelineResult result = pipeline.run(); + assertTrue(result != null && result.succeeded()); + } + + private static class StringToPairMapFn extends MapFn<String, Pair<String, String>> { + private transient Splitter splitter; + + @Override + public void initialize() { + super.initialize(); + splitter = Splitter.on('|'); + } + + @Override + public Pair<String, String> map(String input) { + Iterator<String> split = splitter.split(input).iterator(); + return Pair.of(split.next(), split.next()); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/1b1e523e/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index bce7010..72c431b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -18,14 +18,11 @@ package org.apache.crunch.impl.mr.plan; import java.io.IOException; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.impl.dist.collect.PCollectionImpl; @@ -41,6 +38,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import com.google.common.collect.ImmutableMultimap; public class MSCRPlanner { @@ -140,6 +138,7 @@ public class MSCRPlanner { } } + ImmutableMultimap<Target, JobPrototype> previousStages = ImmutableMultimap.copyOf(assignments); for (Map.Entry<Vertex, JobPrototype> e : newAssignments.entries()) { if (e.getKey().isOutput()) { PCollectionImpl<?> pcollect = e.getKey().getPCollection(); @@ -156,6 +155,17 @@ public class MSCRPlanner { for (Target t : outputs.get(pcollect)) { assignments.put(t, e.getValue()); } + } else { + Source source = e.getKey().getSource(); + if (source instanceof Target) { + JobPrototype current = e.getValue(); + // Depend on every job from an earlier stage that writes the Target this vertex reads. + // ImmutableMultimap.get() returns an empty collection, never null, for unknown keys. + Collection<JobPrototype> parentJobPrototypes = previousStages.get((Target) source); + for (JobPrototype parentJobProto : parentJobPrototypes) { + current.addDependency(parentJobProto); + } + } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1b1e523e/crunch-test/src/main/resources/addresses.txt ---------------------------------------------------------------------- diff --git a/crunch-test/src/main/resources/addresses.txt b/crunch-test/src/main/resources/addresses.txt new file mode 100644 index 0000000..b4b2b5a --- /dev/null +++
b/crunch-test/src/main/resources/addresses.txt @@ -0,0 +1,4 @@ +111|First address +222|Second address +333|Third address +444|And the final fourth address \ No newline at end of file
