This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7bbf6761c [GOBBLIN-1824]Improving the Efficiency of Work Planning in
Manifest-Based DistCp Jobs (#3686)
7bbf6761c is described below
commit 7bbf6761c63055eb9ecc9f2756d4b3d68b7a1b08
Author: Zihan Li <[email protected]>
AuthorDate: Wed May 3 14:17:31 2023 -0700
[GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based
DistCp Jobs (#3686)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
* [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based
DistCp Jobs
* remove unintended change
* address comments and remove the multi-threading since it does not improve
performance much
* address comments
* make cache TTL configurable
* add comments to describe the difference of
resolveReplicatedOwnerAndPermissionsRecursivelyWithCache and
resolveReplicatedOwnerAndPermissionsRecursively
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../gobblin/data/management/copy/CopyableFile.java | 34 ++++++++++++++++++
.../data/management/copy/ManifestBasedDataset.java | 41 ++++++++++++++--------
2 files changed, 61 insertions(+), 14 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 23c27b61d..85fa80f0f 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,10 +17,13 @@
package org.apache.gobblin.data.management.copy;
+import com.google.common.cache.Cache;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -58,6 +61,7 @@ import org.apache.gobblin.util.guid.Guid;
@Setter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@EqualsAndHashCode(callSuper = true)
+@Slf4j
public class CopyableFile extends CopyEntity implements File {
private static final byte[] EMPTY_CHECKSUM = new byte[0];
@@ -375,6 +379,36 @@ public class CopyableFile extends CopyEntity implements
File {
return ownerAndPermissions;
}
+ /**
+ * Compute the correct {@link OwnerAndPermission} obtained from replicating
source owner and permissions and applying
+ * the {@link PreserveAttributes} rules for fromPath and every ancestor up
to but excluding toPath.
+ * Unlike the resolveReplicatedOwnerAndPermissionsRecursively() method, this
method utilizes permissionMap as a cache to minimize the number of calls to
HDFS.
+ * It is recommended to use this method when recursively calculating
permissions for numerous files that share the same ancestor.
+ *
+ * @return A list of the computed {@link OwnerAndPermission}s starting from
fromPath, up to but excluding toPath.
+ * @throws IOException if toPath is not an ancestor of fromPath.
+ */
+ public static List<OwnerAndPermission>
resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs,
Path fromPath,
+ Path toPath, CopyConfiguration copyConfiguration, Cache<String,
OwnerAndPermission> permissionMap)
+ throws IOException, ExecutionException {
+
+ if (!PathUtils.isAncestor(toPath, fromPath)) {
+ throw new IOException(String.format("toPath %s must be an ancestor of
fromPath %s.", toPath, fromPath));
+ }
+
+ List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
+ Path currentPath = fromPath;
+
+ while (currentPath.getParent() != null && PathUtils.isAncestor(toPath,
currentPath.getParent())) {
+ Path finalCurrentPath = currentPath;
+ ownerAndPermissions.add(permissionMap.get(finalCurrentPath.toString(),
() -> resolveReplicatedOwnerAndPermission(sourceFs,
+ finalCurrentPath, copyConfiguration)));
+ currentPath = currentPath.getParent();
+ }
+
+ return ownerAndPermissions;
+ }
+
public static Map<String, OwnerAndPermission>
resolveReplicatedAncestorOwnerAndPermissionsRecursively(FileSystem sourceFs,
Path fromPath,
Path toPath, CopyConfiguration copyConfiguration) throws IOException {
Preconditions.checkArgument(sourceFs.getFileStatus(fromPath).isDirectory(),
"Source path must be a directory.");
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 17de89458..fbb88640b 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.data.management.copy;
import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.JsonIOException;
@@ -27,6 +29,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
@@ -47,12 +50,15 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE =
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
private static final String COMMON_FILES_PARENT =
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
+ private static final String PERMISSION_CACHE_TTL_SECONDS =
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".permission.cache.ttl.seconds";
+ private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30";
private static final String DEFAULT_COMMON_FILES_PARENT = "/";
private final FileSystem fs;
private final Path manifestPath;
private final Properties properties;
private final boolean deleteFileThatNotExistOnSource;
private final String commonFilesParent;
+ private final int permissionCacheTTLSeconds;
public ManifestBasedDataset(final FileSystem fs, Path manifestPath,
Properties properties) {
this.fs = fs;
@@ -60,6 +66,7 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
this.properties = properties;
this.deleteFileThatNotExistOnSource =
Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE,
"false"));
this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT,
DEFAULT_COMMON_FILES_PARENT);
+ this.permissionCacheTTLSeconds =
Integer.parseInt(properties.getProperty(PERMISSION_CACHE_TTL_SECONDS,
DEFAULT_PERMISSION_CACHE_TTL_SECONDS));
}
@Override
@@ -82,25 +89,31 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
List<FileStatus> toDelete = Lists.newArrayList();
//todo: put permission preserve logic here?
try {
+ long startTime = System.currentTimeMillis();
manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+ Cache<String, OwnerAndPermission> permissionMap =
CacheBuilder.newBuilder().expireAfterAccess(permissionCacheTTLSeconds,
TimeUnit.SECONDS).build();
+ int numFiles = 0;
while (manifests.hasNext()) {
+ numFiles++;
+ CopyManifest.CopyableUnit file = manifests.next();
//todo: We can use fileSet to partition the data in case of some
softbound issue
//todo: After partition, change this to directly return iterator so
that we can save time if we meet resources limitation
- CopyManifest.CopyableUnit file = manifests.next();
Path fileToCopy = new Path(file.fileName);
- if (this.fs.exists(fileToCopy)) {
+ if (fs.exists(fileToCopy)) {
boolean existOnTarget = targetFs.exists(fileToCopy);
- FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
- if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile,
targetFs.getFileStatus(fileToCopy), configuration)) {
- CopyableFile copyableFile =
- CopyableFile.fromOriginAndDestination(this.fs, srcFile,
fileToCopy, configuration)
+ FileStatus srcFile = fs.getFileStatus(fileToCopy);
+ OwnerAndPermission replicatedPermission =
CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+ if (!existOnTarget || shouldCopy(targetFs, srcFile,
targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+ CopyableFile.Builder copyableFileBuilder =
+ CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy,
configuration)
.fileSet(datasetURN())
.datasetOutputPath(fileToCopy.toString())
- .ancestorsOwnerAndPermission(CopyableFile
-
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs,
fileToCopy.getParent(),
- new Path(this.commonFilesParent), configuration))
- .build();
- copyableFile.setFsDatasets(this.fs, targetFs);
+ .ancestorsOwnerAndPermission(
+
CopyableFile.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs,
fileToCopy.getParent(),
+ new Path(commonFilesParent), configuration,
permissionMap))
+ .destinationOwnerAndPermission(replicatedPermission);
+ CopyableFile copyableFile = copyableFileBuilder.build();
+ copyableFile.setFsDatasets(fs, targetFs);
copyEntities.add(copyableFile);
if (existOnTarget && srcFile.isFile()) {
// this is to match the existing publishing behavior where we
won't rewrite the target when it's already existed
@@ -108,7 +121,7 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}
- } else if (this.deleteFileThatNotExistOnSource &&
targetFs.exists(fileToCopy)){
+ } else if (deleteFileThatNotExistOnSource &&
targetFs.exists(fileToCopy)) {
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}
@@ -117,6 +130,7 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete,
this.properties, Optional.<Path>absent());
copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(),
step, 1));
}
+ log.info(String.format("Workunits calculation took %s milliseconds to
process %s files", System.currentTimeMillis() - startTime, numFiles));
} catch (JsonIOException| JsonSyntaxException e) {
//todo: update error message to point to a sample json file instead of
schema which is hard to understand
log.warn(String.format("Failed to read Manifest path %s on filesystem
%s, please make sure it's in correct json format with schema"
@@ -134,11 +148,10 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
return Collections.singleton(new FileSet.Builder<>(datasetURN(),
this).add(copyEntities).build()).iterator();
}
- private static boolean shouldCopy(FileSystem srcFs, FileSystem targetFs,
FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration
copyConfiguration)
+ private static boolean shouldCopy(FileSystem targetFs, FileStatus
fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission)
throws IOException {
if (fileInSource.isDirectory() || fileInSource.getModificationTime() ==
fileInTarget.getModificationTime()) {
// if source is dir or source and dst has same version, we compare the
permission to determine whether it needs another sync
- OwnerAndPermission replicatedPermission =
CopyableFile.resolveReplicatedOwnerAndPermission(srcFs, fileInSource,
copyConfiguration);
return !replicatedPermission.hasSameOwnerAndPermission(targetFs,
fileInTarget);
}
return fileInSource.getModificationTime() >
fileInTarget.getModificationTime();