This is an automated email from the ASF dual-hosted git repository. lesun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new be46c7c [GOBBLIN-1221] Preserve source file's ModTime by configuration be46c7c is described below commit be46c7c34ad4576f15d2cb08037c672bf17a8bd3 Author: Lei Sun <autumn...@gmail.com> AuthorDate: Thu Jul 30 10:46:15 2020 -0700 [GOBBLIN-1221] Preserve source file's ModTime by configuration. Preserve source file's ModTime by configuration. Enhance the EmbeddedDistcpTest cleanup. Improve the test cases when running along with the whole module. Address comments. Closes #3069 from autumnust/preserveTsInDistcp --- .../data/management/copy/PreserveAttributes.java | 4 +- .../copy/publisher/CopyDataPublisher.java | 52 +++++++++----- .../embedded/EmbeddedGobblinDistcpTest.java | 80 +++++++++++++++++++++- 3 files changed, 113 insertions(+), 23 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java index b9ba027..8b01098 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java @@ -44,7 +44,8 @@ public class PreserveAttributes { OWNER('u'), GROUP('g'), PERMISSION('p'), - VERSION('v'); + VERSION('v'), + MOD_TIME('t'); private final char token; @@ -89,6 +90,7 @@ public class PreserveAttributes { * * g -> preserve group * * p -> preserve permissions * * v -> preserve version + * * t -> preserve file's modTime * Characters not in this character set will be ignored. 
* * @param s String of the form \[rbugpv]*\ diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index f983c2a..41d4d85 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -186,8 +186,37 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl } /** + * Unlike other preserving attributes of files (ownership, group, etc.), which is preserved in writer, + * some of the attributes have to be set during publish phase like ModTime, + * and versionStrategy (usually relevant to modTime as well), since they are subject to change with Publish(rename) + */ + private void preserveFileAttrInPublisher(CopyableFile copyableFile) throws IOException { + // Preserving File ModTime, and set the access time to an initializing value when ModTime is declared to be preserved. + if (copyableFile.getPreserve().preserve(PreserveAttributes.Option.MOD_TIME)) { + fs.setTimes(copyableFile.getDestination(), copyableFile.getOriginTimestamp(), -1); + } + + // Preserving File Version. 
+ DataFileVersionStrategy srcVS = this.srcDataFileVersionStrategy; + DataFileVersionStrategy dstVS = this.dstDataFileVersionStrategy; + + // Prefer to use copyableFile's specific version strategy + if (copyableFile.getDataFileVersionStrategy() != null) { + Config versionStrategyConfig = ConfigFactory.parseMap(ImmutableMap.of( + DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, copyableFile.getDataFileVersionStrategy())); + srcVS = DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.srcFs, versionStrategyConfig); + dstVS = DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs, versionStrategyConfig); + } + + if (copyableFile.getPreserve().preserve(PreserveAttributes.Option.VERSION) + && dstVS.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE)) { + dstVS.setVersion(copyableFile.getDestination(), + srcVS.getVersion(copyableFile.getOrigin().getPath())); + } + } + + /** * Publish data for a {@link CopyableDataset}. - * */ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, Collection<WorkUnitState> datasetWorkUnitStates) throws IOException { @@ -233,23 +262,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus); if (copyEntity instanceof CopyableFile) { CopyableFile copyableFile = (CopyableFile) copyEntity; - DataFileVersionStrategy srcVS = this.srcDataFileVersionStrategy; - DataFileVersionStrategy dstVS = this.dstDataFileVersionStrategy; - - // Prefer to use copyableFile's specific version strategy - if (copyableFile.getDataFileVersionStrategy() != null) { - Config versionStrategyConfig = ConfigFactory.parseMap(ImmutableMap.of( - DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, copyableFile.getDataFileVersionStrategy())); - srcVS = DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.srcFs, versionStrategyConfig); - dstVS = 
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs, versionStrategyConfig); - } - - if (copyableFile.getPreserve().preserve(PreserveAttributes.Option.VERSION) - && dstVS.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE)) { - dstVS.setVersion(copyableFile.getDestination(), - srcVS.getVersion(copyableFile.getOrigin().getPath())); - } - + preserveFileAttrInPublisher(copyableFile); if (wus.getWorkingState() == WorkingState.COMMITTED) { CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, wus); // Dataset Output path is injected in each copyableFile. @@ -287,7 +300,8 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl additionalMetadata.put(SlaEventKeys.DATASET_OUTPUT_PATH, fileSetRoot.or("Unknown")); CopyEventSubmitterHelper.submitSuccessfulDatasetPublish(this.eventSubmitter, datasetAndPartition, Long.toString(datasetOriginTimestamp), Long.toString(datasetUpstreamTimestamp), additionalMetadata); - } + } + private static boolean hasCopyableFiles(Collection<WorkUnitState> workUnits) throws IOException { for (WorkUnitState wus : workUnits) { diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java index a54ba95..c042ac1 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java @@ -163,10 +163,11 @@ public class EmbeddedGobblinDistcpTest { } if (metaStoreClient != null) { - // Clean the source table and DB - if (metaStoreClient.tableExists(TEST_DB, TEST_TABLE)) { - metaStoreClient.dropTable(TEST_DB, TEST_TABLE); + // Clean out all tables in case there are any, to avoid db-drop failure. 
+ for (String tblName : metaStoreClient.getAllTables(TEST_DB)) { + metaStoreClient.dropTable(TEST_DB, tblName); } + if (metaStoreClient.getAllDatabases().contains(TEST_DB)) { metaStoreClient.dropDatabase(TEST_DB); } @@ -279,6 +280,79 @@ public class EmbeddedGobblinDistcpTest { Assert.assertEquals((long) versionStrategy.getVersion(new Path(tmpTarget.getAbsolutePath(), fileName)), 123l); } + @Test + public void testWithModTimePreserve() throws Exception { + FileSystem fs = FileSystem.getLocal(new Configuration()); + String fileName = "file"; + + File tmpSource = Files.createTempDir(); + tmpSource.deleteOnExit(); + File tmpTarget = Files.createTempDir(); + tmpTarget.deleteOnExit(); + + File tmpFile = new File(tmpSource, fileName); + Assert.assertTrue(tmpFile.createNewFile()); + + FileOutputStream os = new FileOutputStream(tmpFile); + for (int i = 0; i < 100; i++) { + os.write("myString".getBytes(Charsets.UTF_8)); + } + os.close(); + + long originalModTime = fs.getFileStatus(new Path(tmpFile.getPath())).getModificationTime(); + Assert.assertNotNull(originalModTime); + + Assert.assertTrue(new File(tmpSource, fileName).exists()); + Assert.assertFalse(new File(tmpTarget, fileName).exists()); + + EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new Path(tmpSource.getAbsolutePath()), + new Path(tmpTarget.getAbsolutePath())); + embedded.setLaunchTimeout(30, TimeUnit.SECONDS); + embedded.setConfiguration(CopyConfiguration.PRESERVE_ATTRIBUTES_KEY, "t"); + embedded.run(); + + Assert.assertTrue(new File(tmpSource, fileName).exists()); + Assert.assertTrue(new File(tmpTarget, fileName).exists()); + Assert.assertEquals(fs.getFileStatus(new Path(new File(tmpTarget, fileName).getAbsolutePath())).getModificationTime() + , originalModTime); + } + + @Test + public void testWithModTimePreserveNegative() throws Exception { + FileSystem fs = FileSystem.getLocal(new Configuration()); + String fileName = "file_oh"; + + File tmpSource = Files.createTempDir(); + 
tmpSource.deleteOnExit(); + File tmpTarget = Files.createTempDir(); + tmpTarget.deleteOnExit(); + + File tmpFile = new File(tmpSource, fileName); + Assert.assertTrue(tmpFile.createNewFile()); + + FileOutputStream os = new FileOutputStream(tmpFile); + for (int i = 0; i < 100; i++) { + os.write("myString".getBytes(Charsets.UTF_8)); + } + os.close(); + + long originalModTime = fs.getFileStatus(new Path(tmpFile.getPath())).getModificationTime(); + Assert.assertFalse(new File(tmpTarget, fileName).exists()); + // Give a minimal gap between file creation and copy + Thread.sleep(1000); + + // Negative case, not preserving the timestamp. + tmpTarget.deleteOnExit(); + EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new Path(tmpSource.getAbsolutePath()), + new Path(tmpTarget.getAbsolutePath())); + embedded.setLaunchTimeout(30, TimeUnit.SECONDS); + embedded.run(); + Assert.assertTrue(new File(tmpSource, fileName).exists()); + Assert.assertTrue(new File(tmpTarget, fileName).exists()); + long newModTime = fs.getFileStatus(new Path(new File(tmpTarget, fileName).getAbsolutePath())).getModificationTime(); + Assert.assertTrue(newModTime != originalModTime); + } + public static class MyDataFileVersion implements DataFileVersionStrategy<Long>, DataFileVersionStrategy.DataFileVersionFactory<Long> { private static final Map<Path, Long> versions = new HashMap<>();