This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 101abec [GOBBLIN-697] Implementation of data file versioning and
preservation in distcp.
101abec is described below
commit 101abec7abc75c63c5afb63b6727dbd5781ea7a0
Author: ibuenros <[email protected]>
AuthorDate: Fri Mar 15 23:42:41 2019 -0700
[GOBBLIN-697] Implementation of data file versioning and preservation in
distcp.
Closes #2568 from ibuenros/fileversion
---
.../data/management/copy/PreserveAttributes.java | 6 +-
.../copy/publisher/CopyDataPublisher.java | 19 +++
.../embedded/EmbeddedGobblinDistcpTest.java | 80 ++++++++++++-
.../util/filesystem/DataFileVersionStrategy.java | 129 +++++++++++++++++++++
.../filesystem/ModTimeDataFileVersionStrategy.java | 68 +++++++++++
5 files changed, 299 insertions(+), 3 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
index 30eb972..b9ba027 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
@@ -43,7 +43,8 @@ public class PreserveAttributes {
BLOCK_SIZE('b'),
OWNER('u'),
GROUP('g'),
- PERMISSION('p');
+ PERMISSION('p'),
+ VERSION('v');
private final char token;
@@ -87,9 +88,10 @@ public class PreserveAttributes {
* * u -> preserve owner
* * g -> preserve group
* * p -> preserve permissions
+ * * v -> preserve version
* Characters not in this character set will be ignored.
*
- * @param s String of the form \[rbugp]*\
+ * @param s String of the form \[rbugpv]*\
* @return Parsed {@link PreserveAttributes}
*/
public static PreserveAttributes fromMnemonicString(String s) {
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index cfd715a..4f7c0ed 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -27,6 +27,9 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,6 +39,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.typesafe.config.Config;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -84,6 +88,8 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
protected final EventSubmitter eventSubmitter;
protected final RecoveryHelper recoveryHelper;
protected final Optional<LineageInfo> lineageInfo;
+ protected final DataFileVersionStrategy srcDataFileVersionStrategy;
+ protected final DataFileVersionStrategy dstDataFileVersionStrategy;
/**
* Build a new {@link CopyDataPublisher} from {@link State}. The constructor
expects the following to be set in the
@@ -119,6 +125,12 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
this.recoveryHelper = new RecoveryHelper(this.fs, state);
this.recoveryHelper.purgeOldPersistedFile();
+
+ Config config = ConfigUtils.propertiesToConfig(state.getProperties());
+
+ this.srcDataFileVersionStrategy = DataFileVersionStrategy
+
.instantiateDataFileVersionStrategy(HadoopUtils.getSourceFileSystem(state),
config);
+ this.dstDataFileVersionStrategy =
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs, config);
}
@Override
@@ -219,6 +231,13 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
if (copyEntity instanceof CopyableFile) {
CopyableFile copyableFile = (CopyableFile) copyEntity;
+
+ if
(copyableFile.getPreserve().preserve(PreserveAttributes.Option.VERSION)
+ &&
this.dstDataFileVersionStrategy.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE))
{
+
this.dstDataFileVersionStrategy.setVersion(copyableFile.getDestination(),
+
this.srcDataFileVersionStrategy.getVersion(copyableFile.getOrigin().getPath()));
+ }
+
if (wus.getWorkingState() == WorkingState.COMMITTED) {
CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter,
copyableFile, wus);
// Dataset Output path is injected in each copyableFile.
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index c12dea4..24b18a5 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -19,15 +19,24 @@ package org.apache.gobblin.runtime.embedded;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.api.client.util.Charsets;
+import com.google.common.collect.Sets;
import com.google.common.io.Files;
+import com.typesafe.config.Config;
public class EmbeddedGobblinDistcpTest {
@@ -62,4 +71,73 @@ public class EmbeddedGobblinDistcpTest {
Assert.assertTrue(new File(tmpTarget, fileName).exists());
}
+ @Test
+ public void testWithVersionPreserve() throws Exception {
+ String fileName = "file";
+
+ File tmpSource = Files.createTempDir();
+ tmpSource.deleteOnExit();
+ File tmpTarget = Files.createTempDir();
+ tmpTarget.deleteOnExit();
+
+ File tmpFile = new File(tmpSource, fileName);
+ tmpFile.createNewFile();
+
+ FileOutputStream os = new FileOutputStream(tmpFile);
+ for (int i = 0; i < 100; i++) {
+ os.write("myString".getBytes(Charsets.UTF_8));
+ }
+ os.close();
+
+ MyDataFileVersion versionStrategy = new MyDataFileVersion();
+ versionStrategy.setVersion(new Path(tmpFile.getAbsolutePath()), 123L);
+
+ Assert.assertTrue(new File(tmpSource, fileName).exists());
+ Assert.assertFalse(new File(tmpTarget, fileName).exists());
+
+ EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new
Path(tmpSource.getAbsolutePath()),
+ new Path(tmpTarget.getAbsolutePath()));
+ embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
+
embedded.setConfiguration(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY,
MyDataFileVersion.class.getName());
+ embedded.setConfiguration(CopyConfiguration.PRESERVE_ATTRIBUTES_KEY, "v");
+ embedded.run();
+
+ Assert.assertTrue(new File(tmpSource, fileName).exists());
+ Assert.assertTrue(new File(tmpTarget, fileName).exists());
+ Assert.assertEquals((long) versionStrategy.getVersion(new
Path(tmpTarget.getAbsolutePath(), fileName)), 123l);
+ }
+
+ public static class MyDataFileVersion implements
DataFileVersionStrategy<Long>,
DataFileVersionStrategy.DataFileVersionFactory<Long> {
+ private static final Map<Path, Long> versions = new HashMap<>();
+
+ @Override
+ public DataFileVersionStrategy<Long>
createDataFileVersionStrategy(FileSystem fs, Config config) {
+ return this;
+ }
+
+ @Override
+ public Long getVersion(Path path)
+ throws IOException {
+ return versions.get(PathUtils.getPathWithoutSchemeAndAuthority(path));
+ }
+
+ @Override
+ public boolean setVersion(Path path, Long version)
+ throws IOException {
+ versions.put(PathUtils.getPathWithoutSchemeAndAuthority(path), version);
+ return true;
+ }
+
+ @Override
+ public boolean setDefaultVersion(Path path)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public Set<Characteristic> applicableCharacteristics() {
+ return Sets.newHashSet(Characteristic.SETTABLE);
+ }
+ }
+
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java
new file mode 100644
index 0000000..ca237ee
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * An interface to set and get "versions" of data files.
+ *
+ * The version is a rough signature to the data contents. It allows data
preserving functionality (like copy) to replicate
+ * the version independently of metadata with other semantics like file
modification time.
+ *
+ * One example where this is useful is data syncing between two locations.
Relying on modification times to detect
+ * data changes may lead to a feedback loop of copying: data gets created at
location A at time 0;
+ * at time 1 the data is copied to location B; the sync mechanism might then incorrectly
believe that, since the mod time at location B
+ * is higher, the data should be synced back to location A, and so on.
+ *
+ * Required properties:
+ * - REPLICABLE: Two calls to `getVersion` on a file that has clearly not been
modified must return the same version.
+ * - MONOTONIC: The default version of a file is an increasing function of
modification time.
+ * - CONSERVATIVE: If file f had its version last set to v, but the versioning
implementation determines the file MIGHT
+ * have been modified and it chooses to return a version, it
will return a value strictly larger than v.
+ *
+ * A common pattern to achieve monotonicity and conservativeness will be to
invalidate the version of a data file
+ * if it is detected that the file was modified without updating the version
(e.g. by a process which is unaware of versioning).
+ *
+ * @param <T> the type for the version objects. Must be comparable and
serializable.
+ */
+public interface DataFileVersionStrategy<T extends Comparable<T> &
Serializable> {
+
+ /**
+ * Characteristics a {@link DataFileVersionStrategy} may have.
+ */
+ enum Characteristic {
+ /** The default version for a data file is the modtime of the file.
Versions can in general be compared against modtimes. */
+ COMPATIBLE_WITH_MODTIME,
+ /** Version can be explicitly set. If a strategy lacks this characteristic, its `set*` methods will always
return false. */
+ SETTABLE,
+ /** If a file has been modified and a set* method was not called,
`getVersion` will throw an error. */
+ STRICT
+ }
+
+ String DATA_FILE_VERSION_STRATEGY_KEY =
"org.apache.gobblin.dataFileVersionStrategy";
+
+ /**
+ * Instantiate a {@link DataFileVersionStrategy} according to input
configuration.
+ */
+ static DataFileVersionStrategy instantiateDataFileVersionStrategy(FileSystem
fs, Config config) throws IOException {
+ String versionStrategy = ConfigUtils.getString(config,
DATA_FILE_VERSION_STRATEGY_KEY,
ModTimeDataFileVersionStrategy.Factory.class.getName());
+
+ ClassAliasResolver resolver = new
ClassAliasResolver(DataFileVersionFactory.class);
+
+ try {
+ Class<? extends DataFileVersionFactory> klazz =
resolver.resolveClass(versionStrategy);
+ return klazz.newInstance().createDataFileVersionStrategy(fs, config);
+ } catch (ReflectiveOperationException roe) {
+ throw new IOException(roe);
+ }
+ }
+
+ /**
+ * A Factory for {@link DataFileVersionStrategy}s.
+ */
+ interface DataFileVersionFactory<T extends Comparable<T> & Serializable> {
+ /**
+ * Build a {@link DataFileVersionStrategy} with the input configuration.
+ */
+ DataFileVersionStrategy<T> createDataFileVersionStrategy(FileSystem fs,
Config config);
+ }
+
+ /**
+ * Get the version of a path.
+ */
+ T getVersion(Path path) throws IOException;
+
+ /**
+ * Set the version of a path to a specific version (generally replicated
from another path).
+ *
+ * @return false if the version is not settable.
+ * @throws IOException if the version is settable but could not be set
successfully.
+ */
+ boolean setVersion(Path path, T version) throws IOException;
+
+ /**
+ * Set the version of a path to a value automatically set by the versioning
system. Note this call must respect the
+ * monotonicity requirement.
+ *
+ * @return false if the version is not settable.
+ * @throws IOException if the version is settable but could not be set
successfully.
+ */
+ boolean setDefaultVersion(Path path) throws IOException;
+
+ /**
+ * @return The set of optional characteristics this {@link
DataFileVersionStrategy} satisfies.
+ */
+ Set<Characteristic> applicableCharacteristics();
+
+ /**
+ * @return whether this implementation has the specified characteristic.
+ */
+ default boolean hasCharacteristic(Characteristic characteristic) {
+ return applicableCharacteristics().contains(characteristic);
+ }
+
+}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java
new file mode 100644
index 0000000..2f2fa22
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.ImmutableSet;
+import com.typesafe.config.Config;
+
+import lombok.Data;
+
+
+/**
+ * An implementation of {@link DataFileVersionStrategy} that uses modtime as
the file version.
+ *
+ * This is the default implementation and does data comparisons purely based
on modification time.
+ */
+@Data
+public class ModTimeDataFileVersionStrategy implements
DataFileVersionStrategy<Long> {
+
+ public static class Factory implements
DataFileVersionStrategy.DataFileVersionFactory<Long> {
+ @Override
+ public DataFileVersionStrategy<Long>
createDataFileVersionStrategy(FileSystem fs, Config config) {
+ return new ModTimeDataFileVersionStrategy(fs);
+ }
+ }
+
+ private final FileSystem fs;
+
+ @Override
+ public Long getVersion(Path path) throws IOException {
+ return this.fs.getFileStatus(path).getModificationTime();
+ }
+
+ @Override
+ public boolean setVersion(Path path, Long version) {
+ return false;
+ }
+
+ @Override
+ public boolean setDefaultVersion(Path path) {
+ return false;
+ }
+
+ @Override
+ public Set<Characteristic> applicableCharacteristics() {
+ return ImmutableSet.of(Characteristic.COMPATIBLE_WITH_MODTIME);
+ }
+}