Updated Branches: refs/heads/master 246109962 -> 1082111c7
CRUNCH-218: Add a WriteMode for checkpoint outputs, and make invalid checkpoint targets throw a CrunchRuntimeException. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1082111c Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1082111c Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1082111c Branch: refs/heads/master Commit: 1082111c741916f62208cf783a0e69850fe60018 Parents: 2461099 Author: Josh Wills <[email protected]> Authored: Wed Jun 12 10:22:29 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Sat Jun 29 08:37:22 2013 -0700 ---------------------------------------------------------------------- .../crunch/contrib/io/jdbc/DataBaseSource.java | 7 +- .../it/java/org/apache/crunch/CheckpointIT.java | 100 +++++++++++++++++++ .../src/main/java/org/apache/crunch/Source.java | 7 ++ .../src/main/java/org/apache/crunch/Target.java | 11 +- .../org/apache/crunch/impl/mem/MemPipeline.java | 2 +- .../org/apache/crunch/impl/mr/MRPipeline.java | 13 ++- .../impl/mr/collect/DoCollectionImpl.java | 5 + .../crunch/impl/mr/collect/DoTableImpl.java | 5 + .../crunch/impl/mr/collect/InputCollection.java | 5 + .../crunch/impl/mr/collect/InputTable.java | 5 + .../crunch/impl/mr/collect/PCollectionImpl.java | 2 + .../impl/mr/collect/PGroupedTableImpl.java | 5 + .../crunch/impl/mr/collect/UnionCollection.java | 11 +- .../crunch/impl/mr/collect/UnionTable.java | 11 +- .../crunch/impl/mr/exec/CrunchJobHooks.java | 2 +- .../apache/crunch/io/SourceTargetHelper.java | 14 +++ .../apache/crunch/io/impl/FileSourceImpl.java | 19 +++- .../apache/crunch/io/impl/FileTargetImpl.java | 32 +++++- .../apache/crunch/io/impl/SourceTargetImpl.java | 12 ++- .../impl/mr/collect/DoCollectionImplTest.java | 4 + .../crunch/io/hbase/HBaseSourceTarget.java | 10 ++ .../org/apache/crunch/io/hbase/HBaseTarget.java | 3 +- 22 files changed, 270 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java index 23ca685..6ba5e06 100644 --- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java @@ -114,8 +114,13 @@ public class DataBaseSource<T extends DBWritable & Writable> implements Source<T } @Override + public long getLastModifiedAt(Configuration configuration) { + return -1; + } + + @Override public PType<T> getType() { return ptype; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java b/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java new file mode 100644 index 0000000..acb039d --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.crunch.Target.WriteMode; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.Path; +import org.junit.Rule; +import org.junit.Test; + +public class CheckpointIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testCheckpoints() throws Exception { + String inputPath = tmpDir.copyResourceFileName("shakes.txt"); + Pipeline p = new MRPipeline(CheckpointIT.class); + String inter = tmpDir.getFileName("intermediate"); + PipelineResult one = run(p, tmpDir, inputPath, inter, false); + assertTrue(one.succeeded()); + assertEquals(2, one.getStageResults().size()); + PipelineResult two = run(p, tmpDir, inputPath, inter, false); + assertTrue(two.succeeded()); + assertEquals(1, two.getStageResults().size()); + } + + @Test + public void testUnsuccessfulCheckpoint() throws Exception { + String inputPath = tmpDir.copyResourceFileName("shakes.txt"); + Pipeline p = new MRPipeline(CheckpointIT.class); + String inter = tmpDir.getFileName("intermediate"); + PipelineResult one = run(p, tmpDir, inputPath, inter, true); + assertFalse(one.succeeded()); + PipelineResult two = run(p, tmpDir, inputPath, inter, false); + assertTrue(two.succeeded()); + assertEquals(2, two.getStageResults().size()); + } + + @Test + public void testModifiedFileCheckpoint() throws Exception { + String inputPath = tmpDir.copyResourceFileName("shakes.txt"); + Pipeline p = new MRPipeline(CheckpointIT.class); + Path inter = tmpDir.getPath("intermediate"); + PipelineResult one = run(p, tmpDir, inputPath, inter.toString(), false); + assertTrue(one.succeeded()); + assertEquals(2, one.getStageResults().size()); + // Update the input path + inputPath = tmpDir.copyResourceFileName("shakes.txt"); + PipelineResult two = run(p, tmpDir, inputPath, inter.toString(), false); + assertTrue(two.succeeded()); + assertEquals(2, two.getStageResults().size()); + } + + public static PipelineResult run(Pipeline pipeline, TemporaryPath tmpDir, + String shakesInputPath, String intermediatePath, + final boolean fail) + throws Exception { + PCollection<String> shakes = pipeline.readTextFile(shakesInputPath); + PTable<String, Long> cnts = shakes.parallelDo("split words", new DoFn<String, String>() { + @Override + public void process(String line, Emitter<String> emitter) { + if (fail) { + throw new RuntimeException("Failure!"); + } + for (String word : line.split("\\s+")) { + emitter.emit(word); + } + } + }, Avros.strings()).count(); + cnts.write(To.avroFile(intermediatePath), WriteMode.CHECKPOINT); + PTable<String, Long> singleCounts = cnts.keys().count(); + singleCounts.write(To.textFile(tmpDir.getFileName("singleCounts")), WriteMode.OVERWRITE); + return pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/Source.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java index f54d135..b744c8f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Source.java +++ b/crunch-core/src/main/java/org/apache/crunch/Source.java @@ -49,4 +49,11 @@ public interface Source<T> { * Returns the number of bytes in this {@code Source}. */ long getSize(Configuration configuration); + + /** + * Returns the time (in milliseconds) that this {@code Source} was most recently + * modified (e.g., because an input file was edited or new files were added to + * a directory.) + */ + long getLastModifiedAt(Configuration configuration); } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/Target.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java index 0a0c23d..48dc2cd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Target.java +++ b/crunch-core/src/main/java/org/apache/crunch/Target.java @@ -50,7 +50,13 @@ public interface Target { * add the output of this pipeline to the target. This was the * behavior in Crunch up to version 0.4.0. */ - APPEND + APPEND, + + /** + * If the output target exists and is newer than any of its source inputs, don't rewrite it, + * just start the pipeline from here. Only works with {@code SourceTarget} instances. + */ + CHECKPOINT } /** @@ -58,8 +64,9 @@ public interface Target { * * @param writeMode The strategy for handling existing outputs * @param conf The ever-useful {@code Configuration} instance + * @return true if the target did exist */ - void handleExisting(WriteMode writeMode, Configuration conf); + boolean handleExisting(WriteMode writeMode, long lastModifiedAt, Configuration conf); /** * Checks to see if this {@code Target} instance is compatible with the http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 9001e51..60677fc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -176,7 +176,7 @@ public class MemPipeline implements Pipeline { @Override public void write(PCollection<?> collection, Target target, Target.WriteMode writeMode) { - target.handleExisting(writeMode, getConfiguration()); + target.handleExisting(writeMode, -1, getConfiguration()); if (writeMode != Target.WriteMode.APPEND && activeTargets.contains(target)) { throw new CrunchRuntimeException("Target " + target + " is already written in the current run." http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index f167846..4fb2876 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -209,8 +209,17 @@ public class MRPipeline implements Pipeline { pcollection = pcollection.parallelDo("UnionCollectionWrapper", (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType()); } - target.handleExisting(writeMode, getConfiguration()); - if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) { + boolean exists = target.handleExisting(writeMode, ((PCollectionImpl) pcollection).getLastModifiedAt(), + getConfiguration()); + if (exists && writeMode == WriteMode.CHECKPOINT) { + SourceTarget<?> st = target.asSourceTarget(pcollection.getPType()); + if (st == null) { + throw new CrunchRuntimeException("Target " + target + " does not support checkpointing"); + } else { + ((PCollectionImpl) pcollection).materializeAt(st); + } + return; + } else if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) { throw new CrunchRuntimeException("Target " + target + " is already written in current run." + " Use WriteMode.APPEND in order to write additional data to it."); } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java index 8881e3f..917ef65 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java @@ -68,4 +68,9 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S> { public DoNode createDoNode() { return DoNode.createFnNode(getName(), fn, ntype); } + + @Override + public long getLastModifiedAt() { + return parent.getLastModifiedAt(); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java index 176643b..5329c7a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java @@ -81,4 +81,9 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V> public boolean hasCombineFn() { return fn instanceof CombineFn; } + + @Override + public long getLastModifiedAt() { + return parent.getLastModifiedAt(); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java index ace5cc1..a4958e7 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java @@ -71,6 +71,11 @@ public class InputCollection<S> extends PCollectionImpl<S> { } @Override + public long getLastModifiedAt() { + return source.getLastModifiedAt(pipeline.getConfiguration()); + } + + @Override public boolean equals(Object obj) { if (obj == null || !(obj instanceof InputCollection)) { return false; http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java index 71f11c5..8317452 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java @@ -75,6 +75,11 @@ public class InputTable<K, V> extends PTableBase<K, V> { } @Override + public long getLastModifiedAt() { + return source.getLastModifiedAt(pipeline.getConfiguration()); + } + + @Override public int hashCode() { return asCollection.hashCode(); } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index 6ea9c4c..b5f1cef 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -284,6 +284,8 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { protected abstract long getSizeInternal(); + public abstract long getLastModifiedAt(); + /** * Retrieve the PCollectionImpl to be used for chaining within PCollectionImpls further down the pipeline. * @return The PCollectionImpl instance to be chained http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java index d277b75..c385e16 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java @@ -154,6 +154,11 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V> } @Override + public long getLastModifiedAt() { + return parent.getLastModifiedAt(); + } + + @Override protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection() { // Use a copy for chaining to allow sending the output of a single grouped table to multiple outputs // TODO This should be implemented in a cleaner way in the planner http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java index 7b3dd7b..b6e1fdd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java @@ -29,7 +29,8 @@ public class UnionCollection<S> extends PCollectionImpl<S> { private List<PCollectionImpl<S>> parents; private long size = 0; - + private long lastModifiedAt = -1; + private static <S> String flatName(List<PCollectionImpl<S>> collections) { StringBuilder sb = new StringBuilder("union("); for (int i = 0; i < collections.size(); i++) { @@ -50,6 +51,9 @@ public class UnionCollection<S> extends PCollectionImpl<S> { throw new IllegalStateException("Cannot union PCollections from different Pipeline instances"); } size += parent.getSize(); + if (parent.getLastModifiedAt() > lastModifiedAt) { + this.lastModifiedAt = parent.getLastModifiedAt(); + } } } @@ -59,6 +63,11 @@ public class UnionCollection<S> extends PCollectionImpl<S> { } @Override + public long getLastModifiedAt() { + return lastModifiedAt; + } + + @Override protected void acceptInternal(PCollectionImpl.Visitor visitor) { visitor.visitUnionCollection(this); } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java index a369432..91f518a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java @@ -33,7 +33,8 @@ public class UnionTable<K, V> extends PTableBase<K, V> { private PTableType<K, V> ptype; private List<PCollectionImpl<Pair<K, V>>> parents; private long size; - + private long lastModifiedAt = -1; + private static <K, V> String flatName(List<PTableBase<K, V>> tables) { StringBuilder sb = new StringBuilder("union("); for (int i = 0; i < tables.size(); i++) { @@ -56,6 +57,9 @@ public class UnionTable<K, V> extends PTableBase<K, V> { } this.parents.add(parent); size += parent.getSize(); + if (parent.getLastModifiedAt() > lastModifiedAt) { + this.lastModifiedAt = parent.getLastModifiedAt(); + } } } @@ -65,6 +69,11 @@ public class UnionTable<K, V> extends PTableBase<K, V> { } @Override + public long getLastModifiedAt() { + return lastModifiedAt; + } + + @Override public PTableType<K, V> getPTableType() { return ptype; } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java index b06847b..dee80f1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java @@ -79,7 +79,7 @@ public final class CrunchJobHooks { } private synchronized void handleMultiPaths() throws IOException { - if (!multiPaths.isEmpty()) { + if (job.isSuccessful() && !multiPaths.isEmpty()) { // Need to handle moving the data from the output directory of the // job to the output locations specified in the paths. FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration()); http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java index f4400de..66764cf 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java @@ -45,4 +45,18 @@ public class SourceTargetHelper { } return size; } + + public static long getLastModifiedAt(FileSystem fs, Path path) throws IOException { + FileStatus[] stati = fs.globStatus(path); + if (stati == null || stati.length == 0) { + return -1L; + } + long lastMod = -1; + for (FileStatus status : stati) { + if (lastMod < status.getModificationTime()) { + lastMod = status.getModificationTime(); + } + } + return lastMod; + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index 44139b0..b232abb 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -31,9 +31,7 @@ import org.apache.crunch.io.CrunchInputs; import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.SourceTargetHelper; -import org.apache.crunch.io.avro.AvroFileReaderFactory; import org.apache.crunch.types.PType; -import org.apache.crunch.types.avro.AvroType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -138,6 +136,23 @@ public class FileSourceImpl<T> implements Source<T> { } @Override + public long getLastModifiedAt(Configuration conf) { + long lastMod = -1; + for (Path path : paths) { + try { + FileSystem fs = path.getFileSystem(conf); + long lm = SourceTargetHelper.getLastModifiedAt(fs, path); + if (lm > lastMod) { + lastMod = lm; + } + } catch (IOException e) { + LOG.error("Could not determine last modification time for source: " + toString(), e); + } + } + return lastMod; + } + + @Override public boolean equals(Object other) { if (other == null || !getClass().equals(other.getClass())) { return false; http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index 4e3dd9a..50a1fd3 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -31,6 +31,7 @@ import org.apache.crunch.io.CrunchOutputs; import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.OutputHandler; import org.apache.crunch.io.PathTarget; +import org.apache.crunch.io.SourceTargetHelper; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; @@ -103,6 +104,11 @@ public class FileTargetImpl implements PathTarget { FileUtil.copy(srcFs, s, dstFs, d, true, true, conf); } } + dstFs.create(getSuccessIndicator(), true).close(); + } + + private Path getSuccessIndicator() { + return new Path(path, "_SUCCESS"); } protected Path getSourcePattern(Path workingPath, int index) { @@ -184,7 +190,7 @@ public class FileTargetImpl implements PathTarget { } @Override - public void handleExisting(WriteMode strategy, Configuration conf) { + public boolean handleExisting(WriteMode strategy, long lastModForSource, Configuration conf) { FileSystem fs = null; try { fs = path.getFileSystem(conf); @@ -194,8 +200,14 @@ public class FileTargetImpl implements PathTarget { } boolean exists = false; + boolean successful = false; + long lastModForTarget = -1; try { exists = fs.exists(path); + if (exists) { + successful = fs.exists(getSuccessIndicator()); + lastModForTarget = SourceTargetHelper.getLastModifiedAt(fs, path); + } } catch (IOException e) { LOG.error("Exception checking existence of path: " + path, e); throw new CrunchRuntimeException(e); @@ -217,11 +229,29 @@ public class FileTargetImpl implements PathTarget { case APPEND: LOG.info("Adding output files to existing path: " + path); break; + case CHECKPOINT: + if (successful && lastModForTarget > lastModForSource) { + LOG.info("Re-starting pipeline from checkpoint path: " + path); + break; + } else { + if (!successful) { + LOG.info("_SUCCESS file not found, Removing data at existing checkpoint path: " + path); + } else { + LOG.info("Source data has recent updates. Removing data at existing checkpoint path: " + path); + } + try { + fs.delete(path, true); + } catch (IOException e) { + LOG.error("Exception thrown removing data at checkpoint path: " + path, e); + } + return false; + } default: throw new CrunchRuntimeException("Unknown WriteMode: " + strategy); } } else { LOG.info("Will write output files to new path: " + path); } + return exists; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java index 4d2b88a..5dd4d69 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java @@ -60,6 +60,9 @@ class SourceTargetImpl<T> implements SourceTarget<T> { @Override public <S> SourceTarget<S> asSourceTarget(PType<S> ptype) { + if (ptype != null && ptype.equals(source.getType())) { + return (SourceTarget<S>) this; + } return target.asSourceTarget(ptype); } @@ -83,7 +86,12 @@ class SourceTargetImpl<T> implements SourceTarget<T> { } @Override - public void handleExisting(WriteMode strategy, Configuration conf) { - target.handleExisting(strategy, conf); + public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) { + return target.handleExisting(strategy, lastModifiedAt, conf); + } + + @Override + public long getLastModifiedAt(Configuration configuration) { + return source.getLastModifiedAt(configuration); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java index fd582bc..b025119 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java @@ -107,6 +107,10 @@ public class DoCollectionImplTest { return internalSize; } + @Override + public long getLastModifiedAt() { + return -1; + } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index 230c701..6a5a124 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -22,6 +22,8 @@ import java.io.DataOutputStream; import java.io.IOException; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.crunch.Pair; import org.apache.crunch.SourceTarget; import org.apache.crunch.TableSource; @@ -45,6 +47,8 @@ import org.apache.hadoop.mapreduce.Job; public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>, TableSource<ImmutableBytesWritable, Result> { + private static final Log LOG = LogFactory.getLog(HBaseSourceTarget.class); + private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf( Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class)); @@ -120,4 +124,10 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair< // TODO something smarter here. return 1000L * 1000L * 1000L; } + + @Override + public long getLastModifiedAt(Configuration configuration) { + LOG.warn("Cannot determine last modified time for source: " + toString()); + return -1; + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java index eceb31d..83d62c8 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java @@ -116,7 +116,8 @@ public class HBaseTarget implements MapReduceTarget { } @Override - public void handleExisting(WriteMode strategy, Configuration conf) { + public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) { LOG.info("HBaseTarget ignores checks for existing outputs..."); + return false; } }
