This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new be46c7c  [GOBBLIN-1221] Preserve source file's ModTime by configuration
be46c7c is described below

commit be46c7c34ad4576f15d2cb08037c672bf17a8bd3
Author: Lei Sun <autumn...@gmail.com>
AuthorDate: Thu Jul 30 10:46:15 2020 -0700

    [GOBBLIN-1221] Preserve source file's ModTime by configuration
    
    Preserve the source file's ModTime when configured to do so
    
    Enhance the EmbeddedGobblinDistcpTest cleanup
    
    Improve the test cases so they also pass when run together with
    the rest of the module
    
    Address review comments
    
    Closes #3069 from autumnust/preserveTsInDistcp
---
 .../data/management/copy/PreserveAttributes.java   |  4 +-
 .../copy/publisher/CopyDataPublisher.java          | 52 +++++++++-----
 .../embedded/EmbeddedGobblinDistcpTest.java        | 80 +++++++++++++++++++++-
 3 files changed, 113 insertions(+), 23 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
index b9ba027..8b01098 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
@@ -44,7 +44,8 @@ public class PreserveAttributes {
     OWNER('u'),
     GROUP('g'),
     PERMISSION('p'),
-    VERSION('v');
+    VERSION('v'),
+    MOD_TIME('t');
 
     private final char token;
 
@@ -89,6 +90,7 @@ public class PreserveAttributes {
    * * g -> preserve group
    * * p -> preserve permissions
    * * v -> preserve version
+   * * t -> preserve file's modTime
    * Characters not in this character set will be ignored.
    *
    * @param s String of the form \[rbugpv]*\
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index f983c2a..41d4d85 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -186,8 +186,37 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
   }
 
   /**
+   * Unlike other preserving attributes of files (ownership, group, etc.), 
which is preserved in writer,
+   * some of the attributes have to be set during publish phase like ModTime,
+   * and versionStrategy (usually relevant to modTime as well), since they are 
subject to change with Publish(rename)
+   */
+  private void preserveFileAttrInPublisher(CopyableFile copyableFile) throws 
IOException {
+    // Preserving File ModTime, and set the access time to an initializing 
value when ModTime is declared to be preserved.
+    if 
(copyableFile.getPreserve().preserve(PreserveAttributes.Option.MOD_TIME)) {
+      fs.setTimes(copyableFile.getDestination(), 
copyableFile.getOriginTimestamp(), -1);
+    }
+
+    // Preserving File Version.
+    DataFileVersionStrategy srcVS = this.srcDataFileVersionStrategy;
+    DataFileVersionStrategy dstVS = this.dstDataFileVersionStrategy;
+
+    // Prefer to use copyableFile's specific version strategy
+    if (copyableFile.getDataFileVersionStrategy() != null) {
+      Config versionStrategyConfig = ConfigFactory.parseMap(ImmutableMap.of(
+          DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, 
copyableFile.getDataFileVersionStrategy()));
+      srcVS = 
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.srcFs, 
versionStrategyConfig);
+      dstVS = 
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs, 
versionStrategyConfig);
+    }
+
+    if (copyableFile.getPreserve().preserve(PreserveAttributes.Option.VERSION)
+        && 
dstVS.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE)) {
+      dstVS.setVersion(copyableFile.getDestination(),
+          srcVS.getVersion(copyableFile.getOrigin().getPath()));
+    }
+  }
+
+  /**
    * Publish data for a {@link CopyableDataset}.
-   *
    */
   private void publishFileSet(CopyEntity.DatasetAndPartition 
datasetAndPartition,
       Collection<WorkUnitState> datasetWorkUnitStates) throws IOException {
@@ -233,23 +262,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
       CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
       if (copyEntity instanceof CopyableFile) {
         CopyableFile copyableFile = (CopyableFile) copyEntity;
-        DataFileVersionStrategy srcVS = this.srcDataFileVersionStrategy;
-        DataFileVersionStrategy dstVS = this.dstDataFileVersionStrategy;
-
-        // Prefer to use copyableFile's specific version strategy
-        if (copyableFile.getDataFileVersionStrategy() != null) {
-          Config versionStrategyConfig = 
ConfigFactory.parseMap(ImmutableMap.of(
-              DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, 
copyableFile.getDataFileVersionStrategy()));
-          srcVS = 
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.srcFs, 
versionStrategyConfig);
-          dstVS = 
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs, 
versionStrategyConfig);
-        }
-
-        if 
(copyableFile.getPreserve().preserve(PreserveAttributes.Option.VERSION)
-            && 
dstVS.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE)) {
-          dstVS.setVersion(copyableFile.getDestination(),
-              srcVS.getVersion(copyableFile.getOrigin().getPath()));
-        }
-
+        preserveFileAttrInPublisher(copyableFile);
         if (wus.getWorkingState() == WorkingState.COMMITTED) {
           
CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, 
copyableFile, wus);
           // Dataset Output path is injected in each copyableFile.
@@ -287,7 +300,8 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
     additionalMetadata.put(SlaEventKeys.DATASET_OUTPUT_PATH, 
fileSetRoot.or("Unknown"));
     
CopyEventSubmitterHelper.submitSuccessfulDatasetPublish(this.eventSubmitter, 
datasetAndPartition,
         Long.toString(datasetOriginTimestamp), 
Long.toString(datasetUpstreamTimestamp), additionalMetadata);
-    }
+  }
+
 
   private static boolean hasCopyableFiles(Collection<WorkUnitState> workUnits) 
throws IOException {
     for (WorkUnitState wus : workUnits) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index a54ba95..c042ac1 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -163,10 +163,11 @@ public class EmbeddedGobblinDistcpTest {
     }
 
     if (metaStoreClient != null) {
-      // Clean the source table and DB
-      if (metaStoreClient.tableExists(TEST_DB, TEST_TABLE)) {
-        metaStoreClient.dropTable(TEST_DB, TEST_TABLE);
+      // Clean out all tables in case there are any, to avoid db-drop failure.
+      for (String tblName : metaStoreClient.getAllTables(TEST_DB)) {
+        metaStoreClient.dropTable(TEST_DB, tblName);
       }
+
       if (metaStoreClient.getAllDatabases().contains(TEST_DB)) {
         metaStoreClient.dropDatabase(TEST_DB);
       }
@@ -279,6 +280,79 @@ public class EmbeddedGobblinDistcpTest {
     Assert.assertEquals((long) versionStrategy.getVersion(new 
Path(tmpTarget.getAbsolutePath(), fileName)), 123l);
   }
 
+  @Test
+  public void testWithModTimePreserve() throws Exception {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    String fileName = "file";
+
+    File tmpSource = Files.createTempDir();
+    tmpSource.deleteOnExit();
+    File tmpTarget = Files.createTempDir();
+    tmpTarget.deleteOnExit();
+
+    File tmpFile = new File(tmpSource, fileName);
+    Assert.assertTrue(tmpFile.createNewFile());
+
+    FileOutputStream os = new FileOutputStream(tmpFile);
+    for (int i = 0; i < 100; i++) {
+      os.write("myString".getBytes(Charsets.UTF_8));
+    }
+    os.close();
+
+    long originalModTime = fs.getFileStatus(new 
Path(tmpFile.getPath())).getModificationTime();
+    Assert.assertNotNull(originalModTime);
+
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertFalse(new File(tmpTarget, fileName).exists());
+
+    EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new 
Path(tmpSource.getAbsolutePath()),
+        new Path(tmpTarget.getAbsolutePath()));
+    embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
+    embedded.setConfiguration(CopyConfiguration.PRESERVE_ATTRIBUTES_KEY, "t");
+    embedded.run();
+
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertTrue(new File(tmpTarget, fileName).exists());
+    Assert.assertEquals(fs.getFileStatus(new Path(new File(tmpTarget, 
fileName).getAbsolutePath())).getModificationTime()
+        , originalModTime);
+  }
+
+  @Test
+  public void testWithModTimePreserveNegative() throws Exception {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    String fileName = "file_oh";
+
+    File tmpSource = Files.createTempDir();
+    tmpSource.deleteOnExit();
+    File tmpTarget = Files.createTempDir();
+    tmpTarget.deleteOnExit();
+
+    File tmpFile = new File(tmpSource, fileName);
+    Assert.assertTrue(tmpFile.createNewFile());
+
+    FileOutputStream os = new FileOutputStream(tmpFile);
+    for (int i = 0; i < 100; i++) {
+      os.write("myString".getBytes(Charsets.UTF_8));
+    }
+    os.close();
+
+    long originalModTime = fs.getFileStatus(new 
Path(tmpFile.getPath())).getModificationTime();
+    Assert.assertFalse(new File(tmpTarget, fileName).exists());
+    // Give a minimal gap between file creation and copy
+    Thread.sleep(1000);
+
+    // Negative case, not preserving the timestamp.
+    tmpTarget.deleteOnExit();
+    EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new 
Path(tmpSource.getAbsolutePath()),
+        new Path(tmpTarget.getAbsolutePath()));
+    embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
+    embedded.run();
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertTrue(new File(tmpTarget, fileName).exists());
+    long newModTime = fs.getFileStatus(new Path(new File(tmpTarget, 
fileName).getAbsolutePath())).getModificationTime();
+    Assert.assertTrue(newModTime != originalModTime);
+  }
+
   public static class MyDataFileVersion implements 
DataFileVersionStrategy<Long>, 
DataFileVersionStrategy.DataFileVersionFactory<Long> {
     private static final Map<Path, Long> versions = new HashMap<>();
 

Reply via email to