CRUNCH-449: Add a new PipelineCallable and sequentialDo method for inserting non-Crunch tasks into Crunch workflows.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a5c59276 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a5c59276 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a5c59276 Branch: refs/heads/apache-crunch-0.8 Commit: a5c5927683ab1d7ebe11138246b4a247e2ad0155 Parents: b6791f8 Author: Josh Wills <[email protected]> Authored: Fri Jun 20 22:07:15 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Sat Aug 2 09:15:34 2014 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/PipelineCallableIT.java | 104 ++++++++ .../java/org/apache/crunch/PCollection.java | 12 + .../org/apache/crunch/ParallelDoOptions.java | 35 ++- .../main/java/org/apache/crunch/Pipeline.java | 23 ++ .../org/apache/crunch/PipelineCallable.java | 244 +++++++++++++++++++ .../lib/jobcontrol/CrunchControlledJob.java | 13 +- .../lib/jobcontrol/CrunchJobControl.java | 89 ++++++- .../crunch/impl/dist/DistributedPipeline.java | 41 +++- .../impl/dist/collect/BaseGroupedTable.java | 5 +- .../impl/dist/collect/BaseInputCollection.java | 11 + .../impl/dist/collect/BaseInputTable.java | 17 +- .../impl/dist/collect/PCollectionFactory.java | 10 +- .../impl/dist/collect/PCollectionImpl.java | 38 ++- .../crunch/impl/dist/collect/PTableBase.java | 4 +- .../org/apache/crunch/impl/mem/MemPipeline.java | 14 ++ .../crunch/impl/mem/collect/MemCollection.java | 7 + .../org/apache/crunch/impl/mr/MRPipeline.java | 3 +- .../crunch/impl/mr/collect/InputCollection.java | 5 +- .../crunch/impl/mr/collect/InputTable.java | 5 +- .../impl/mr/collect/MRCollectionFactory.java | 14 +- .../apache/crunch/impl/mr/exec/MRExecutor.java | 55 +++-- .../crunch/impl/mr/plan/JobPrototype.java | 5 +- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 20 +- .../lib/jobcontrol/CrunchJobControlTest.java | 84 ++++++- .../apache/crunch/scrunch/PCollectionLike.scala | 8 + 
.../apache/crunch/scrunch/PipelineLike.scala | 4 + .../apache/crunch/SparkPipelineCallableIT.java | 99 ++++++++ .../apache/crunch/impl/spark/SparkPipeline.java | 3 +- .../apache/crunch/impl/spark/SparkRuntime.java | 89 ++++++- .../impl/spark/collect/InputCollection.java | 7 +- .../crunch/impl/spark/collect/InputTable.java | 5 +- .../impl/spark/collect/SparkCollectFactory.java | 14 +- 32 files changed, 995 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java new file mode 100644 index 0000000..b4fc19e --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch; + +import com.google.common.collect.ImmutableMap; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Map; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; + +public class PipelineCallableIT { + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testMRShakes() throws Exception { + run(new MRPipeline(PipelineCallableIT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("shakes.txt"), false /* fail */); + } + + @Test + public void testFailure() throws Exception { + run(new MRPipeline(PipelineCallableIT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("shakes.txt"), true /* fail */); + } + + public static int INC1 = 0; + public static int INC2 = 0; + + public static void run(Pipeline p, final String input, final boolean fail) { + + PTable<String, Long> top3 = p.sequentialDo(new PipelineCallable<PCollection<String>>() { + @Override + public Status call() { + INC1 = 17; + return fail ? 
Status.FAILURE : Status.SUCCESS; + } + + @Override + public PCollection<String> getOutput(Pipeline pipeline) { + return pipeline.readTextFile(input); + } + }.named("first")) + .sequentialDo("onInput", new PipelineCallable<PCollection<String>>() { + @Override + protected PCollection<String> getOutput(Pipeline pipeline) { + return getOnlyPCollection(); + } + + @Override + public Status call() throws Exception { + return Status.SUCCESS; + } + }) + .count() + .sequentialDo("label", new PipelineCallable<PTable<String, Long>>() { + @Override + public Status call() { + INC2 = 29; + if (getPCollection("label") != null) { + return Status.SUCCESS; + } + return Status.FAILURE; + } + + @Override + public PTable<String, Long> getOutput(Pipeline pipeline) { + return (PTable<String, Long>) getOnlyPCollection(); + } + }.named("second")) + .top(3); + + if (fail) { + assertFalse(p.run().succeeded()); + } else { + Map<String, Long> counts = top3.materializeToMap(); + assertEquals(ImmutableMap.of("", 788L, "Enter Macbeth.", 7L, "Exeunt.", 21L), counts); + assertEquals(17, INC1); + assertEquals(29, INC2); + } + p.done(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/PCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java index 878fbb9..a1d507a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java @@ -188,6 +188,18 @@ public interface PCollection<S> { PObject<S> first(); /** + * Adds the materialized data in this {@code PCollection} as a dependency to the given + * {@code PipelineCallable} and registers it with the {@code Pipeline} associated with this + * instance. 
+ * + * @param label the label to use inside of the PipelineCallable for referencing this PCollection + * @param pipelineCallable the function itself + * + * @return The value of the {@code getOutput} function on the given argument. + */ + <Output> Output sequentialDo(String label, PipelineCallable<Output> pipelineCallable); + + /** * @return A reference to the data in this instance that can be read from a job running * on a cluster. * http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java index 65d0df2..24abf90 100644 --- a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java +++ b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java @@ -34,18 +34,21 @@ import org.apache.hadoop.conf.Configuration; * that require reading a file from the filesystem into a {@code DoFn}. */ public class ParallelDoOptions { - private final Set<SourceTarget<?>> sourceTargets; + private final Set targets; private final Map<String, String> extraConf; - private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets, Map<String, String> extraConf) { - this.sourceTargets = sourceTargets; + private ParallelDoOptions(Set<Target> targets, Map<String, String> extraConf) { + this.targets = targets; this.extraConf = extraConf; } - + + @Deprecated public Set<SourceTarget<?>> getSourceTargets() { - return sourceTargets; + return (Set<SourceTarget<?>>) targets; } + public Set<Target> getTargets() { return targets; } + /** * Applies the key-value pairs that were associated with this instance to the given {@code Configuration} * object. 
This is called just before the {@code configure} method on the {@code DoFn} corresponding to this @@ -62,11 +65,11 @@ public class ParallelDoOptions { } public static class Builder { - private Set<SourceTarget<?>> sourceTargets; + private Set<Target> targets; private Map<String, String> extraConf; public Builder() { - this.sourceTargets = Sets.newHashSet(); + this.targets = Sets.newHashSet(); this.extraConf = Maps.newHashMap(); } @@ -78,19 +81,29 @@ public class ParallelDoOptions { for (Source<?> src : sources) { // Only SourceTargets need to be checked for materialization if (src instanceof SourceTarget) { - sourceTargets.add((SourceTarget) src); + targets.add((SourceTarget) src); } } return this; } public Builder sourceTargets(SourceTarget<?>... sourceTargets) { - Collections.addAll(this.sourceTargets, sourceTargets); + Collections.addAll(this.targets, sourceTargets); return this; } public Builder sourceTargets(Collection<SourceTarget<?>> sourceTargets) { - this.sourceTargets.addAll(sourceTargets); + this.targets.addAll(sourceTargets); + return this; + } + + public Builder targets(Target... 
targets) { + Collections.addAll(this.targets, targets); + return this; + } + + public Builder targets(Collection<Target> targets) { + this.targets.addAll(targets); + return this; } @@ -107,7 +120,7 @@ public class ParallelDoOptions { } public ParallelDoOptions build() { - return new ParallelDoOptions(sourceTargets, extraConf); + return new ParallelDoOptions(targets, extraConf); } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/Pipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java index f34d0ef..cd3f3f6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java @@ -111,11 +111,34 @@ public interface Pipeline { */ <T> void cache(PCollection<T> pcollection, CachingOptions options); + /** + * Creates an empty {@code PCollection} of the given {@code PType}. + * + * @param ptype The PType of the empty PCollection + * @return A valid PCollection with no contents + */ <T> PCollection<T> emptyPCollection(PType<T> ptype); + /** + * Creates an empty {@code PTable} of the given {@code PTableType}. + * + * @param ptype The PTableType of the empty PTable + * @return A valid PTable with no contents + */ <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype); /** + * Executes the given {@code PipelineCallable} on the client after the {@code Targets} + * that the PipelineCallable depends on (if any) have been created by other pipeline + * processing steps. 
+ * + * @param pipelineCallable The sequential logic to execute + * @param <Output> The return type of the PipelineCallable + * @return The result of executing the PipelineCallable + */ + <Output> Output sequentialDo(PipelineCallable<Output> pipelineCallable); + + /** * Constructs and executes a series of MapReduce jobs in order to write data * to the output targets. */ http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/PipelineCallable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineCallable.java b/crunch-core/src/main/java/org/apache/crunch/PipelineCallable.java new file mode 100644 index 0000000..e1b16aa --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/PipelineCallable.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import parquet.Preconditions; + +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * A specialization of {@code Callable} that executes some sequential logic on the client machine as + * part of an overall Crunch pipeline in order to generate zero or more outputs, some of + * which may be {@code PCollection} instances that are processed by other jobs in the + * pipeline. + * + * <p>{@code PipelineCallable} is intended to be used to inject auxiliary logic into the control + * flow of a Crunch pipeline. This can be used for a number of purposes, such as importing or + * exporting data into a cluster using Apache Sqoop, executing a legacy MapReduce job + * or Pig/Hive script within a Crunch pipeline, or sending emails or status notifications + * about the status of a long-running pipeline during its execution.</p> + * + * <p>The Crunch planner needs to know three things about a {@code PipelineCallable} instance in order + * to manage it: + * <ol> + * <li>The {@code Target} and {@code PCollection} instances that must have been materialized + * before this instance is allowed to run. This information should be specified via the {@code dependsOn} + * methods of the class.</li> + * <li>What Outputs will be created after this instance is executed, if any. These outputs may be + * new {@code PCollection} instances that are used as inputs in other Crunch jobs. 
These outputs should + * be specified by the {@code getOutput(Pipeline)} method of the class, which will be executed immediately + * after this instance is registered with the {@link Pipeline#sequentialDo} method.</li> + * <li>The actual logic to execute when the dependent Targets and PCollections have been created in + * order to materialize the output data. This is defined in the {@code call} method of the class.</li> + * </ol> + * </p> + * + * <p>If a given PipelineCallable does not have any dependencies, it will be executed before any jobs are run + * by the planner. After that, the planner will keep track of when the dependencies of a given instance + * have been materialized, and then execute the instance as soon as they all exist. The Crunch planner + * uses a thread pool executor to run multiple {@code PipelineCallable} instances simultaneously, but you can + * indicate that an instance should be run by itself by overriding the {@code boolean runSingleThreaded()} method + * below to return true.</p> + * + * <p>The {@code call} method returns a {@code Status} to indicate whether it succeeded or failed. A failed + * instance, or any exceptions/errors thrown by the call method, will cause the overall Crunch pipeline containing + * this instance to fail.</p> + * + * <p>A number of helper methods for accessing the dependent Target/PCollection instances that this instance + * needs to exist, as well as the {@code Configuration} instance for the overall Pipeline execution, are available + * as protected methods in this class so that they may be accessed from implementations of {@code PipelineCallable} + * within the {@code call} method. + * </p> + * @param <Output> the output value returned by this instance (Void, PCollection, Pair<PCollection, PCollection>, + * etc.) 
+ */ +public abstract class PipelineCallable<Output> implements Callable<PipelineCallable.Status> { + + private static final Log LOG = LogFactory.getLog(PipelineCallable.class); + + public enum Status { SUCCESS, FAILURE }; + + private String name; + private String message; + + private Map<String, Target> namedTargets = Maps.newHashMap(); + private Map<String, PCollection<?>> namedPCollections = Maps.newHashMap(); + private Configuration conf; + + private boolean outputsGenerated = false; + + /** + * Clients should override this method to define the outputs that will exist after this instance is + * executed. These may be PCollections, PObjects, or nothing (which can be indicated with Java's {@code Void} + * type and a null value). + * + * @param pipeline The pipeline that is managing the execution of this instance + */ + protected abstract Output getOutput(Pipeline pipeline); + + /** + * Override this method to indicate to the planner that this instance should not be run at the + * same time as any other {@code PipelineCallable} instances. + * + * @return true if this instance should run by itself, false otherwise + */ + public boolean runSingleThreaded() { + return false; + } + + /** + * Requires that the given {@code Target} exists before this instance may be + * executed. + * + * @param label A string that can be used to retrieve the given Target inside of the {@code call} method. 
+ * @param t the {@code Target} itself + * @return this instance + */ + public PipelineCallable<Output> dependsOn(String label, Target t) { + Preconditions.checkNotNull(label, "label"); + if (outputsGenerated) { + throw new IllegalStateException( + "Dependencies may not be added to a PipelineCallable after its outputs have been generated"); + } + if (namedTargets.containsKey(label)) { + throw new IllegalStateException("Label " + label + " cannot be reused for multiple targets"); + } + this.namedTargets.put(label, t); + return this; + } + + /** + * Requires that the given {@code PCollection} be materialized to disk before this instance may be + * executed. + * + * @param label A string that can be used to retrieve the given PCollection inside of the {@code call} method. + * @param pcollect the {@code PCollection} itself + * @return this instance + */ + public PipelineCallable<Output> dependsOn(String label, PCollection<?> pcollect) { + Preconditions.checkNotNull(label, "label"); + if (outputsGenerated) { + throw new IllegalStateException( + "Dependencies may not be added to a PipelineCallable after its outputs have been generated"); + } + if (namedPCollections.containsKey(label)) { + throw new IllegalStateException("Label " + label + " cannot be reused for multiple PCollections"); + } + this.namedPCollections.put(label, pcollect); + return this; + } + + /** + * Called by the {@code Pipeline} when this instance is registered with {@code Pipeline#sequentialDo}. In general, + * clients should override the protected {@code getOutput(Pipeline)} method instead of this one. + */ + public Output generateOutput(Pipeline pipeline) { + if (outputsGenerated == true) { + throw new IllegalStateException("PipelineCallable.generateOutput should only be called once"); + } + outputsGenerated = true; + this.conf = pipeline.getConfiguration(); + return getOutput(pipeline); + } + + /** + * Returns the name of this instance. + */ + public String getName() { + return name == null ? 
this.getClass().getName() : name; + } + + /** + * Use the given name to identify this instance in the logs. + */ + public PipelineCallable<Output> named(String name) { + this.name = name; + return this; + } + + /** + * Returns a message associated with this callable's execution, especially in case of errors. + */ + public String getMessage() { + if (message == null) { + LOG.warn("No message specified for PipelineCallable instance \"" + getName() + + "\". Consider overriding PipelineCallable.getMessage()"); + return toString(); + } + return message; + } + + /** + * Sets a message associated with this callable's execution, especially in case of errors. + */ + public void setMessage(String message) { + this.message = message; + } + + /** + * The {@code Configuration} instance for the {@code Pipeline} this callable is registered with. + */ + protected Configuration getConfiguration() { + return conf; + } + + /** + * Returns the {@code Target} associated with the given label in the dependencies list, + * or null if no such target exists. + */ + protected Target getTarget(String label) { + return namedTargets.get(label); + } + + /** + * Returns the {@code PCollection} associated with the given label in the dependencies list, + * or null if no such instance exists. + */ + protected PCollection getPCollection(String label) { + return namedPCollections.get(label); + } + + /** + * Returns the only PCollection this instance depends on. Only valid in the case that this callable + * has precisely one dependency. + */ + protected PCollection getOnlyPCollection() { + return Iterables.getOnlyElement(namedPCollections.values()); + } + + /** + * Returns the mapping of labels to PCollection dependencies for this instance. + */ + public Map<String, PCollection<?>> getAllPCollections() { + return ImmutableMap.copyOf(namedPCollections); + } + + /** + * Returns the mapping of labels to Target dependencies for this instance. 
+ */ + public Map<String, Target> getAllTargets() { + return ImmutableMap.copyOf(namedTargets); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index 06d886d..ce80691 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -19,9 +19,11 @@ package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol; import java.io.IOException; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.crunch.Target; import org.apache.crunch.impl.mr.MRJob; import org.apache.crunch.impl.mr.plan.JobNameBuilder; import org.apache.crunch.impl.mr.run.RuntimeParameters; @@ -55,6 +57,7 @@ public class CrunchControlledJob implements MRJob { private final int jobID; private final Job job; // mapreduce job to be executed. private final JobNameBuilder jobNameBuilder; + private final Set<Target> allTargets; // the jobs the current job depends on private final List<CrunchControlledJob> dependingJobs; @@ -77,15 +80,21 @@ public class CrunchControlledJob implements MRJob { * an ID used to match with its {@link org.apache.crunch.impl.mr.plan.JobPrototype}. * @param job * a mapreduce job to be executed. + * @param jobNameBuilder + * code for generating a name for the executed MapReduce job. + * @param allTargets + * the set of Targets that will exist after this job completes successfully. 
* @param prepareHook * a piece of code that will run before this job is submitted. * @param completionHook * a piece of code that will run after this job gets completed. */ - public CrunchControlledJob(int jobID, Job job, JobNameBuilder jobNameBuilder, Hook prepareHook, Hook completionHook) { + public CrunchControlledJob(int jobID, Job job, JobNameBuilder jobNameBuilder, Set<Target> allTargets, + Hook prepareHook, Hook completionHook) { this.jobID = jobID; this.job = job; this.jobNameBuilder = jobNameBuilder; + this.allTargets = allTargets; this.dependingJobs = Lists.newArrayList(); this.prepareHook = prepareHook; this.completionHook = completionHook; @@ -160,6 +169,8 @@ public class CrunchControlledJob implements MRJob { return counters; } + public Set<Target> getAllTargets() { return allTargets; } + @Override public synchronized Job getJob() { return this.job; http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java index 8a650c7..d23d821 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java @@ -22,10 +22,21 @@ import java.util.ArrayList; import java.util.Hashtable; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import 
com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.crunch.PipelineCallable; +import org.apache.crunch.Target; import org.apache.crunch.impl.mr.MRJob.State; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.hadoop.conf.Configuration; @@ -47,6 +58,9 @@ public class CrunchJobControl { private Map<Integer, CrunchControlledJob> runningJobs; private Map<Integer, CrunchControlledJob> successfulJobs; private Map<Integer, CrunchControlledJob> failedJobs; + private Map<PipelineCallable<?>, Set<Target>> allPipelineCallables; + private Set<PipelineCallable<?>> activePipelineCallables; + private List<PipelineCallable<?>> failedCallables; private Log log = LogFactory.getLog(CrunchJobControl.class); @@ -60,7 +74,8 @@ public class CrunchJobControl { * @param groupName * a name identifying this group */ - public CrunchJobControl(Configuration conf, String groupName) { + public CrunchJobControl(Configuration conf, String groupName, + Map<PipelineCallable<?>, Set<Target>> pipelineCallables) { this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>(); this.readyJobs = new Hashtable<Integer, CrunchControlledJob>(); this.runningJobs = new Hashtable<Integer, CrunchControlledJob>(); @@ -68,6 +83,9 @@ public class CrunchJobControl { this.failedJobs = new Hashtable<Integer, CrunchControlledJob>(); this.groupName = groupName; this.maxRunningJobs = conf.getInt(RuntimeParameters.MAX_RUNNING_JOBS, 5); + this.allPipelineCallables = pipelineCallables; + this.activePipelineCallables = allPipelineCallables.keySet(); + this.failedCallables = Lists.newArrayList(); } private static List<CrunchControlledJob> toList(Map<Integer, CrunchControlledJob> jobs) { @@ -189,6 +207,61 @@ public class CrunchJobControl { } } + private Set<Target> getUnfinishedTargets() { + Set<Target> unfinished = Sets.newHashSet(); + 
for (CrunchControlledJob job : runningJobs.values()) { + unfinished.addAll(job.getAllTargets()); + } + for (CrunchControlledJob job : readyJobs.values()) { + unfinished.addAll(job.getAllTargets()); + } + for (CrunchControlledJob job : waitingJobs.values()) { + unfinished.addAll(job.getAllTargets()); + } + return unfinished; + } + + synchronized private void executeReadySeqDoFns() { + Set<Target> unfinished = getUnfinishedTargets(); + Set<PipelineCallable<?>> oldPipelineCallables = activePipelineCallables; + this.activePipelineCallables = Sets.newHashSet(); + List<Callable<PipelineCallable.Status>> callablesToRun = Lists.newArrayList(); + for (final PipelineCallable<?> pipelineCallable : oldPipelineCallables) { + if (Sets.intersection(allPipelineCallables.get(pipelineCallable), unfinished).isEmpty()) { + if (pipelineCallable.runSingleThreaded()) { + try { + if (pipelineCallable.call() != PipelineCallable.Status.SUCCESS) { + failedCallables.add(pipelineCallable); + } + } catch (Throwable t) { + pipelineCallable.setMessage(t.getLocalizedMessage()); + failedCallables.add(pipelineCallable); + } + } else { + callablesToRun.add(pipelineCallable); + } + } else { + // Still need to run this one + activePipelineCallables.add(pipelineCallable); + } + } + + ListeningExecutorService es = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + try { + List<Future<PipelineCallable.Status>> res = es.invokeAll(callablesToRun); + for (int i = 0; i < res.size(); i++) { + if (res.get(i).get() != PipelineCallable.Status.SUCCESS) { + failedCallables.add((PipelineCallable) callablesToRun.get(i)); + } + } + } catch (Throwable t) { + t.printStackTrace(); + failedCallables.addAll((List) callablesToRun); + } finally { + es.shutdownNow(); + } + } + synchronized private void startReadyJobs() { Map<Integer, CrunchControlledJob> oldJobs = null; oldJobs = this.readyJobs; @@ -220,8 +293,16 @@ public class CrunchJobControl { } synchronized public boolean allFinished() { - return 
this.waitingJobs.size() == 0 && this.readyJobs.size() == 0 - && this.runningJobs.size() == 0; + return (this.waitingJobs.size() == 0 && this.readyJobs.size() == 0 + && this.runningJobs.size() == 0); + } + + synchronized public boolean anyFailures() { + return this.failedJobs.size() > 0 || failedCallables.size() > 0; + } + + public List<PipelineCallable<?>> getFailedCallables() { + return failedCallables; } /** @@ -231,6 +312,8 @@ public class CrunchJobControl { public void pollJobStatusAndStartNewOnes() throws IOException, InterruptedException { checkRunningJobs(); checkWaitingJobs(); + executeReadySeqDoFns(); startReadyJobs(); } + } http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java index e595a72..6b3da5e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java @@ -25,8 +25,10 @@ import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Pipeline; import org.apache.crunch.PipelineResult; +import org.apache.crunch.PipelineCallable; import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; import org.apache.crunch.TableSource; @@ -67,11 +69,13 @@ public abstract class DistributedPipeline implements Pipeline { protected final PCollectionFactory factory; protected final Map<PCollectionImpl<?>, Set<Target>> outputTargets; protected final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize; + protected final 
Map<PipelineCallable<?>, Set<Target>> allPipelineCallables; private Path tempDirectory; private int tempFileIndex; private int nextAnonymousStageId; private Configuration conf; + private PipelineCallable currentPipelineCallable; /** * Instantiate with a custom name and configuration. @@ -84,6 +88,7 @@ public abstract class DistributedPipeline implements Pipeline { this.factory = factory; this.outputTargets = Maps.newHashMap(); this.outputTargetsToMaterialize = Maps.newHashMap(); + this.allPipelineCallables = Maps.newHashMap(); this.conf = conf; this.tempDirectory = createTempDirectory(conf); this.tempFileIndex = 0; @@ -115,12 +120,44 @@ public abstract class DistributedPipeline implements Pipeline { return res; } + @Override + public <Output> Output sequentialDo(PipelineCallable<Output> pipelineCallable) { + allPipelineCallables.put(pipelineCallable, getDependencies(pipelineCallable)); + PipelineCallable last = currentPipelineCallable; + currentPipelineCallable = pipelineCallable; + Output out = pipelineCallable.generateOutput(this); + currentPipelineCallable = last; + return out; + } + public <S> PCollection<S> read(Source<S> source) { - return factory.createInputCollection(source, this); + return factory.createInputCollection(source, this, getCurrentPDoOptions()); } public <K, V> PTable<K, V> read(TableSource<K, V> source) { - return factory.createInputTable(source, this); + return factory.createInputTable(source, this, getCurrentPDoOptions()); + } + + private ParallelDoOptions getCurrentPDoOptions() { + ParallelDoOptions.Builder pdb = ParallelDoOptions.builder(); + if (currentPipelineCallable != null) { + pdb.targets(allPipelineCallables.get(currentPipelineCallable)); + } + return pdb.build(); + } + + private Set<Target> getDependencies(PipelineCallable<?> callable) { + Set<Target> deps = Sets.newHashSet(callable.getAllTargets().values()); + for (PCollection pc : callable.getAllPCollections().values()) { + PCollectionImpl pcImpl = (PCollectionImpl) pc; + 
deps.addAll(pcImpl.getTargetDependencies()); + MaterializableIterable iter = (MaterializableIterable) pc.materialize(); + Source pcSrc = iter.getSource(); + if (pcSrc instanceof Target) { + deps.add((Target) pcSrc); + } + } + return deps; } public PCollection<String> readTextFile(String pathName) { http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java index 24cbaf5..064bba8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java @@ -31,6 +31,7 @@ import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ReadableData; import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; import org.apache.crunch.fn.Aggregators; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.PGroupedTableType; @@ -127,8 +128,8 @@ public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>> } @Override - public Set<SourceTarget<?>> getTargetDependencies() { - Set<SourceTarget<?>> td = Sets.newHashSet(super.getTargetDependencies()); + public Set<Target> getTargetDependencies() { + Set<Target> td = Sets.newHashSet(super.getTargetDependencies()); if (groupingOptions != null) { td.addAll(groupingOptions.getSourceTargets()); } http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java 
b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java index 641a3cb..8d3887d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java @@ -19,6 +19,7 @@ package org.apache.crunch.impl.dist.collect; import com.google.common.collect.ImmutableList; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.ReadableData; import org.apache.crunch.Source; import org.apache.crunch.impl.dist.DistributedPipeline; @@ -36,6 +37,11 @@ public class BaseInputCollection<S> extends PCollectionImpl<S> { this.source = source; } + public BaseInputCollection(Source<S> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) { + super(source.toString(), pipeline, doOpts); + this.source = source; + } + @Override protected ReadableData<S> getReadableDataInternal() { if (source instanceof ReadableSource) { @@ -60,6 +66,11 @@ public class BaseInputCollection<S> extends PCollectionImpl<S> { } @Override + protected boolean waitingOnTargets() { + return doOptions.getTargets().contains(source); + } + + @Override protected long getSizeInternal() { long sz = source.getSize(pipeline.getConfiguration()); if (sz < 0) { http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java index f41895a..cbab255 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java @@ -19,6 +19,7 @@ package 
org.apache.crunch.impl.dist.collect; import com.google.common.collect.ImmutableList; import org.apache.crunch.Pair; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.ReadableData; import org.apache.crunch.TableSource; import org.apache.crunch.impl.dist.DistributedPipeline; @@ -35,13 +36,25 @@ public class BaseInputTable<K, V> extends PTableBase<K, V> { public BaseInputTable(TableSource<K, V> source, DistributedPipeline pipeline) { super(source.toString(), pipeline); this.source = source; - this.asCollection = pipeline.getFactory().createInputCollection(source, pipeline); + this.asCollection = pipeline.getFactory().createInputCollection( + source, pipeline, ParallelDoOptions.builder().build()); + } + + public BaseInputTable(TableSource<K, V> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) { + super(source.toString(), pipeline, doOpts); + this.source = source; + this.asCollection = pipeline.getFactory().createInputCollection(source, pipeline, doOpts); } public TableSource<K, V> getSource() { return source; } - + + @Override + protected boolean waitingOnTargets() { + return asCollection.waitingOnTargets(); + } + @Override protected long getSizeInternal() { return asCollection.getSizeInternal(); http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java index a176aa1..9077fc9 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java @@ -33,9 +33,15 @@ import java.util.List; public interface PCollectionFactory { - <S> BaseInputCollection<S> 
createInputCollection(Source<S> source, DistributedPipeline distributedPipeline); + <S> BaseInputCollection<S> createInputCollection( + Source<S> source, + DistributedPipeline distributedPipeline, + ParallelDoOptions doOpts); - <K, V> BaseInputTable<K, V> createInputTable(TableSource<K,V> source, DistributedPipeline distributedPipeline); + <K, V> BaseInputTable<K, V> createInputTable( + TableSource<K,V> source, + DistributedPipeline distributedPipeline, + ParallelDoOptions doOpts); <S> BaseUnionCollection<S> createUnionCollection(List<? extends PCollectionImpl<S>> internal); http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java index a1e70fe..fb2ce31 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import org.apache.crunch.Aggregator; import org.apache.crunch.CachingOptions; +import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; import org.apache.crunch.MapFn; @@ -30,6 +31,7 @@ import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; +import org.apache.crunch.PipelineCallable; import org.apache.crunch.ReadableData; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; @@ -44,6 +46,7 @@ import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import java.io.IOException; import java.util.Collection; import 
java.util.Collections; import java.util.List; @@ -90,10 +93,17 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { @Override public Iterable<S> materialize() { - if (getSize() == 0) { + if (!waitingOnTargets() && getSize() == 0) { System.err.println("Materializing an empty PCollection: " + this.getName()); return Collections.emptyList(); } + if (materializedAt != null && (materializedAt instanceof ReadableSource)) { + try { + return ((ReadableSource<S>) materializedAt).read(getPipeline().getConfiguration()); + } catch (IOException e) { + throw new CrunchRuntimeException("Error reading materialized data", e); + } + } materialized = true; return pipeline.materialize(this); } @@ -156,9 +166,10 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { return pipeline.getFactory().createDoTable(name, getChainingCollection(), fn, type, options); } + public PCollection<S> write(Target target) { if (materializedAt != null) { - getPipeline().write(pipeline.getFactory().createInputCollection(materializedAt, pipeline), target); + getPipeline().write(pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions), target); } else { getPipeline().write(this, target); } @@ -169,7 +180,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { public PCollection<S> write(Target target, Target.WriteMode writeMode) { if (materializedAt != null) { getPipeline().write( - pipeline.getFactory().createInputCollection(materializedAt, pipeline), + pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions), target, writeMode); } else { @@ -192,12 +203,21 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { public void accept(Visitor visitor) { if (materializedAt != null) { - visitor.visitInputCollection(pipeline.getFactory().createInputCollection(materializedAt, pipeline)); + visitor.visitInputCollection(pipeline.getFactory().createInputCollection(materializedAt, pipeline, 
doOptions)); } else { acceptInternal(visitor); } } + protected boolean waitingOnTargets() { + for (PCollectionImpl parent : getParents()) { + if (parent.waitingOnTargets()) { + return true; + } + } + return false; + } + protected abstract void acceptInternal(Visitor visitor); public void setBreakpoint() { @@ -217,6 +237,12 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { @Override public PObject<S> first() { return new FirstElementPObject<S>(this); } + @Override + public <Output> Output sequentialDo(String label, PipelineCallable<Output> pipelineCallable) { + pipelineCallable.dependsOn(label, this); + return getPipeline().sequentialDo(pipelineCallable); + } + public SourceTarget<S> getMaterializedAt() { return materializedAt; } @@ -286,8 +312,8 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { return parents.get(0); } - public Set<SourceTarget<?>> getTargetDependencies() { - Set<SourceTarget<?>> targetDeps = doOptions.getSourceTargets(); + public Set<Target> getTargetDependencies() { + Set<Target> targetDeps = Sets.<Target>newHashSet(doOptions.getTargets()); for (PCollectionImpl<?> parent : getParents()) { targetDeps = Sets.union(targetDeps, parent.getTargetDependencies()); } http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java index 32f9991..a893b9e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java @@ -92,7 +92,7 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple public PTable<K, V> write(Target target) { if (getMaterializedAt() != null) { 
getPipeline().write(pipeline.getFactory().createInputTable( - (TableSource<K, V>) getMaterializedAt(), pipeline), target); + (TableSource<K, V>) getMaterializedAt(), pipeline, doOptions), target); } else { getPipeline().write(this, target); } @@ -103,7 +103,7 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple public PTable<K, V> write(Target target, Target.WriteMode writeMode) { if (getMaterializedAt() != null) { getPipeline().write(pipeline.getFactory().createInputTable( - (TableSource<K, V>) getMaterializedAt(), pipeline), target, writeMode); + (TableSource<K, V>) getMaterializedAt(), pipeline, doOptions), target, writeMode); } else { getPipeline().write(this, target, writeMode); } http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 42d1ca8..5996bfa 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -37,6 +37,7 @@ import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; +import org.apache.crunch.PipelineCallable; import org.apache.crunch.Source; import org.apache.crunch.TableSource; import org.apache.crunch.Target; @@ -326,6 +327,19 @@ public class MemPipeline implements Pipeline { } @Override + public <Output> Output sequentialDo(PipelineCallable<Output> callable) { + Output out = callable.generateOutput(this); + try { + if (PipelineCallable.Status.FAILURE == callable.call()) { + throw new IllegalStateException("PipelineCallable " + callable + " failed in in-memory Crunch pipeline"); + } + } catch (Throwable t) { + 
t.printStackTrace(); + } + return out; + } + + @Override public PipelineExecution runAsync() { activeTargets.clear(); return new MemExecution(); http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 240de1c..becee88 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -37,6 +37,7 @@ import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Pipeline; import org.apache.crunch.ReadableData; +import org.apache.crunch.PipelineCallable; import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; import org.apache.crunch.impl.mem.MemPipeline; @@ -189,6 +190,12 @@ public class MemCollection<S> implements PCollection<S> { public PObject<S> first() { return new FirstElementPObject<S>(this); } @Override + public <Output> Output sequentialDo(String label, PipelineCallable<Output> pipelineCallable) { + pipelineCallable.dependsOn(label, this); + return getPipeline().sequentialDo(pipelineCallable); + } + + @Override public ReadableData<S> asReadable(boolean materialize) { return new MemReadableData<S>(collect); } http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 6cfc6d0..bf3f58a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ 
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -27,7 +27,6 @@ import java.util.Map; import com.google.common.base.Charsets; import com.google.common.collect.Maps; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CachingOptions; @@ -107,7 +106,7 @@ public class MRPipeline extends DistributedPipeline { outputTargetsToMaterialize.remove(c); } } - MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize); + MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize, allPipelineCallables); try { return planner.plan(jarClass, getConfiguration()); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java index 42d9df2..ea189f8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.impl.mr.collect; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.ReadableData; import org.apache.crunch.Source; import org.apache.crunch.impl.dist.collect.BaseInputCollection; @@ -27,8 +28,8 @@ import org.apache.crunch.io.ReadableSource; public class InputCollection<S> extends BaseInputCollection<S> implements MRCollection { - public InputCollection(Source<S> source, MRPipeline pipeline) { - super(source, pipeline); + public InputCollection(Source<S> source, MRPipeline pipeline, ParallelDoOptions doOpts) { + super(source, pipeline, doOpts); } @Override 
http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java index fb550fa..3154190 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.impl.mr.collect; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.TableSource; import org.apache.crunch.impl.dist.collect.BaseInputTable; import org.apache.crunch.impl.dist.collect.MRCollection; @@ -25,8 +26,8 @@ import org.apache.crunch.impl.mr.plan.DoNode; public class InputTable<K, V> extends BaseInputTable<K, V> implements MRCollection { - public InputTable(TableSource<K, V> source, MRPipeline pipeline) { - super(source, pipeline); + public InputTable(TableSource<K, V> source, MRPipeline pipeline, ParallelDoOptions doOpts) { + super(source, pipeline, doOpts); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java index 1e94c53..ede88f2 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java @@ -50,13 +50,19 @@ import java.util.List; public class MRCollectionFactory implements PCollectionFactory { @Override - public <S> BaseInputCollection<S> 
createInputCollection(Source<S> source, DistributedPipeline pipeline) { - return new InputCollection<S>(source, (MRPipeline) pipeline); + public <S> BaseInputCollection<S> createInputCollection( + Source<S> source, + DistributedPipeline pipeline, + ParallelDoOptions doOpts) { + return new InputCollection<S>(source, (MRPipeline) pipeline, doOpts); } @Override - public <K, V> BaseInputTable<K, V> createInputTable(TableSource<K, V> source, DistributedPipeline pipeline) { - return new InputTable<K, V>(source, (MRPipeline) pipeline); + public <K, V> BaseInputTable<K, V> createInputTable( + TableSource<K, V> source, + DistributedPipeline pipeline, + ParallelDoOptions doOpts) { + return new InputTable<K, V>(source, (MRPipeline) pipeline, doOpts); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index eb46ab0..2d07c13 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractFuture; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.crunch.PipelineCallable; import org.apache.crunch.PipelineResult; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; @@ -33,7 +34,6 @@ import org.apache.crunch.impl.mr.MRPipelineExecution; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.hadoop.conf.Configuration; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -73,8 +73,9 @@ public class MRExecutor extends 
AbstractFuture<PipelineResult> implements MRPipe Configuration conf, Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets, - Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) { - this.control = new CrunchJobControl(conf, jarClass.toString()); + Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize, + Map<PipelineCallable<?>, Set<Target>> pipelineCallables) { + this.control = new CrunchJobControl(conf, jarClass.toString(), pipelineCallables); this.outputTargets = outputTargets; this.toMaterialize = toMaterialize; this.monitorThread = new Thread(new Runnable() { @@ -121,31 +122,41 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage()); } } + List<PipelineCallable<?>> failedCallables = control.getFailedCallables(); + if (!failedCallables.isEmpty()) { + System.err.println(failedCallables.size() + " callable failure(s) occurred:"); + for (PipelineCallable<?> c : failedCallables) { + System.err.println(c.getName() + ": " + c.getMessage()); + } + } + boolean hasFailures = !failures.isEmpty() || !failedCallables.isEmpty(); List<PipelineResult.StageResult> stages = Lists.newArrayList(); for (CrunchControlledJob job : control.getSuccessfulJobList()) { stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters(), job.getStartTimeMsec(), job.getJobStartTimeMsec(), job.getJobEndTimeMsec(), job.getEndTimeMsec())); } - for (PCollectionImpl<?> c : outputTargets.keySet()) { - if (toMaterialize.containsKey(c)) { - MaterializableIterable iter = toMaterialize.get(c); - if (iter.isSourceTarget()) { - iter.materialize(); - c.materializeAt((SourceTarget) iter.getSource()); - } - } else { - boolean materialized = false; - for (Target t : outputTargets.get(c)) { - if (!materialized) { - if (t instanceof SourceTarget) { - c.materializeAt((SourceTarget) t); - materialized = true; - } else { 
- SourceTarget st = t.asSourceTarget(c.getPType()); - if (st != null) { - c.materializeAt(st); + if (!hasFailures) { + for (PCollectionImpl<?> c : outputTargets.keySet()) { + if (toMaterialize.containsKey(c)) { + MaterializableIterable iter = toMaterialize.get(c); + if (iter.isSourceTarget()) { + iter.materialize(); + c.materializeAt((SourceTarget) iter.getSource()); + } + } else { + boolean materialized = false; + for (Target t : outputTargets.get(c)) { + if (!materialized) { + if (t instanceof SourceTarget) { + c.materializeAt((SourceTarget) t); materialized = true; + } else { + SourceTarget st = t.asSourceTarget(c.getPType()); + if (st != null) { + c.materializeAt(st); + materialized = true; + } } } } @@ -156,7 +167,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe synchronized (this) { if (killSignal.getCount() == 0) { status.set(Status.KILLED); - } else if (!failures.isEmpty()) { + } else if (!failures.isEmpty() || !failedCallables.isEmpty()) { status.set(Status.FAILED); } else { status.set(Status.SUCCEEDED); http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index 41da5a6..d341184 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -147,6 +147,7 @@ class JobPrototype { job.setJarByClass(jarClass); Set<DoNode> outputNodes = Sets.newHashSet(); + Set<Target> allTargets = Sets.newHashSet(); Path outputPath = new Path(workingPath, "output"); MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null); for (Target target : targetsToNodePaths.keySet()) { @@ -159,6 +160,7 
@@ class JobPrototype { } outputNodes.add(walkPath(nodePath.descendingIterator(), node)); } + allTargets.add(target); } Set<DoNode> mapSideNodes = Sets.newHashSet(); @@ -173,7 +175,7 @@ class JobPrototype { } mapSideNodes.add(walkPath(nodePath.descendingIterator(), node)); } - + allTargets.add(target); } } @@ -229,6 +231,7 @@ class JobPrototype { jobID, job, jobNameBuilder, + allTargets, new CrunchJobHooks.PrepareHook(job), new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null)); } http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index c9a6136..7a1ff4e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -18,10 +18,16 @@ package org.apache.crunch.impl.mr.plan; import java.io.IOException; -import java.util.*; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.crunch.PipelineCallable; import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; @@ -47,14 +53,17 @@ public class MSCRPlanner { private final MRPipeline pipeline; private final Map<PCollectionImpl<?>, Set<Target>> outputs; private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize; + private final Map<PipelineCallable<?>, Set<Target>> pipelineCallables; private int lastJobID = 0; public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs, - 
Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) { + Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize, + Map<PipelineCallable<?>, Set<Target>> pipelineCallables) { this.pipeline = pipeline; this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR); this.outputs.putAll(outputs); this.toMaterialize = toMaterialize; + this.pipelineCallables = pipelineCallables; } // Used to ensure that we always build pipelines starting from the deepest @@ -74,8 +83,7 @@ public class MSCRPlanner { }; public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException { - Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = - Maps.<PCollectionImpl<?>, PCollectionImpl<?>, Set<SourceTarget<?>>>newTreeMap(DEPTH_COMPARATOR); + Map<PCollectionImpl<?>, Set<Target>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR); for (PCollectionImpl<?> pcollect : outputs.keySet()) { targetDeps.put(pcollect, pcollect.getTargetDependencies()); } @@ -109,7 +117,7 @@ public class MSCRPlanner { } if (!hasInputs) { LOG.warn("No input sources for pipeline, nothing to do..."); - return new MRExecutor(conf, jarClass, outputs, toMaterialize); + return new MRExecutor(conf, jarClass, outputs, toMaterialize, pipelineCallables); } // Create a new graph that splits up up dependent GBK nodes. @@ -183,7 +191,7 @@ public class MSCRPlanner { // Finally, construct the jobs from the prototypes and return. 
DotfileWriter dotfileWriter = new DotfileWriter(); - MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize); + MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize, pipelineCallables); for (JobPrototype proto : Sets.newHashSet(assignments.values())) { dotfileWriter.addJobPrototype(proto); exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline, lastJobID)); http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java index 2c1f1be..e727ec1 100644 --- a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java @@ -17,15 +17,26 @@ */ package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineCallable; +import org.apache.crunch.Target; import org.apache.crunch.impl.mr.MRJob; import org.apache.crunch.impl.mr.plan.JobNameBuilder; import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.io.To; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.junit.Test; import java.io.IOException; +import java.util.Map; +import java.util.Set; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static 
org.mockito.Mockito.spy; @@ -37,7 +48,8 @@ public class CrunchJobControlTest { public void testMaxRunningJobs() throws IOException, InterruptedException { Configuration conf = new Configuration(); conf.setInt(RuntimeParameters.MAX_RUNNING_JOBS, 2); - CrunchJobControl jobControl = new CrunchJobControl(conf, "group"); + CrunchJobControl jobControl = new CrunchJobControl(conf, "group", + ImmutableMap.<PipelineCallable<?>, Set<Target>>of()); CrunchControlledJob job1 = createJob(1); CrunchControlledJob job2 = createJob(2); CrunchControlledJob job3 = createJob(3); @@ -60,13 +72,81 @@ public class CrunchJobControlTest { verify(job3).submit(); } - private CrunchControlledJob createJob(int jobID) throws IOException, InterruptedException { + private class IncrementingPipelineCallable extends PipelineCallable<Void> { + + private String name; + private boolean executed; + + public IncrementingPipelineCallable(String name) { + this.name = name; + } + + @Override + public Status call() { + executed = true; + return Status.SUCCESS; + } + + public boolean isExecuted() { return executed; } + + @Override + public Void getOutput(Pipeline pipeline) { + return null; + } + } + + @Test + public void testSequentialDo() throws IOException, InterruptedException { + Target t1 = To.textFile("foo"); + Target t2 = To.textFile("bar"); + Target t3 = To.textFile("baz"); + IncrementingPipelineCallable first = new IncrementingPipelineCallable("first"); + IncrementingPipelineCallable second = new IncrementingPipelineCallable("second"); + IncrementingPipelineCallable third = new IncrementingPipelineCallable("third"); + CrunchControlledJob job1 = createJob(1, ImmutableSet.of(t1)); + CrunchControlledJob job2 = createJob(2, ImmutableSet.of(t2)); + CrunchControlledJob job3 = createJob(3, ImmutableSet.of(t3)); + Configuration conf = new Configuration(); + Map<PipelineCallable<?>, Set<Target>> pipelineCallables = Maps.newHashMap(); + pipelineCallables.put(first, ImmutableSet.<Target>of()); + 
pipelineCallables.put(second, ImmutableSet.<Target>of(t1)); + pipelineCallables.put(third, ImmutableSet.<Target>of(t2, t3)); + CrunchJobControl jobControl = new CrunchJobControl(conf, "group", pipelineCallables); + + jobControl.addJob(job1); + jobControl.addJob(job2); + jobControl.addJob(job3); + jobControl.pollJobStatusAndStartNewOnes(); + verify(job1).submit(); + verify(job2).submit(); + verify(job3).submit(); + assertTrue(first.isExecuted()); + + setSuccess(job1); + jobControl.pollJobStatusAndStartNewOnes(); + assertTrue(second.isExecuted()); + + setSuccess(job2); + jobControl.pollJobStatusAndStartNewOnes(); + assertFalse(third.isExecuted()); + + setSuccess(job3); + jobControl.pollJobStatusAndStartNewOnes(); + assertTrue(third.isExecuted()); + } + + private CrunchControlledJob createJob(int jobID) { + return createJob(jobID, ImmutableSet.<Target>of()); + } + + private CrunchControlledJob createJob(int jobID, Set<Target> targets) { Job mrJob = mock(Job.class); when(mrJob.getConfiguration()).thenReturn(new Configuration()); CrunchControlledJob job = new CrunchControlledJob( jobID, mrJob, new JobNameBuilder(mrJob.getConfiguration(), "test", 1, 1), + targets, mock(CrunchControlledJob.Hook.class), mock(CrunchControlledJob.Hook.class)); return spy(job); http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala index b2216f1..6f4a19f 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala @@ -278,6 +278,14 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { def shard(numPartitions: Int) = wrap(Shard.shard(native, 
numPartitions)) /** + * Adds this PCollection as a dependency for the given PipelineCallable + * and then registers it to the Pipeline associated with this instance. + */ + def sequentialDo[Output](label: String, fn: PipelineCallable[Output]) = { + native.sequentialDo(label, fn) + } + + /** * Gets the number of elements represented by this PCollection. * * @return The number of elements in this PCollection. http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala index b800612..08b4697 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala @@ -113,6 +113,10 @@ trait PipelineLike { */ def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt)) + /** + * Adds the given {@code PipelineCallable} to the pipeline execution and returns its output.
+ */ + def sequentialDo[Output](seqDoFn: PipelineCallable[Output]) = jpipeline.sequentialDo(seqDoFn) /** * Returns a handler for controlling the execution of the underlying MapReduce http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java new file mode 100644 index 0000000..51b65af --- /dev/null +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch; + +import com.google.common.collect.ImmutableMap; +import org.apache.crunch.impl.spark.SparkPipeline; +import org.apache.crunch.test.CrunchTestSupport; +import org.junit.Test; + +import java.util.Map; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; + +public class SparkPipelineCallableIT extends CrunchTestSupport { + @Test + public void testSparkShakes() throws Exception { + run(new SparkPipeline("local", "PC", SparkPipelineCallableIT.class, tempDir.getDefaultConfiguration()), + tempDir.copyResourceFileName("shakes.txt"), false /* fail */); + } + + @Test + public void testFailure() throws Exception { + run(new SparkPipeline("local", "PC", SparkPipelineCallableIT.class, tempDir.getDefaultConfiguration()), + tempDir.copyResourceFileName("shakes.txt"), true /* fail */); + } + + public static int INC1 = 0; + public static int INC2 = 0; + + public static void run(Pipeline p, final String input, final boolean fail) { + + PTable<String, Long> top3 = p.sequentialDo(new PipelineCallable<PCollection<String>>() { + @Override + public Status call() { + INC1 = 17; + return fail ? 
Status.FAILURE : Status.SUCCESS; + } + + @Override + public PCollection<String> getOutput(Pipeline pipeline) { + return pipeline.readTextFile(input); + } + }.named("first")) + .sequentialDo("onInput", new PipelineCallable<PCollection<String>>() { + @Override + protected PCollection<String> getOutput(Pipeline pipeline) { + return getOnlyPCollection(); + } + + @Override + public Status call() throws Exception { + return Status.SUCCESS; + } + }) + .count() + .sequentialDo("label", new PipelineCallable<PTable<String, Long>>() { + @Override + public Status call() { + INC2 = 29; + if (getPCollection("label") != null) { + return Status.SUCCESS; + } + return Status.FAILURE; + } + + @Override + public PTable<String, Long> getOutput(Pipeline pipeline) { + return (PTable<String, Long>) getOnlyPCollection(); + } + }.named("second")) + .top(3); + + if (fail) { + assertFalse(p.run().succeeded()); + } else { + Map<String, Long> counts = top3.materializeToMap(); + assertEquals(ImmutableMap.of("", 788L, "Enter Macbeth.", 7L, "Exeunt.", 21L), counts); + assertEquals(17, INC1); + assertEquals(29, INC2); + } + p.done(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java index 7a69707..95ccd2c 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java @@ -146,7 +146,8 @@ public class SparkPipeline extends DistributedPipeline { } copyConfiguration(conf, sparkContext.hadoopConfiguration()); - SparkRuntime runtime = new SparkRuntime(this, sparkContext, conf, outputTargets, toMaterialize, cachedCollections); + SparkRuntime runtime = new SparkRuntime(this, 
sparkContext, conf, outputTargets, + toMaterialize, cachedCollections, allPipelineCallables); runtime.execute(); outputTargets.clear(); return runtime; http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java index 22375ee..a9537e5 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java @@ -18,13 +18,17 @@ package org.apache.crunch.impl.spark; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CombineFn; import org.apache.crunch.PCollection; +import org.apache.crunch.PipelineCallable; import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; import org.apache.crunch.SourceTarget; @@ -58,10 +62,13 @@ import org.apache.spark.storage.StorageLevel; import java.io.IOException; import java.net.URI; import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -79,6 +86,8 @@ public class 
SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe private Map<PCollectionImpl<?>, Set<Target>> outputTargets; private Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize; private Map<PCollection<?>, StorageLevel> toCache; + private Map<PipelineCallable<?>, Set<Target>> allPipelineCallables; + private Set<PipelineCallable<?>> activePipelineCallables; private final CountDownLatch doneSignal = new CountDownLatch(1); private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY); private boolean started; @@ -104,7 +113,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe Configuration conf, Map<PCollectionImpl<?>, Set<Target>> outputTargets, Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize, - Map<PCollection<?>, StorageLevel> toCache) { + Map<PCollection<?>, StorageLevel> toCache, + Map<PipelineCallable<?>, Set<Target>> allPipelineCallables) { this.pipeline = pipeline; this.sparkContext = sparkContext; this.conf = conf; @@ -115,6 +125,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe this.outputTargets.putAll(outputTargets); this.toMaterialize = toMaterialize; this.toCache = toCache; + this.allPipelineCallables = allPipelineCallables; + this.activePipelineCallables = allPipelineCallables.keySet(); this.status.set(Status.READY); this.monitorThread = new Thread(new Runnable() { @Override @@ -201,13 +213,67 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe doneSignal.await(); } + private void runCallables(Set<Target> unfinished) { + Set<PipelineCallable<?>> oldCallables = activePipelineCallables; + activePipelineCallables = Sets.newHashSet(); + List<PipelineCallable<?>> callablesToRun = Lists.newArrayList(); + List<PipelineCallable<?>> failedCallables = Lists.newArrayList(); + for (PipelineCallable<?> pipelineCallable : oldCallables) { + if (Sets.intersection(allPipelineCallables.get(pipelineCallable), 
unfinished).isEmpty()) { + if (pipelineCallable.runSingleThreaded()) { + try { + if (pipelineCallable.call() != PipelineCallable.Status.SUCCESS) { + failedCallables.add(pipelineCallable); + } + } catch (Throwable t) { + pipelineCallable.setMessage(t.getLocalizedMessage()); + failedCallables.add(pipelineCallable); + } + } else { + callablesToRun.add(pipelineCallable); + } + } else { + // Still need to run this one + activePipelineCallables.add(pipelineCallable); + } + } + + ListeningExecutorService es = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + try { + List<Future<PipelineCallable.Status>> res = es.invokeAll(callablesToRun); + for (int i = 0; i < res.size(); i++) { + if (res.get(i).get() != PipelineCallable.Status.SUCCESS) { + failedCallables.add((PipelineCallable) callablesToRun.get(i)); + } + } + } catch (Throwable t) { + t.printStackTrace(); + failedCallables.addAll((List) callablesToRun); + } finally { + es.shutdownNow(); + } + + if (!failedCallables.isEmpty()) { + LOG.error(failedCallables.size() + " callable failure(s) occurred:"); + for (PipelineCallable<?> c : failedCallables) { + LOG.error(c.getName() + ": " + c.getMessage()); + } + status.set(Status.FAILED); + set(PipelineResult.EMPTY); + doneSignal.countDown(); + } + } + private void monitorLoop() { status.set(Status.RUNNING); long start = System.currentTimeMillis(); - Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.<PCollectionImpl<?>, PCollectionImpl<?>, Set<SourceTarget<?>>>newTreeMap(DEPTH_COMPARATOR); + Map<PCollectionImpl<?>, Set<Target>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR); + Set<Target> unfinished = Sets.newHashSet(); for (PCollectionImpl<?> pcollect : outputTargets.keySet()) { targetDeps.put(pcollect, pcollect.getTargetDependencies()); + unfinished.addAll(outputTargets.get(pcollect)); } + runCallables(unfinished); while (!targetDeps.isEmpty() && doneSignal.getCount() > 0) { Set<Target> allTargets = Sets.newHashSet(); for (PCollectionImpl<?> 
pcollect : targetDeps.keySet()) { @@ -271,21 +337,26 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe } } } + unfinished.removeAll(targets); } - for (PCollectionImpl<?> output : pcolToRdd.keySet()) { - if (toMaterialize.containsKey(output)) { - MaterializableIterable mi = toMaterialize.get(output); - if (mi.isSourceTarget()) { - output.materializeAt((SourceTarget) mi.getSource()); + if (status.get() == Status.RUNNING) { + for (PCollectionImpl<?> output : pcolToRdd.keySet()) { + if (toMaterialize.containsKey(output)) { + MaterializableIterable mi = toMaterialize.get(output); + if (mi.isSourceTarget()) { + output.materializeAt((SourceTarget) mi.getSource()); + } } + targetDeps.remove(output); } - targetDeps.remove(output); } + runCallables(unfinished); } if (status.get() != Status.FAILED || status.get() != Status.KILLED) { status.set(Status.SUCCEEDED); set(new PipelineResult( - ImmutableList.of(new PipelineResult.StageResult("Spark", getCounters(), start, System.currentTimeMillis())), + ImmutableList.of(new PipelineResult.StageResult("Spark", getCounters(), + start, System.currentTimeMillis())), Status.SUCCEEDED)); } else { set(PipelineResult.EMPTY);
