This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dcdf8254e [GOBBLIN-2103] GaaS does not update flowgraph or templates if file lengths remain the same (#3990)
dcdf8254e is described below

commit dcdf8254e368c8a4a0caf2cba8c6e5ba9829f040
Author: abhishekmjain <[email protected]>
AuthorDate: Thu Jun 27 21:36:04 2024 +0530

    [GOBBLIN-2103] GaaS does not update flowgraph or templates if file lengths remain the same (#3990)
    
    * Fix update of templates if file length remains same
---
 .../apache/gobblin/util/SchedulerUtilsTest.java    | 24 ++++++++++++++++---
 .../gobblin/util/filesystem/FileStatusEntry.java   | 27 +++++++++++++++++++++-
 2 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java
index 476ba2ae4..67e100003 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java
@@ -22,6 +22,7 @@ import com.google.common.io.Files;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.filesystem.PathAlterationListener;
 import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
 import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
@@ -58,6 +59,7 @@ public class SchedulerUtilsTest {
   private File subDir1;
   private File subDir11;
   private File subDir2;
+  private File subDir3;
 
   @BeforeClass
   public void setUp()
@@ -68,10 +70,12 @@ public class SchedulerUtilsTest {
     this.subDir1 = new File(this.jobConfigDir, "test1");
     this.subDir11 = new File(this.subDir1, "test11");
     this.subDir2 = new File(this.jobConfigDir, "test2");
+    this.subDir3 = new File(this.jobConfigDir, "test3");
 
     this.subDir1.mkdirs();
     this.subDir11.mkdirs();
     this.subDir2.mkdirs();
+    this.subDir3.mkdirs();
 
     Properties rootProps = new Properties();
     rootProps.setProperty("k1", "a1");
@@ -114,6 +118,11 @@ public class SchedulerUtilsTest {
     jobProps4.setProperty("k5", "b5");
     // test-job-conf-dir/test2/test21.PULL
     jobProps4.store(new FileWriter(new File(this.subDir2, "test21.PULL")), "");
+
+    Properties jobProps5 = new Properties();
+    jobProps5.setProperty("k1", "b1");
+    // test-job-conf-dir/test3/test31.PULL
+    jobProps5.store(new FileWriter(new File(this.subDir3, "test31.PULL")), "");
   }
 
   @Test
@@ -122,7 +131,7 @@ public class SchedulerUtilsTest {
     Properties properties = new Properties();
     properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, 
this.jobConfigDir.getAbsolutePath());
     List<Properties> jobConfigs = 
SchedulerUtils.loadGenericJobConfigs(properties, JobSpecResolver.mock());
-    Assert.assertEquals(jobConfigs.size(), 4);
+    Assert.assertEquals(jobConfigs.size(), 5);
 
     // test-job-conf-dir/test1/test11/test111.pull
     Properties jobProps1 = getJobConfigForFile(jobConfigs, "test111.pull");
@@ -263,12 +272,21 @@ public class SchedulerUtilsTest {
       File newJobConfigFile = new File(this.subDir11, "test112.pull");
       Files.append("k1=v1", newJobConfigFile, 
ConfigurationKeys.DEFAULT_CHARSET_ENCODING);
 
-      semaphore.acquire(3);
-      Assert.assertEquals(fileAltered.size(), 3);
+      // Create file with same content again
+      File sameContentFile = new File(this.subDir3, "test31.PULL");
+      Properties jobProps5 = new Properties();
+      jobProps5.setProperty("k1", "b1");
+      // test-job-conf-dir/test3/test31.PULL
+      jobProps5.store(new FileWriter(sameContentFile), "");
+
+      AssertWithBackoff.create().timeoutMs(2000).backoffFactor(2.0)
+          .assertEquals(input -> fileAltered.size(), 4, "should eventually 
succeed");
+      semaphore.acquire(4);
 
       Assert.assertTrue(fileAltered.contains(new Path("file:" + 
jobConfigFile)));
       Assert.assertTrue(fileAltered.contains(new Path("file:" + 
commonPropsFile)));
       Assert.assertTrue(fileAltered.contains(new Path("file:" + 
newJobConfigFile)));
+      Assert.assertTrue(fileAltered.contains(new Path("file:" + 
sameContentFile)));
     } finally {
       monitor.stop();
     }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileStatusEntry.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileStatusEntry.java
index 24d11cfd9..12b658079 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileStatusEntry.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileStatusEntry.java
@@ -20,13 +20,19 @@ package org.apache.gobblin.util.filesystem;
 import com.google.common.base.Optional;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.FileTime;
+import java.nio.file.Paths;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import lombok.extern.slf4j.Slf4j;
 
+
+@Slf4j
 public class FileStatusEntry extends FileStatus {
 
   static final FileStatusEntry[] EMPTY_ENTRIES = new FileStatusEntry[0];
@@ -37,6 +43,8 @@ public class FileStatusEntry extends FileStatus {
   private boolean exists;
   private final FileSystem fs;
 
+  private FileTime changeTime;
+
   public Optional<FileStatus> _fileStatus;
 
   public FileStatusEntry(final Path path)
@@ -52,28 +60,34 @@ public class FileStatusEntry extends FileStatus {
     this.parent = parent;
     this.fs = fs;
     this._fileStatus = Optional.fromNullable(this.fs.getFileStatus(path));
+    this.changeTime = getChangeTime(path);
   }
 
   public boolean refresh(final Path path)
       throws IOException {
     if (_fileStatus.isPresent()) {
       Optional<FileStatus> oldStatus = this._fileStatus;
+      FileTime oldChangeTime = this.changeTime;
       try {
         this._fileStatus = Optional.of(this.fs.getFileStatus(path));
+        this.changeTime = getChangeTime(path);
         this.exists = this._fileStatus.isPresent();
 
+        // using ctime instead of modificationTime since modificationTime is set to start of epoch for a new file
         return (oldStatus.isPresent() != this._fileStatus.isPresent()
-            || oldStatus.get().getModificationTime() != 
this._fileStatus.get().getModificationTime()
+            || (oldChangeTime != null && 
!oldChangeTime.equals(this.changeTime))
             || oldStatus.get().isDirectory() != 
this._fileStatus.get().isDirectory()
             || oldStatus.get().getLen() != this._fileStatus.get().getLen());
       } catch (FileNotFoundException e) {
         _fileStatus = Optional.absent();
+        this.changeTime = null;
         this.exists = false;
         return true;
       }
     } else {
       if (path.getFileSystem(new Configuration()).exists(path)) {
         _fileStatus = Optional.of(this.fs.getFileStatus(path));
+        this.changeTime = getChangeTime(path);
         return true;
       } else {
         return false;
@@ -182,4 +196,15 @@ public class FileStatusEntry extends FileStatus {
   public int hashCode() {
     return getPath().hashCode();
   }
+
+  private FileTime getChangeTime(Path path) {
+    try{
+      java.nio.file.Path filePath = Paths.get(path.toUri().getPath());
+      return (FileTime) Files.getAttribute(filePath, "unix:ctime");
+    }
+    catch (Exception e) {
+      log.info("Exception occurred while getting ctime for {}", 
path.toUri().getPath(), e);
+      return null;
+    }
+  }
 }

Reply via email to