This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 101abec  [GOBBLIN-697] Implementation of data file versioning and preservation in distcp.
101abec is described below

commit 101abec7abc75c63c5afb63b6727dbd5781ea7a0
Author: ibuenros <[email protected]>
AuthorDate: Fri Mar 15 23:42:41 2019 -0700

    [GOBBLIN-697] Implementation of data file versioning and preservation in distcp.
    
    Closes #2568 from ibuenros/fileversion
---
 .../data/management/copy/PreserveAttributes.java   |   6 +-
 .../copy/publisher/CopyDataPublisher.java          |  19 +++
 .../embedded/EmbeddedGobblinDistcpTest.java        |  80 ++++++++++++-
 .../util/filesystem/DataFileVersionStrategy.java   | 129 +++++++++++++++++++++
 .../filesystem/ModTimeDataFileVersionStrategy.java |  68 +++++++++++
 5 files changed, 299 insertions(+), 3 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
index 30eb972..b9ba027 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/PreserveAttributes.java
@@ -43,7 +43,8 @@ public class PreserveAttributes {
     BLOCK_SIZE('b'),
     OWNER('u'),
     GROUP('g'),
-    PERMISSION('p');
+    PERMISSION('p'),
+    VERSION('v');
 
     private final char token;
 
@@ -87,9 +88,10 @@ public class PreserveAttributes {
    * * u -> preserve owner
    * * g -> preserve group
    * * p -> preserve permissions
+   * * v -> preserve version
    * Characters not in this character set will be ignored.
    *
-   * @param s String of the form \[rbugp]*\
+   * @param s String of the form \[rbugpv]*\
    * @return Parsed {@link PreserveAttributes}
    */
   public static PreserveAttributes fromMnemonicString(String s) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index cfd715a..4f7c0ed 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -27,6 +27,9 @@ import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -36,6 +39,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -84,6 +88,8 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
   protected final EventSubmitter eventSubmitter;
   protected final RecoveryHelper recoveryHelper;
   protected final Optional<LineageInfo> lineageInfo;
+  protected final DataFileVersionStrategy srcDataFileVersionStrategy;
+  protected final DataFileVersionStrategy dstDataFileVersionStrategy;
 
   /**
    * Build a new {@link CopyDataPublisher} from {@link State}. The constructor 
expects the following to be set in the
@@ -119,6 +125,12 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
 
     this.recoveryHelper = new RecoveryHelper(this.fs, state);
     this.recoveryHelper.purgeOldPersistedFile();
+
+    Config config = ConfigUtils.propertiesToConfig(state.getProperties());
+
+    this.srcDataFileVersionStrategy = DataFileVersionStrategy
+        
.instantiateDataFileVersionStrategy(HadoopUtils.getSourceFileSystem(state), 
config);
+    this.dstDataFileVersionStrategy = 
DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs, config);
   }
 
   @Override
@@ -219,6 +231,13 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
       CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
       if (copyEntity instanceof CopyableFile) {
         CopyableFile copyableFile = (CopyableFile) copyEntity;
+
+        if 
(copyableFile.getPreserve().preserve(PreserveAttributes.Option.VERSION)
+            && 
this.dstDataFileVersionStrategy.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE))
 {
+          
this.dstDataFileVersionStrategy.setVersion(copyableFile.getDestination(),
+              
this.srcDataFileVersionStrategy.getVersion(copyableFile.getOrigin().getPath()));
+        }
+
         if (wus.getWorkingState() == WorkingState.COMMITTED) {
           
CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, 
copyableFile, wus);
           // Dataset Output path is injected in each copyableFile.
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index c12dea4..24b18a5 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -19,15 +19,24 @@ package org.apache.gobblin.runtime.embedded;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.api.client.util.Charsets;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
+import com.typesafe.config.Config;
 
 
 public class EmbeddedGobblinDistcpTest {
@@ -62,4 +71,86 @@ public class EmbeddedGobblinDistcpTest {
     Assert.assertTrue(new File(tmpTarget, fileName).exists());
   }
 
+  /**
+   * Copies a single file with the {@code v} (version) preserve attribute set and checks that the version
+   * registered for the source file is replicated onto the destination file.
+   */
+  @Test
+  public void testWithVersionPreserve() throws Exception {
+    String fileName = "file";
+
+    File tmpSource = Files.createTempDir();
+    tmpSource.deleteOnExit();
+    File tmpTarget = Files.createTempDir();
+    tmpTarget.deleteOnExit();
+
+    File tmpFile = new File(tmpSource, fileName);
+    tmpFile.createNewFile();
+
+    // try-with-resources closes the stream even if a write fails.
+    try (FileOutputStream os = new FileOutputStream(tmpFile)) {
+      for (int i = 0; i < 100; i++) {
+        os.write("myString".getBytes(Charsets.UTF_8));
+      }
+    }
+
+    MyDataFileVersion versionStrategy = new MyDataFileVersion();
+    versionStrategy.setVersion(new Path(tmpFile.getAbsolutePath()), 123L);
+
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertFalse(new File(tmpTarget, fileName).exists());
+
+    EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new Path(tmpSource.getAbsolutePath()),
+        new Path(tmpTarget.getAbsolutePath()));
+    embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
+    embedded.setConfiguration(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, MyDataFileVersion.class.getName());
+    embedded.setConfiguration(CopyConfiguration.PRESERVE_ATTRIBUTES_KEY, "v");
+    embedded.run();
+
+    Assert.assertTrue(new File(tmpSource, fileName).exists());
+    Assert.assertTrue(new File(tmpTarget, fileName).exists());
+    // Compare boxed values: a missing version then fails the assertion instead of throwing an NPE on unboxing.
+    Assert.assertEquals(versionStrategy.getVersion(new Path(tmpTarget.getAbsolutePath(), fileName)),
+        Long.valueOf(123L));
+  }
+
+  /**
+   * In-memory {@link DataFileVersionStrategy} that is also its own factory. Versions live in a static map so the
+   * instance created by this test and the instance instantiated reflectively from the job configuration share them;
+   * paths are stored without scheme and authority so lookups ignore those components.
+   */
+  public static class MyDataFileVersion implements DataFileVersionStrategy<Long>,
+      DataFileVersionStrategy.DataFileVersionFactory<Long> {
+    private static final Map<Path, Long> versions = new HashMap<>();
+
+    @Override
+    public DataFileVersionStrategy<Long> createDataFileVersionStrategy(FileSystem fs, Config config) {
+      return this;
+    }
+
+    @Override
+    public Long getVersion(Path path)
+        throws IOException {
+      return versions.get(PathUtils.getPathWithoutSchemeAndAuthority(path));
+    }
+
+    @Override
+    public boolean setVersion(Path path, Long version)
+        throws IOException {
+      versions.put(PathUtils.getPathWithoutSchemeAndAuthority(path), version);
+      return true;
+    }
+
+    @Override
+    public boolean setDefaultVersion(Path path)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public Set<Characteristic> applicableCharacteristics() {
+      return Sets.newHashSet(Characteristic.SETTABLE);
+    }
+  }
+
 }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java
new file mode 100644
index 0000000..ca237ee
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * An interface to set and get "versions" of data files.
+ *
+ * <p>The version is a rough signature of the data contents. It allows data-preserving functionality (like copy) to
+ * replicate the version independently of metadata with other semantics, such as the file modification time.
+ *
+ * <p>An example where this is useful is data syncing between two locations. Relying on modification times to detect
+ * data changes may lead to a feedback loop of copying: data is created at location A at time 0, at time 1 it is
+ * copied to location B, and the sync mechanism might then incorrectly believe that, since the modtime at location B
+ * is higher, the data should be synced back to location A, and so on.
+ *
+ * <p>Required properties:
+ * <ul>
+ *   <li>REPLICABLE: Two calls to {@code getVersion} on a file that has clearly not been modified must return the
+ *       same version.</li>
+ *   <li>MONOTONIC: The default version of a file is an increasing function of its modification time.</li>
+ *   <li>CONSERVATIVE: If file f had its version last set to v, but the versioning implementation determines the
+ *       file MIGHT have been modified and it chooses to return a version, it will return a value strictly larger
+ *       than v.</li>
+ * </ul>
+ *
+ * <p>A common pattern to achieve monotonicity and conservativeness is to invalidate the version of a data file
+ * if it is detected that the file was modified without updating the version (e.g. by a process which is unaware of
+ * versioning).
+ *
+ * @param <T> the type of the version objects. Must be comparable and serializable.
+ */
+public interface DataFileVersionStrategy<T extends Comparable<T> & Serializable> {
+
+  /**
+   * Characteristics a {@link DataFileVersionStrategy} may have.
+   */
+  enum Characteristic {
+    /** The default version for a data file is the modtime of the file. Versions can in general be compared against modtimes. */
+    COMPATIBLE_WITH_MODTIME,
+    /** Version can be explicitly set. Without this characteristic, {@code set*} methods always return false. */
+    SETTABLE,
+    /** If a file has been modified and a {@code set*} method was not called, {@code getVersion} will throw an error. */
+    STRICT
+  }
+
+  /** Configuration key for the class name or alias of the {@link DataFileVersionFactory} to use. */
+  String DATA_FILE_VERSION_STRATEGY_KEY = "org.apache.gobblin.dataFileVersionStrategy";
+
+  /**
+   * Instantiate a {@link DataFileVersionStrategy} according to the input configuration.
+   *
+   * <p>The factory class is read from {@link #DATA_FILE_VERSION_STRATEGY_KEY} (resolved via
+   * {@link ClassAliasResolver}) and defaults to {@link ModTimeDataFileVersionStrategy.Factory}.
+   *
+   * @param fs the {@link FileSystem} passed to the factory.
+   * @param config configuration used to pick the factory; also passed to the factory.
+   * @return the strategy built by the configured factory.
+   * @throws IOException if the factory class cannot be resolved or instantiated.
+   */
+  static DataFileVersionStrategy instantiateDataFileVersionStrategy(FileSystem fs, Config config) throws IOException {
+    String versionStrategy = ConfigUtils.getString(config, DATA_FILE_VERSION_STRATEGY_KEY,
+        ModTimeDataFileVersionStrategy.Factory.class.getName());
+
+    ClassAliasResolver<DataFileVersionFactory> resolver = new ClassAliasResolver<>(DataFileVersionFactory.class);
+
+    try {
+      Class<? extends DataFileVersionFactory> klazz = resolver.resolveClass(versionStrategy);
+      // Class#newInstance is deprecated: it can rethrow undeclared checked exceptions from the constructor.
+      return klazz.getDeclaredConstructor().newInstance().createDataFileVersionStrategy(fs, config);
+    } catch (ReflectiveOperationException roe) {
+      throw new IOException(roe);
+    }
+  }
+
+  /**
+   * A factory for {@link DataFileVersionStrategy}s. Implementations are instantiated reflectively through their
+   * no-argument constructor by {@link #instantiateDataFileVersionStrategy}.
+   */
+  interface DataFileVersionFactory<T extends Comparable<T> & Serializable> {
+    /**
+     * Build a {@link DataFileVersionStrategy} with the input configuration.
+     */
+    DataFileVersionStrategy<T> createDataFileVersionStrategy(FileSystem fs, Config config);
+  }
+
+  /**
+   * Get the version of a path.
+   */
+  T getVersion(Path path) throws IOException;
+
+  /**
+   * Set the version of a path to a specific version (generally replicated from another path).
+   *
+   * @return false if the version is not settable.
+   * @throws IOException if the version is settable but could not be set successfully.
+   */
+  boolean setVersion(Path path, T version) throws IOException;
+
+  /**
+   * Set the version of a path to a value automatically chosen by the versioning system. Note this call must respect
+   * the monotonicity requirement.
+   *
+   * @return false if the version is not settable.
+   * @throws IOException if the version is settable but could not be set successfully.
+   */
+  boolean setDefaultVersion(Path path) throws IOException;
+
+  /**
+   * @return the set of optional characteristics this {@link DataFileVersionStrategy} satisfies.
+   */
+  Set<Characteristic> applicableCharacteristics();
+
+  /**
+   * @return whether this implementation has the specified characteristic.
+   */
+  default boolean hasCharacteristic(Characteristic characteristic) {
+    return applicableCharacteristics().contains(characteristic);
+  }
+
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java
new file mode 100644
index 0000000..2f2fa22
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.ImmutableSet;
+import com.typesafe.config.Config;
+
+import lombok.Data;
+
+
+/**
+ * An implementation of {@link DataFileVersionStrategy} that uses the file modification time as the file version.
+ *
+ * <p>This is the default implementation and does data comparisons purely based on modification time. Versions
+ * cannot be set: the {@code set*} methods do nothing and always return {@code false}.
+ */
+@Data
+public class ModTimeDataFileVersionStrategy implements DataFileVersionStrategy<Long> {
+
+  /**
+   * Factory building a {@link ModTimeDataFileVersionStrategy} for the given {@link FileSystem}. The configuration
+   * is ignored.
+   */
+  public static class Factory implements DataFileVersionStrategy.DataFileVersionFactory<Long> {
+    @Override
+    public DataFileVersionStrategy<Long> createDataFileVersionStrategy(FileSystem fs, Config config) {
+      return new ModTimeDataFileVersionStrategy(fs);
+    }
+  }
+
+  /** File system used to look up modification times (set through the Lombok-generated constructor). */
+  private final FileSystem fs;
+
+  /**
+   * @return the modification time of {@code path}, as reported by {@link FileSystem#getFileStatus(Path)}.
+   * @throws IOException propagated from {@link FileSystem#getFileStatus(Path)}.
+   */
+  @Override
+  public Long getVersion(Path path) throws IOException {
+    return this.fs.getFileStatus(path).getModificationTime();
+  }
+
+  /**
+   * Unsupported: modtime-based versions cannot be set explicitly.
+   *
+   * @return always {@code false}.
+   */
+  @Override
+  public boolean setVersion(Path path, Long version) {
+    return false;
+  }
+
+  /**
+   * Unsupported: modtime-based versions cannot be set explicitly.
+   *
+   * @return always {@code false}.
+   */
+  @Override
+  public boolean setDefaultVersion(Path path) {
+    return false;
+  }
+
+  /**
+   * @return only {@link Characteristic#COMPATIBLE_WITH_MODTIME}; in particular this strategy is not
+   *     {@link Characteristic#SETTABLE}.
+   */
+  @Override
+  public Set<Characteristic> applicableCharacteristics() {
+    return ImmutableSet.of(Characteristic.COMPATIBLE_WITH_MODTIME);
+  }
+}

Reply via email to