This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new be600217a Incorporate the mod time of enclosing dirs into the
`SourceHadoopFsEndPoint.getWatermark` (#3859)
be600217a is described below
commit be600217aff57d851cd82b72bc59b2639a3465fb
Author: Kip Kohn <[email protected]>
AuthorDate: Fri Jan 12 14:00:36 2024 -0800
Incorporate the mod time of enclosing dirs into the
`SourceHadoopFsEndPoint.getWatermark` (#3859)
---
.../copy/replication/SourceHadoopFsEndPoint.java | 31 +++++++++++++++-------
.../org/apache/gobblin/util/FileListUtils.java | 13 +++++++--
2 files changed, 32 insertions(+), 12 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
index d74a434e1..136492c23 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.data.management.copy.replication;
+import java.util.List;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -27,6 +28,10 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.api.client.util.Lists;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.typesafe.config.Config;
@@ -34,8 +39,6 @@ import com.typesafe.config.Config;
import org.apache.gobblin.source.extractor.ComparableWatermark;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.util.FileListUtils;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -73,28 +76,36 @@ public class SourceHadoopFsEndPoint extends
HadoopFsEndPoint {
long curTs = -1;
FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration());
+ // root dirs MUST participate in the watermark calculation, despite
themselves not belonging among the files to copy (i.e. `allFileStatus`).
+ // this is necessary because some compute engines, like spark, first
write files to a temp subdir beneath the ultimate dest dir. (we don't
+ // consider temp subdirs valid paths and will skip these.) once all
executors have written their files, spark moves each from that temp subdir
+ // up to the enclosing ultimate dir location. such file movement DOES
NOT update the mod time of the file itself, only its enclosing dir.
+ // unless we incorporate the enclosing dir's mod time into the
watermark, we'd miss such changes to the enclosing root dir's contents.
+ List<FileStatus> rootFileStatuses = Lists.newArrayList();
Collection<Path> validPaths =
ReplicationDataValidPathPicker.getValidPaths(this);
for (Path p : validPaths) {
try {
- this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs, p,
super.getPathFilter(), super.isApplyFilterToDirectories()));
+ FileStatus fileStatus = fs.getFileStatus(p);
+ rootFileStatuses.add(fileStatus);
+ this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs,
fileStatus, super.getPathFilter(), super.isApplyFilterToDirectories()));
} catch (Exception e) {
log.error(String.format("Error while try read file in directory %s
to get watermark", p.toString()));
}
}
- for (FileStatus f : this.allFileStatus) {
+ for (FileStatus f : rootFileStatuses) {
if (f.getModificationTime() > curTs) {
curTs = f.getModificationTime();
}
}
-
- ComparableWatermark result = new LongWatermark(curTs);
- this.cachedWatermark = Optional.of(result);
-
- if (this.cachedWatermark.isPresent()) {
- this.initialized = true;
+ for (FileStatus f : this.allFileStatus) {
+ if (f.getModificationTime() > curTs) {
+ curTs = f.getModificationTime();
+ }
}
+ this.cachedWatermark = Optional.of(new LongWatermark(curTs));
+ this.initialized = true;
return this.cachedWatermark;
} catch (IOException e) {
log.error("Error while retrieve the watermark for " + this);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
index 6971bd64d..9d269c9a3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
@@ -125,8 +125,17 @@ public class FileListUtils {
public static List<FileStatus> listFilesRecursively(FileSystem fs, Path
path, PathFilter fileFilter,
boolean applyFilterToDirectories)
throws IOException {
- return listFilesRecursivelyHelper(fs, Lists.newArrayList(),
fs.getFileStatus(path), fileFilter,
- applyFilterToDirectories, false);
+ return listFilesRecursively(fs, fs.getFileStatus(path), fileFilter,
applyFilterToDirectories);
+ }
+
+ /**
+ * Helper method to list out all files under a specified {@link FileStatus}.
If applyFilterToDirectories is false, the supplied
+ * {@link PathFilter} will only be applied to files.
+ */
+ public static List<FileStatus> listFilesRecursively(FileSystem fs,
FileStatus beneathThisFile, PathFilter fileFilter,
+ boolean applyFilterToDirectories)
+ throws IOException {
+ return listFilesRecursivelyHelper(fs, Lists.newArrayList(),
beneathThisFile, fileFilter, applyFilterToDirectories, false);
}
private static List<FileStatus> listFilesRecursivelyHelper(FileSystem fs,
List<FileStatus> files,