This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new be46c7c [GOBBLIN-1221] Preserve source file's ModTime by configuration
be46c7c is described below
commit be46c7c34ad4576f15d2cb08037c672bf17a8bd3
Author: Lei Sun <[email protected]>
AuthorDate: Thu Jul 30 10:46:15 2020 -0700
[GOBBLIN-1221] Preserve source file's ModTime by configuration
Preserve the source file's ModTime when configured to do so
Enhance the EmbeddedGobblinDistcpTest cleanup
Improve the test cases so they pass when run together with the
rest of the module
Address review comments
Closes #3069 from autumnust/preserveTsInDistcp
---
.../data/management/copy/PreserveAttributes.java | 4 +-
.../copy/publisher/CopyDataPublisher.java | 52 +++++++++-----
.../embedded/EmbeddedGobblinDistcpTest.java | 80 +++++++++++++++++++++-
3 files changed, 113 insertions(+), 23 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
index b9ba027..8b01098 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
@@ -44,7 +44,8 @@ public class PreserveAttributes {
OWNER('u'),
GROUP('g'),
PERMISSION('p'),
- VERSION('v');
+ VERSION('v'),
+ MOD_TIME('t');
private final char token;
@@ -89,6 +90,7 @@ public class PreserveAttributes {
* * g -> preserve group
* * p -> preserve permissions
* * v -> preserve version
+ * * t -> preserve file's modTime
* Characters not in this character set will be ignored.
*
* @param s String of the form \[rbugpv]*\
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index f983c2a..41d4d85 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -186,8 +186,37 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
}
/**
+ * Unlike other preserving attributes of files (ownership, group, etc.),
which is preserved in writer,
+ * some of the attributes have to be set during publish phase like ModTime,
+ * and versionStrategy (usually relevant to modTime as well), since they are
subject to change with Publish(rename)
+ */
+ private void preserveFileAttrInPublisher(CopyableFile copyableFile) throws
IOException {
+ // Preserving File ModTime, and set the access time to an initializing
value when ModTime is declared to be preserved.
+ if
(copyableFile.getPreserve().preserve(PreserveAttributes.Option.MOD_TIME)) {
+ fs.setTimes(copyableFile.getDestination(),
copyableFile.getOriginTimestamp(), -1);
+ }
+
+ // Preserving File Version.
+ DataFileVersionStrategy srcVS = this.srcDataFileVersionStrategy;
+ DataFileVersionStrategy dstVS = this.dstDataFileVersionStrategy;
+
+ // Prefer to use copyableFile's specific version strategy
+ if (copyableFile.getDataFileVersionStrategy() != null) {
+ Config versionStrategyConfig = ConfigFactory.parseMap(ImmutableMap.of(
+ DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY,
copyableFile.getDataFileVersionStrategy()));
+ srcVS =
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.srcFs,
versionStrategyConfig);
+ dstVS =
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs,
versionStrategyConfig);
+ }
+
+ if (copyableFile.getPreserve().preserve(PreserveAttributes.Option.VERSION)
+ &&
dstVS.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE)) {
+ dstVS.setVersion(copyableFile.getDestination(),
+ srcVS.getVersion(copyableFile.getOrigin().getPath()));
+ }
+ }
+
+ /**
* Publish data for a {@link CopyableDataset}.
- *
*/
private void publishFileSet(CopyEntity.DatasetAndPartition
datasetAndPartition,
Collection<WorkUnitState> datasetWorkUnitStates) throws IOException {
@@ -233,23 +262,7 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
if (copyEntity instanceof CopyableFile) {
CopyableFile copyableFile = (CopyableFile) copyEntity;
- DataFileVersionStrategy srcVS = this.srcDataFileVersionStrategy;
- DataFileVersionStrategy dstVS = this.dstDataFileVersionStrategy;
-
- // Prefer to use copyableFile's specific version strategy
- if (copyableFile.getDataFileVersionStrategy() != null) {
- Config versionStrategyConfig =
ConfigFactory.parseMap(ImmutableMap.of(
- DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY,
copyableFile.getDataFileVersionStrategy()));
- srcVS =
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.srcFs,
versionStrategyConfig);
- dstVS =
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs,
versionStrategyConfig);
- }
-
- if
(copyableFile.getPreserve().preserve(PreserveAttributes.Option.VERSION)
- &&
dstVS.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE)) {
- dstVS.setVersion(copyableFile.getDestination(),
- srcVS.getVersion(copyableFile.getOrigin().getPath()));
- }
-
+ preserveFileAttrInPublisher(copyableFile);
if (wus.getWorkingState() == WorkingState.COMMITTED) {
CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter,
copyableFile, wus);
// Dataset Output path is injected in each copyableFile.
@@ -287,7 +300,8 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
additionalMetadata.put(SlaEventKeys.DATASET_OUTPUT_PATH,
fileSetRoot.or("Unknown"));
CopyEventSubmitterHelper.submitSuccessfulDatasetPublish(this.eventSubmitter,
datasetAndPartition,
Long.toString(datasetOriginTimestamp),
Long.toString(datasetUpstreamTimestamp), additionalMetadata);
- }
+ }
+
private static boolean hasCopyableFiles(Collection<WorkUnitState> workUnits)
throws IOException {
for (WorkUnitState wus : workUnits) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index a54ba95..c042ac1 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -163,10 +163,11 @@ public class EmbeddedGobblinDistcpTest {
}
if (metaStoreClient != null) {
- // Clean the source table and DB
- if (metaStoreClient.tableExists(TEST_DB, TEST_TABLE)) {
- metaStoreClient.dropTable(TEST_DB, TEST_TABLE);
+ // Clean out all tables in case there are any, to avoid db-drop failure.
+ for (String tblName : metaStoreClient.getAllTables(TEST_DB)) {
+ metaStoreClient.dropTable(TEST_DB, tblName);
}
+
if (metaStoreClient.getAllDatabases().contains(TEST_DB)) {
metaStoreClient.dropDatabase(TEST_DB);
}
@@ -279,6 +280,79 @@ public class EmbeddedGobblinDistcpTest {
Assert.assertEquals((long) versionStrategy.getVersion(new
Path(tmpTarget.getAbsolutePath(), fileName)), 123l);
}
+ @Test
+ public void testWithModTimePreserve() throws Exception {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ String fileName = "file";
+
+ File tmpSource = Files.createTempDir();
+ tmpSource.deleteOnExit();
+ File tmpTarget = Files.createTempDir();
+ tmpTarget.deleteOnExit();
+
+ File tmpFile = new File(tmpSource, fileName);
+ Assert.assertTrue(tmpFile.createNewFile());
+
+ FileOutputStream os = new FileOutputStream(tmpFile);
+ for (int i = 0; i < 100; i++) {
+ os.write("myString".getBytes(Charsets.UTF_8));
+ }
+ os.close();
+
+ long originalModTime = fs.getFileStatus(new
Path(tmpFile.getPath())).getModificationTime();
+ Assert.assertNotNull(originalModTime);
+
+ Assert.assertTrue(new File(tmpSource, fileName).exists());
+ Assert.assertFalse(new File(tmpTarget, fileName).exists());
+
+ EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new
Path(tmpSource.getAbsolutePath()),
+ new Path(tmpTarget.getAbsolutePath()));
+ embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
+ embedded.setConfiguration(CopyConfiguration.PRESERVE_ATTRIBUTES_KEY, "t");
+ embedded.run();
+
+ Assert.assertTrue(new File(tmpSource, fileName).exists());
+ Assert.assertTrue(new File(tmpTarget, fileName).exists());
+ Assert.assertEquals(fs.getFileStatus(new Path(new File(tmpTarget,
fileName).getAbsolutePath())).getModificationTime()
+ , originalModTime);
+ }
+
+ @Test
+ public void testWithModTimePreserveNegative() throws Exception {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ String fileName = "file_oh";
+
+ File tmpSource = Files.createTempDir();
+ tmpSource.deleteOnExit();
+ File tmpTarget = Files.createTempDir();
+ tmpTarget.deleteOnExit();
+
+ File tmpFile = new File(tmpSource, fileName);
+ Assert.assertTrue(tmpFile.createNewFile());
+
+ FileOutputStream os = new FileOutputStream(tmpFile);
+ for (int i = 0; i < 100; i++) {
+ os.write("myString".getBytes(Charsets.UTF_8));
+ }
+ os.close();
+
+ long originalModTime = fs.getFileStatus(new
Path(tmpFile.getPath())).getModificationTime();
+ Assert.assertFalse(new File(tmpTarget, fileName).exists());
+ // Give a minimal gap between file creation and copy
+ Thread.sleep(1000);
+
+ // Negative case, not preserving the timestamp.
+ tmpTarget.deleteOnExit();
+ EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new
Path(tmpSource.getAbsolutePath()),
+ new Path(tmpTarget.getAbsolutePath()));
+ embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
+ embedded.run();
+ Assert.assertTrue(new File(tmpSource, fileName).exists());
+ Assert.assertTrue(new File(tmpTarget, fileName).exists());
+ long newModTime = fs.getFileStatus(new Path(new File(tmpTarget,
fileName).getAbsolutePath())).getModificationTime();
+ Assert.assertTrue(newModTime != originalModTime);
+ }
+
public static class MyDataFileVersion implements
DataFileVersionStrategy<Long>,
DataFileVersionStrategy.DataFileVersionFactory<Long> {
private static final Map<Path, Long> versions = new HashMap<>();