Updated Branches: refs/heads/master 6f33d586f -> 849ad5fe4
CRUNCH-132: Second cut at adding a WriteMode for Targets and changing the default to fail if an output directory already exists. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/849ad5fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/849ad5fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/849ad5fe Branch: refs/heads/master Commit: 849ad5fe48a411a4d925556f32ba5d8a366511d4 Parents: 6f33d58 Author: Josh Wills <[email protected]> Authored: Sat Feb 9 14:02:34 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Sun Feb 10 22:23:28 2013 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/io/hbase/HBaseTarget.java | 9 ++ .../apache/crunch/scrunch/PCollectionLike.scala | 4 + .../org/apache/crunch/scrunch/PipelineLike.scala | 22 +++ .../crunch/impl/mem/MemPipelineFileWritingIT.java | 4 +- .../main/java/org/apache/crunch/PCollection.java | 12 ++ crunch/src/main/java/org/apache/crunch/PTable.java | 6 + .../src/main/java/org/apache/crunch/Pipeline.java | 20 +++- crunch/src/main/java/org/apache/crunch/Target.java | 56 ++++++++- .../org/apache/crunch/impl/mem/MemPipeline.java | 27 ++++- .../crunch/impl/mem/collect/MemCollection.java | 6 + .../apache/crunch/impl/mem/collect/MemTable.java | 6 + .../java/org/apache/crunch/impl/mr/MRPipeline.java | 22 +++- .../crunch/impl/mr/collect/PCollectionImpl.java | 13 ++- .../apache/crunch/impl/mr/collect/PTableBase.java | 20 +++- .../org/apache/crunch/io/impl/FileTargetImpl.java | 52 ++++++++ .../apache/crunch/io/impl/SourceTargetImpl.java | 5 + .../test/java/org/apache/crunch/WriteModeTest.java | 103 +++++++++++++++ 17 files changed, 376 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java index 44864e8..eceb31d 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java @@ -20,6 +20,8 @@ package org.apache.crunch.io.hbase; import java.io.IOException; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.SourceTarget; import org.apache.crunch.io.CrunchOutputs; @@ -39,6 +41,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class HBaseTarget implements MapReduceTarget { + private static final Log LOG = LogFactory.getLog(HBaseTarget.class); + protected String table; public HBaseTarget(String table) { @@ -110,4 +114,9 @@ public class HBaseTarget implements MapReduceTarget { public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) { return null; } + + @Override + public void handleExisting(WriteMode strategy, Configuration conf) { + LOG.info("HBaseTarget ignores checks for existing outputs..."); + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala index f6441ac..68fe7a5 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala @@ -28,6 +28,10 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { def write(target: Target): FullType = wrap(native.write(target)) + def write(target: Target, writeMode: Target.WriteMode): FullType = { + wrap(native.write(target, writeMode)) + } + def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = { new PCollection[T](native.parallelDo(fn, ptype)) } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala index 5a10ee7..c183e5e 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala @@ -61,6 +61,17 @@ trait PipelineLike { def write(collection: PCollection[_], target: Target): Unit = jpipeline.write(collection.native, target) /** + * Writes a parallel collection to a target using an output strategy. + * + * @param collection The collection to write. + * @param target The destination target for this write. + * @param writeMode The WriteMode to use for handling existing outputs. + */ + def write(collection: PCollection[_], target: Target, writeMode: Target.WriteMode): Unit = { + jpipeline.write(collection.native, target, writeMode) + } + + /** * Writes a parallel table to a target. * * @param table The table to write. @@ -69,6 +80,17 @@ trait PipelineLike { def write(table: PTable[_, _], target: Target): Unit = jpipeline.write(table.native, target) /** + * Writes a parallel table to a target. + * + * @param table The table to write. + * @param target The destination target for this write. + * @param writeMode The write mode to use on the target + */ + def write(table: PTable[_, _], target: Target, writeMode: Target.WriteMode): Unit = { + jpipeline.write(table.native, target, writeMode) + } + + /** * Constructs and executes a series of MapReduce jobs in order * to write data to the output targets. */ http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java index dc9652d..976a43e 100644 --- a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java +++ b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java @@ -40,10 +40,10 @@ public class MemPipelineFileWritingIT { @Test public void testMemPipelineFileWriter() throws Exception { - File tmpDir = baseTmpDir.getRootFile(); + File tmpDir = baseTmpDir.getFile("mempipe"); Pipeline p = MemPipeline.getInstance(); PCollection<String> lines = MemPipeline.collectionOf("hello", "world"); - p.writeTextFile(lines, tmpDir.getAbsolutePath()); + p.writeTextFile(lines, tmpDir.toString()); p.done(); assertTrue(tmpDir.exists()); File[] files = tmpDir.listFiles(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/PCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java index 798c262..1783677 100644 --- a/crunch/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch/src/main/java/org/apache/crunch/PCollection.java @@ -136,6 +136,18 @@ public interface PCollection<S> { PCollection<S> write(Target target); /** + * Write the contents of this {@code PCollection} to the given {@code Target}, + * using the given {@code Target.WriteMode} to handle existing + * targets. + * + * @param target + * The target + * @param writeMode + * The rule for handling existing outputs at the target location + */ + PCollection<S> write(Target target, Target.WriteMode writeMode); + + /** * Returns a reference to the data set represented by this PCollection that * may be used by the client to read the data locally. */ http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/PTable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/PTable.java b/crunch/src/main/java/org/apache/crunch/PTable.java index b754a2c..b32bd80 100644 --- a/crunch/src/main/java/org/apache/crunch/PTable.java +++ b/crunch/src/main/java/org/apache/crunch/PTable.java @@ -68,6 +68,12 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { PTable<K, V> write(Target target); /** + * Writes this {@code PTable} to the given {@code Target}, using the + * given {@code Target.WriteMode} to handle existing targets. + */ + PTable<K, V> write(Target target, Target.WriteMode writeMode); + + /** * Returns the {@code PTableType} of this {@code PTable}. */ PTableType<K, V> getPTableType(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/Pipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java index bcf8727..af1d86a 100644 --- a/crunch/src/main/java/org/apache/crunch/Pipeline.java +++ b/crunch/src/main/java/org/apache/crunch/Pipeline.java @@ -63,7 +63,9 @@ public interface Pipeline { <K, V> PTable<K, V> read(TableSource<K, V> tableSource); /** - * Write the given collection to the given target on the next pipeline run. + * Write the given collection to the given target on the next pipeline run. The + * system will check to see if the target's location already exists using the + * {@code WriteMode.DEFAULT} rule for the given {@code Target}. * * @param collection * The collection @@ -73,6 +75,22 @@ public interface Pipeline { void write(PCollection<?> collection, Target target); /** + * Write the contents of the {@code PCollection} to the given {@code Target}, + * using the storage format specified by the target and the given + * {@code WriteMode} for cases where the referenced {@code Target} + * already exists. + * + * @param collection + * The collection + * @param target + * The target to write to + * @param writeMode + * The strategy to use for handling existing outputs + */ + void write(PCollection<?> collection, Target target, + Target.WriteMode writeMode); + + /** * Create the given PCollection and read the data it contains into the * returned Collection instance for client use. * http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/Target.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/Target.java b/crunch/src/main/java/org/apache/crunch/Target.java index ea6fd9d..0a0c23d 100644 --- a/crunch/src/main/java/org/apache/crunch/Target.java +++ b/crunch/src/main/java/org/apache/crunch/Target.java @@ -19,13 +19,65 @@ package org.apache.crunch; import org.apache.crunch.io.OutputHandler; import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; /** - * A {@code Target} represents the output destination of a Crunch job. - * + * A {@code Target} represents the output destination of a Crunch {@code PCollection} + * in the context of a Crunch job. */ public interface Target { + + /** + * An enum to represent different options the client may specify + * for handling the case where the output path, table, etc. referenced + * by a {@code Target} already exists. + */ + enum WriteMode { + /** + * Check to see if the output target already exists before running + * the pipeline, and if it does, print an error and throw an exception. + */ + DEFAULT, + + /** + * Check to see if the output target already exists, and if it does, + * delete it and overwrite it with the new output (if any). + */ + OVERWRITE, + + /** + * If the output target does not exist, create it. If it does exist, + * add the output of this pipeline to the target. This was the + * behavior in Crunch up to version 0.4.0. + */ + APPEND + } + + /** + * Apply the given {@code WriteMode} to this {@code Target} instance. + * + * @param writeMode The strategy for handling existing outputs + * @param conf The ever-useful {@code Configuration} instance + */ + void handleExisting(WriteMode writeMode, Configuration conf); + + /** + * Checks to see if this {@code Target} instance is compatible with the + * given {@code PType}. + * + * @param handler The {@link OutputHandler} that is managing the output for the job + * @param ptype The {@code PType} to check + * @return True if this Target can write data in the form of the given {@code PType}, + * false otherwise + */ boolean accept(OutputHandler handler, PType<?> ptype); + /** + * Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target} + * for the given {@code PType}, if possible. If it is not possible, return {@code null}. + * + * @param ptype The {@code PType} to use in constructing the {@code SourceTarget} + * @return A new {@code SourceTarget} or null if such a {@code SourceTarget} does not exist + */ <T> SourceTarget<T> asSourceTarget(PType<T> ptype); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 95c9e72..488cdd9 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -19,9 +19,11 @@ package org.apache.crunch.impl.mem; import java.io.IOException; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; @@ -30,6 +32,7 @@ import org.apache.crunch.PipelineResult; import org.apache.crunch.Source; import org.apache.crunch.TableSource; import org.apache.crunch.Target; +import org.apache.crunch.Target.WriteMode; import org.apache.crunch.impl.mem.collect.MemCollection; import org.apache.crunch.impl.mem.collect.MemTable; import org.apache.crunch.io.At; @@ -45,6 +48,7 @@ import org.apache.hadoop.mapreduce.Counters; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class MemPipeline implements Pipeline { @@ -52,6 +56,8 @@ public class MemPipeline implements Pipeline { private static Counters COUNTERS = new Counters(); private static final MemPipeline INSTANCE = new MemPipeline(); + private int outputIndex = 0; + public static Counters getCounters() { return COUNTERS; } @@ -103,7 +109,8 @@ public class MemPipeline implements Pipeline { } private Configuration conf = new Configuration(); - + private Set<Target> activeTargets = Sets.newHashSet(); + private MemPipeline() { } @@ -149,11 +156,24 @@ public class MemPipeline implements Pipeline { @Override public void write(PCollection<?> collection, Target target) { + write(collection, target, Target.WriteMode.DEFAULT); + } + + @Override + public void write(PCollection<?> collection, Target target, + Target.WriteMode writeMode) { + target.handleExisting(writeMode, getConfiguration()); + if (writeMode != WriteMode.APPEND && activeTargets.contains(target)) { + throw new CrunchRuntimeException("Target " + target + " is already written in the current run." + + " Use WriteMode.APPEND in order to write additional data to it."); + } + activeTargets.add(target); if (target instanceof PathTarget) { Path path = ((PathTarget) target).getPath(); try { FileSystem fs = path.getFileSystem(conf); - FSDataOutputStream os = fs.create(new Path(path, "out")); + FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex)); + outputIndex++; if (collection instanceof PTable) { for (Object o : collection.materialize()) { Pair p = (Pair) o; @@ -193,12 +213,13 @@ public class MemPipeline implements Pipeline { @Override public PipelineResult run() { + activeTargets.clear(); return PipelineResult.EMPTY; } @Override public PipelineResult done() { - return PipelineResult.EMPTY; + return run(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index cc9f3fc..b1d6be5 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -142,6 +142,12 @@ public class MemCollection<S> implements PCollection<S> { } @Override + public PCollection<S> write(Target target, Target.WriteMode writeMode) { + getPipeline().write(this, target, writeMode); + return this; + } + + @Override public Iterable<S> materialize() { return collect; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java index 524d492..8d9649d 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java @@ -87,6 +87,12 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable< } @Override + public PTable<K, V> write(Target target, Target.WriteMode writeMode) { + getPipeline().write(this, target, writeMode); + return this; + } + + @Override public PTableType<K, V> getPTableType() { return ptype; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 9c98937..2d4d137 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -34,6 +34,7 @@ import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; import org.apache.crunch.TableSource; import org.apache.crunch.Target; +import org.apache.crunch.Target.WriteMode; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.collect.InputCollection; import org.apache.crunch.impl.mr.collect.InputTable; @@ -206,17 +207,36 @@ public class MRPipeline implements Pipeline { return read(From.textFile(pathName)); } - @SuppressWarnings("unchecked") public void write(PCollection<?> pcollection, Target target) { + write(pcollection, target, Target.WriteMode.DEFAULT); + } + + @SuppressWarnings("unchecked") + public void write(PCollection<?> pcollection, Target target, + Target.WriteMode writeMode) { if (pcollection instanceof PGroupedTableImpl) { pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup(); } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) { pcollection = pcollection.parallelDo("UnionCollectionWrapper", (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType()); } + target.handleExisting(writeMode, getConfiguration()); + if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) { + throw new CrunchRuntimeException("Target " + target + " is already written in current run." + + " Use WriteMode.APPEND in order to write additional data to it."); + } addOutput((PCollectionImpl<?>) pcollection, target); } + private boolean targetInCurrentRun(Target target) { + for (Set<Target> targets : outputTargets.values()) { + if (targets.contains(target)) { + return true; + } + } + return false; + } + private void addOutput(PCollectionImpl<?> impl, Target target) { if (!outputTargets.containsKey(impl)) { outputTargets.put(impl, Sets.<Target> newHashSet()); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index f48308a..79b7c83 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -54,7 +54,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { private final String name; protected MRPipeline pipeline; - private SourceTarget<S> materializedAt; + protected SourceTarget<S> materializedAt; private final ParallelDoOptions options; public PCollectionImpl(String name) { @@ -130,6 +130,17 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } @Override + public PCollection<S> write(Target target, Target.WriteMode writeMode) { + if (materializedAt != null) { + getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target, + writeMode); + } else { + getPipeline().write(this, target, writeMode); + } + return this; + } + + @Override public Iterable<S> materialize() { if (getSize() == 0) { LOG.warn("Materializing an empty PCollection: " + this.getName()); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java index 69ea8a3..a41e979 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java @@ -28,7 +28,9 @@ import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; +import org.apache.crunch.TableSource; import org.apache.crunch.Target; +import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.lib.Cogroup; import org.apache.crunch.lib.Join; @@ -81,11 +83,27 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P @Override public PTable<K, V> write(Target target) { - getPipeline().write(this, target); + if (getMaterializedAt() != null) { + getPipeline().write(new InputTable<K, V>( + (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target); + } else { + getPipeline().write(this, target); + } return this; } @Override + public PTable<K, V> write(Target target, Target.WriteMode writeMode) { + if (getMaterializedAt() != null) { + getPipeline().write(new InputTable<K, V>( + (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target, writeMode); + } else { + getPipeline().write(this, target, writeMode); + } + return this; + } + + @Override public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) { return parallelDo(filterFn, getPTableType()); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index 46a6386..c1c29e4 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -17,7 +17,12 @@ */ package org.apache.crunch.io.impl; +import java.io.IOException; + import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.SourceTarget; import org.apache.crunch.io.CrunchOutputs; import org.apache.crunch.io.FileNamingScheme; @@ -25,12 +30,17 @@ import org.apache.crunch.io.OutputHandler; import org.apache.crunch.io.PathTarget; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FileTargetImpl implements PathTarget { + private static final Log LOG = LogFactory.getLog(FileTargetImpl.class); + protected final Path path; private final Class<? extends FileOutputFormat> outputFormatClass; private final FileNamingScheme fileNamingScheme; @@ -107,4 +117,46 @@ public class FileTargetImpl implements PathTarget { // By default, assume that we cannot do this. return null; } + + @Override + public void handleExisting(WriteMode strategy, Configuration conf) { + FileSystem fs = null; + try { + fs = FileSystem.get(conf); + } catch (IOException e) { + LOG.error("Could not retrieve FileSystem object to check for existing path", e); + throw new CrunchRuntimeException(e); + } + + boolean exists = false; + try { + exists = fs.exists(path); + } catch (IOException e) { + LOG.error("Exception checking existence of path: " + path, e); + throw new CrunchRuntimeException(e); + } + + if (exists) { + switch (strategy) { + case DEFAULT: + LOG.error("Path " + path + " already exists!"); + throw new CrunchRuntimeException("Path already exists: " + path); + case OVERWRITE: + LOG.info("Removing data at existing path: " + path); + try { + fs.delete(path, true); + } catch (IOException e) { + LOG.error("Exception thrown removing data at path: " + path, e); + } + break; + case APPEND: + LOG.info("Adding output files to existing path: " + path); + break; + default: + throw new CrunchRuntimeException("Unknown WriteMode: " + strategy); + } + } else { + LOG.info("Will write output files to new path: " + path); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java index 9626b26..4d2b88a 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java @@ -81,4 +81,9 @@ class SourceTargetImpl<T> implements SourceTarget<T> { public String toString() { return source.toString(); } + + @Override + public void handleExisting(WriteMode strategy, Configuration conf) { + target.handleExisting(strategy, conf); + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/test/java/org/apache/crunch/WriteModeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/WriteModeTest.java b/crunch/src/test/java/org/apache/crunch/WriteModeTest.java new file mode 100644 index 0000000..e99ac7b --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/WriteModeTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.Target.WriteMode; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; + +public class WriteModeTest { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test(expected=CrunchRuntimeException.class) + public void testDefault() throws Exception { + run(null, true); + } + + @Test(expected=CrunchRuntimeException.class) + public void testDefaultNoRun() throws Exception { + run(null, false); + } + + @Test + public void testOverwrite() throws Exception { + Path p = run(WriteMode.OVERWRITE, true); + PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString()); + assertEquals(ImmutableList.of("some", "string", "values"), lines.materialize()); + } + + @Test(expected=CrunchRuntimeException.class) + public void testOverwriteNoRun() throws Exception { + run(WriteMode.OVERWRITE, false); + } + + @Test + public void testAppend() throws Exception { + Path p = run(WriteMode.APPEND, true); + PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString()); + assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"), + lines.materialize()); + } + + @Test + public void testAppendNoRun() throws Exception { + Path p = run(WriteMode.APPEND, false); + PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString()); + assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"), + lines.materialize()); + } + + Path run(WriteMode writeMode, boolean doRun) throws Exception { + Path output = tmpDir.getPath("existing"); + FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration()); + if (fs.exists(output)) { + fs.delete(output, true); + } + Pipeline p = MemPipeline.getInstance(); + PCollection<String> data = MemPipeline.typedCollectionOf(Avros.strings(), + ImmutableList.of("some", "string", "values")); + data.write(To.textFile(output)); + + if (doRun) { + p.run(); + } + + if (writeMode == null) { + data.write(To.textFile(output)); + } else { + data.write(To.textFile(output), writeMode); + } + + p.run(); + + return output; + } +}
