[GOBBLIN-422] Update fs snapshot in previously failed workunits with the current effectiveSnapshot
Closes #2299 from ragepati/ragepati-filebasedsource-prevfssnapshot Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/979ad2a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/979ad2a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/979ad2a0 Branch: refs/heads/0.12.0 Commit: 979ad2a090600495ad3b47de462d39b2f2ab33ea Parents: 8636b0c Author: Raul Agepati <[email protected]> Authored: Mon Mar 5 15:05:10 2018 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Mon Mar 5 15:05:10 2018 -0800 ---------------------------------------------------------------------- .../gobblin/source/extractor/filebased/FileBasedSource.java | 5 +++++ 1 file changed, 5 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/979ad2a0/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java index 46a0de0..e34a28f 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java @@ -152,6 +152,11 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> { // file is not pulled this run } } + // Update the snapshot from the previous run with the new files processed in this run + // Otherwise a corrupt file could cause re-processing of already processed files + for (WorkUnit workUnit : previousWorkUnitsForRetry) { + workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT, 
StringUtils.join(effectiveSnapshot, ",")); + } if (!filesToPull.isEmpty()) { logFilesToPull(filesToPull);
