This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bbf6761c [GOBBLIN-1824]Improving the Efficiency of Work Planning in 
Manifest-Based DistCp Jobs (#3686)
7bbf6761c is described below

commit 7bbf6761c63055eb9ecc9f2756d4b3d68b7a1b08
Author: Zihan Li <[email protected]>
AuthorDate: Wed May 3 14:17:31 2023 -0700

    [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based 
DistCp Jobs (#3686)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
    
    * [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based 
DistCp Jobs
    
    * remove un-intended change
    
    * address comments and remove the multi-threading since it does not improve 
performance much
    
    * address comments
    
    * make cache TTL configurable
    
    * add comments to describe the difference of 
resolveReplicatedOwnerAndPermissionsRecursivelyWithCache and 
resolveReplicatedOwnerAndPermissionsRecursively
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../gobblin/data/management/copy/CopyableFile.java | 34 ++++++++++++++++++
 .../data/management/copy/ManifestBasedDataset.java | 41 ++++++++++++++--------
 2 files changed, 61 insertions(+), 14 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 23c27b61d..85fa80f0f 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,10 +17,13 @@
 
 package org.apache.gobblin.data.management.copy;
 
+import com.google.common.cache.Cache;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,6 +61,7 @@ import org.apache.gobblin.util.guid.Guid;
 @Setter
 @NoArgsConstructor(access = AccessLevel.PROTECTED)
 @EqualsAndHashCode(callSuper = true)
+@Slf4j
 public class CopyableFile extends CopyEntity implements File {
   private static final byte[] EMPTY_CHECKSUM = new byte[0];
 
@@ -375,6 +379,36 @@ public class CopyableFile extends CopyEntity implements 
File {
     return ownerAndPermissions;
   }
 
+  /**
+   * Compute the correct {@link OwnerAndPermission} obtained from replicating 
source owner and permissions and applying
+   * the {@link PreserveAttributes} rules for fromPath and every ancestor up 
to but excluding toPath.
+   * Unlike the resolveReplicatedOwnerAndPermissionsRecursively() method, this 
method utilizes permissionMap as a cache to minimize the number of calls to 
HDFS.
+   * It is recommended to use this method when recursively calculating 
permissions for numerous files that share the same ancestor.
+   *
+   * @return A list of the computed {@link OwnerAndPermission}s starting from 
fromPath, up to but excluding toPath.
+   * @throws IOException if toPath is not an ancestor of fromPath.
+   */
+  public static List<OwnerAndPermission> 
resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, 
Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration, Cache<String, 
OwnerAndPermission> permissionMap)
+      throws IOException, ExecutionException {
+
+    if (!PathUtils.isAncestor(toPath, fromPath)) {
+      throw new IOException(String.format("toPath %s must be an ancestor of 
fromPath %s.", toPath, fromPath));
+    }
+
+    List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
+    Path currentPath = fromPath;
+
+    while (currentPath.getParent() != null && PathUtils.isAncestor(toPath, 
currentPath.getParent())) {
+      Path finalCurrentPath = currentPath;
+      ownerAndPermissions.add(permissionMap.get(finalCurrentPath.toString(), 
() -> resolveReplicatedOwnerAndPermission(sourceFs,
+          finalCurrentPath, copyConfiguration)));
+      currentPath = currentPath.getParent();
+    }
+
+    return ownerAndPermissions;
+  }
+
   public static Map<String, OwnerAndPermission> 
resolveReplicatedAncestorOwnerAndPermissionsRecursively(FileSystem sourceFs, 
Path fromPath,
       Path toPath, CopyConfiguration copyConfiguration) throws IOException {
     
Preconditions.checkArgument(sourceFs.getFileStatus(fromPath).isDirectory(), 
"Source path must be a directory.");
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 17de89458..fbb88640b 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.data.management.copy;
 
 import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.JsonIOException;
@@ -27,6 +29,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
@@ -47,12 +50,15 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
 
   private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = 
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
   private static final String COMMON_FILES_PARENT = 
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
+  private static final String PERMISSION_CACHE_TTL_SECONDS = 
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".permission.cache.ttl.seconds";
+  private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30";
   private static final String DEFAULT_COMMON_FILES_PARENT = "/";
   private final FileSystem fs;
   private final Path manifestPath;
   private final Properties properties;
   private final boolean deleteFileThatNotExistOnSource;
   private final String commonFilesParent;
+  private final int permissionCacheTTLSeconds;
 
   public ManifestBasedDataset(final FileSystem fs, Path manifestPath, 
Properties properties) {
     this.fs = fs;
@@ -60,6 +66,7 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
     this.properties = properties;
     this.deleteFileThatNotExistOnSource = 
Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, 
"false"));
     this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT, 
DEFAULT_COMMON_FILES_PARENT);
+    this.permissionCacheTTLSeconds = 
Integer.parseInt(properties.getProperty(PERMISSION_CACHE_TTL_SECONDS, 
DEFAULT_PERMISSION_CACHE_TTL_SECONDS));
   }
 
   @Override
@@ -82,25 +89,31 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
     List<FileStatus> toDelete = Lists.newArrayList();
     //todo: put permission preserve logic here?
     try {
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      Cache<String, OwnerAndPermission> permissionMap = 
CacheBuilder.newBuilder().expireAfterAccess(permissionCacheTTLSeconds, 
TimeUnit.SECONDS).build();
+      int numFiles = 0;
       while (manifests.hasNext()) {
+        numFiles++;
+        CopyManifest.CopyableUnit file = manifests.next();
         //todo: We can use fileSet to partition the data in case of some 
softbound issue
         //todo: After partition, change this to directly return iterator so 
that we can save time if we meet resources limitation
-        CopyManifest.CopyableUnit file = manifests.next();
         Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
+        if (fs.exists(fileToCopy)) {
           boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, 
fileToCopy, configuration)
+          FileStatus srcFile = fs.getFileStatus(fileToCopy);
+          OwnerAndPermission replicatedPermission = 
CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+          if (!existOnTarget || shouldCopy(targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+            CopyableFile.Builder copyableFileBuilder =
+                CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, 
configuration)
                     .fileSet(datasetURN())
                     .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, 
fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
+                    .ancestorsOwnerAndPermission(
+                        
CopyableFile.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, 
fileToCopy.getParent(),
+                            new Path(commonFilesParent), configuration, 
permissionMap))
+                    .destinationOwnerAndPermission(replicatedPermission);
+            CopyableFile copyableFile = copyableFileBuilder.build();
+            copyableFile.setFsDatasets(fs, targetFs);
             copyEntities.add(copyableFile);
             if (existOnTarget && srcFile.isFile()) {
               // this is to match the existing publishing behavior where we 
won't rewrite the target when it's already existed
@@ -108,7 +121,7 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
           }
-        } else if (this.deleteFileThatNotExistOnSource && 
targetFs.exists(fileToCopy)){
+        } else if (deleteFileThatNotExistOnSource && 
targetFs.exists(fileToCopy)) {
           toDelete.add(targetFs.getFileStatus(fileToCopy));
         }
       }
@@ -117,6 +130,7 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, 
this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), 
step, 1));
       }
+      log.info(String.format("Workunits calculation took %s milliseconds to 
process %s files", System.currentTimeMillis() - startTime, numFiles));
     } catch (JsonIOException| JsonSyntaxException e) {
       //todo: update error message to point to a sample json file instead of 
schema which is hard to understand
       log.warn(String.format("Failed to read Manifest path %s on filesystem 
%s, please make sure it's in correct json format with schema"
@@ -134,11 +148,10 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
     return Collections.singleton(new FileSet.Builder<>(datasetURN(), 
this).add(copyEntities).build()).iterator();
   }
 
-  private static boolean shouldCopy(FileSystem srcFs, FileSystem targetFs, 
FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration 
copyConfiguration)
+  private static boolean shouldCopy(FileSystem targetFs, FileStatus 
fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission)
       throws IOException {
     if (fileInSource.isDirectory() || fileInSource.getModificationTime() == 
fileInTarget.getModificationTime()) {
       // if source is dir or source and dst has same version, we compare the 
permission to determine whether it needs another sync
-      OwnerAndPermission replicatedPermission = 
CopyableFile.resolveReplicatedOwnerAndPermission(srcFs, fileInSource, 
copyConfiguration);
       return !replicatedPermission.hasSameOwnerAndPermission(targetFs, 
fileInTarget);
     }
     return fileInSource.getModificationTime() > 
fileInTarget.getModificationTime();

Reply via email to