[GOBBLIN-422] Update fs snapshot in previously failed workunits with the current effectiveSnapshot

Closes #2299 from ragepati/ragepati-filebasedsource-prevfssnapshot


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/979ad2a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/979ad2a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/979ad2a0

Branch: refs/heads/0.12.0
Commit: 979ad2a090600495ad3b47de462d39b2f2ab33ea
Parents: 8636b0c
Author: Raul Agepati <[email protected]>
Authored: Mon Mar 5 15:05:10 2018 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Mon Mar 5 15:05:10 2018 -0800

----------------------------------------------------------------------
 .../gobblin/source/extractor/filebased/FileBasedSource.java     | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/979ad2a0/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index 46a0de0..e34a28f 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -152,6 +152,11 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
         // file is not pulled this run
       }
     }
+    // Update the snapshot from the previous run with the new files processed in this run
+    // Otherwise a corrupt file could cause re-processing of already processed files
+    for (WorkUnit workUnit : previousWorkUnitsForRetry) {
+      workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT, StringUtils.join(effectiveSnapshot, ","));
+    }
 
     if (!filesToPull.isEmpty()) {
       logFilesToPull(filesToPull);

Reply via email to