This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new be600217a Incorporate the mod time of enclosing dirs into the `SourceHadoopFsEndPoint.getWatermark` (#3859)
be600217a is described below

commit be600217aff57d851cd82b72bc59b2639a3465fb
Author: Kip Kohn <[email protected]>
AuthorDate: Fri Jan 12 14:00:36 2024 -0800

    Incorporate the mod time of enclosing dirs into the `SourceHadoopFsEndPoint.getWatermark` (#3859)
---
 .../copy/replication/SourceHadoopFsEndPoint.java   | 31 +++++++++++++++-------
 .../org/apache/gobblin/util/FileListUtils.java     | 13 +++++++--
 2 files changed, 32 insertions(+), 12 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
index d74a434e1..136492c23 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.data.management.copy.replication;
 
+import java.util.List;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -27,6 +28,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.api.client.util.Lists;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.typesafe.config.Config;
@@ -34,8 +39,6 @@ import com.typesafe.config.Config;
 import org.apache.gobblin.source.extractor.ComparableWatermark;
 import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.util.FileListUtils;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
 
 
 @Slf4j
@@ -73,28 +76,36 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint {
       long curTs = -1;
       FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration());
 
+      // root dirs MUST participate in the watermark calculation, despite themselves not belonging among the files to copy (i.e. `allFileStatus`).
+      // this is necessary because some compute engines, like spark, first write files to a temp subdir beneath the ultimate dest dir.  (we don't
+      // consider temp subdirs valid paths and will skip these.)  once all executors have written their files, spark moves each from that temp subdir
+      // up to the enclosing ultimate dir location.  such file movement DOES NOT update the mod time of the file itself, only its enclosing dir.
+      // unless we incorporate the enclosing dir's mod time into the watermark, we'd miss such changes to the enclosing root dir's contents.
+      List<FileStatus> rootFileStatuses = Lists.newArrayList();
       Collection<Path> validPaths = 
ReplicationDataValidPathPicker.getValidPaths(this);
       for (Path p : validPaths) {
         try {
-          this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs, p, 
super.getPathFilter(), super.isApplyFilterToDirectories()));
+          FileStatus fileStatus = fs.getFileStatus(p);
+          rootFileStatuses.add(fileStatus);
+          this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs, 
fileStatus, super.getPathFilter(), super.isApplyFilterToDirectories()));
         } catch (Exception e) {
           log.error(String.format("Error while try read file in directory %s 
to get watermark", p.toString()));
         }
       }
 
-      for (FileStatus f : this.allFileStatus) {
+      for (FileStatus f : rootFileStatuses) {
         if (f.getModificationTime() > curTs) {
           curTs = f.getModificationTime();
         }
       }
-
-      ComparableWatermark result = new LongWatermark(curTs);
-      this.cachedWatermark = Optional.of(result);
-
-      if (this.cachedWatermark.isPresent()) {
-        this.initialized = true;
+      for (FileStatus f : this.allFileStatus) {
+        if (f.getModificationTime() > curTs) {
+          curTs = f.getModificationTime();
+        }
       }
 
+      this.cachedWatermark = Optional.of(new LongWatermark(curTs));
+      this.initialized = true;
       return this.cachedWatermark;
     } catch (IOException e) {
       log.error("Error while retrieve the watermark for " + this);
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
index 6971bd64d..9d269c9a3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
@@ -125,8 +125,17 @@ public class FileListUtils {
   public static List<FileStatus> listFilesRecursively(FileSystem fs, Path 
path, PathFilter fileFilter,
       boolean applyFilterToDirectories)
       throws IOException {
-    return listFilesRecursivelyHelper(fs, Lists.newArrayList(), 
fs.getFileStatus(path), fileFilter,
-        applyFilterToDirectories, false);
+    return listFilesRecursively(fs, fs.getFileStatus(path), fileFilter, 
applyFilterToDirectories);
+  }
+
+  /**
+   * Helper method to list out all files under a specified {@link FileStatus}. 
If applyFilterToDirectories is false, the supplied
+   * {@link PathFilter} will only be applied to files.
+   */
+  public static List<FileStatus> listFilesRecursively(FileSystem fs, 
FileStatus beneathThisFile, PathFilter fileFilter,
+      boolean applyFilterToDirectories)
+      throws IOException {
+    return listFilesRecursivelyHelper(fs, Lists.newArrayList(), 
beneathThisFile, fileFilter, applyFilterToDirectories, false);
   }
 
   private static List<FileStatus> listFilesRecursivelyHelper(FileSystem fs, 
List<FileStatus> files,

Reply via email to