Updated Branches: refs/heads/master 92ea0592f -> 98458852a

CRUNCH-247: Enable the planner to take advantage of to-be-materialized outputs during job planning.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/98458852
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/98458852
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/98458852

Branch: refs/heads/master
Commit: 98458852a7c6d774de7716eef855467c0df22c87
Parents: 92ea059
Author: Josh Wills <[email protected]>
Authored: Tue Aug 6 15:25:22 2013 -0700
Committer: Josh Wills <[email protected]>
Committed: Tue Aug 6 20:50:05 2013 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/DependentSourcesIT.java   | 76 ++++++++++++++++++++
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  9 ++-
 2 files changed, 84 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/98458852/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java b/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
new file mode 100644
index 0000000..36bd7a7
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.apache.crunch.types.avro.Avros.tableOf;
+
+import java.util.List;
+
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.MRPipelineExecution;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class DependentSourcesIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testRun() throws Exception {
+    run(new MRPipeline(DependentSourcesIT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourcePath("shakes.txt"),
+        tmpDir.getFileName("out"));
+  }
+
+  public static void run(MRPipeline p, Path inputPath, String out) throws Exception {
+    PCollection<String> in = p.read(From.textFile(inputPath));
+    PTable<String, String> op = in.parallelDo("op1", new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        if (input.length() > 5) {
+          emitter.emit(Pair.of(input.substring(0, 3), input));
+        }
+      }
+    }, tableOf(strings(), strings()));
+
+    SourceTarget src = (SourceTarget)((MaterializableIterable<Pair<String, String>>) op.materialize()).getSource();
+
+    op = op.parallelDo("op2", IdentityFn.<Pair<String,String>>getInstance(), tableOf(strings(), strings()),
+        ParallelDoOptions.builder().sourceTargets(src).build());
+
+    PCollection<String> output = op.values();
+    output.write(To.textFile(out));
+    MRPipelineExecution exec = p.runAsync();
+    exec.waitUntilDone();
+    List<MRJob> jobs = exec.getJobs();
+    Assert.assertEquals(2, jobs.size());
+    Assert.assertEquals(0, jobs.get(0).getJob().getNumReduceTasks());
+    Assert.assertEquals(0, jobs.get(1).getJob().getNumReduceTasks());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/98458852/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index b5b37d7..5ad5ca1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -136,8 +136,15 @@ public class MSCRPlanner {
       }
       assignments.putAll(newAssignments);
 
-      // Remove completed outputs.
+      // Remove completed outputs and mark materialized output locations
+      // for subsequent job processing.
       for (PCollectionImpl<?> output : currentStage) {
+        if (toMaterialize.containsKey(output)) {
+          MaterializableIterable mi = toMaterialize.get(output);
+          if (mi.isSourceTarget()) {
+            output.materializeAt((SourceTarget) mi.getSource());
+          }
+        }
         targetDeps.remove(output);
       }
     }
