[
https://issues.apache.org/jira/browse/GOBBLIN-1824?focusedWorklogId=858821&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-858821
]
ASF GitHub Bot logged work on GOBBLIN-1824:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Apr/23 01:45
Start Date: 25/Apr/23 01:45
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1175884690
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
               + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(),
           ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize,
+          ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              //todo: We can use fileSet to partition the data in case of some softbound issue
+              //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+              Path fileToCopy = new Path(file.fileName);
+              if (fs.exists(fileToCopy)) {
+                boolean existOnTarget = false;
+                try {
+                  existOnTarget = targetFs.exists(fileToCopy);
+                } catch (IOException e) {
+                  e.printStackTrace();
+                }
Review Comment:
a. what sorts of `IOException` are swallowed here?
b. is it appropriate to print every one (e.g. rather than logging at a particular log level that could be adjusted by config)?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
               + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(),
           ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize,
+          ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              //todo: We can use fileSet to partition the data in case of some softbound issue
+              //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+              Path fileToCopy = new Path(file.fileName);
+              if (fs.exists(fileToCopy)) {
+                boolean existOnTarget = false;
+                try {
+                  existOnTarget = targetFs.exists(fileToCopy);
+                } catch (IOException e) {
+                  e.printStackTrace();
+                }
+                FileStatus srcFile = fs.getFileStatus(fileToCopy);
+                OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+                if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                  CopyableFile.Builder copyableFileBuilder =
+                      CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
+                          .fileSet(datasetURN())
+                          .datasetOutputPath(fileToCopy.toString())
+                          .ancestorsOwnerAndPermission(CopyableFile
+                              .resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                                  new Path(commonFilesParent), configuration, permissionMap))
+                          .destinationOwnerAndPermission(replicatedPermission);
+                  CopyableFile copyableFile = copyableFileBuilder.build();
+                  copyableFile.setFsDatasets(fs, targetFs);
+                  copyEntities.add(copyableFile);
+                  if (existOnTarget && srcFile.isFile()) {
+                    // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
+                    // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+                    toDelete.add(targetFs.getFileStatus(fileToCopy));
+                  }
+                }
+              } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
                 toDelete.add(targetFs.getFileStatus(fileToCopy));
               }
+            } catch (IOException e) {
+              log.error("meet exception:", e);
Review Comment:
in the case of a general FS misconfiguration (e.g. an issue w/ the target FS), does this mean we'll now log one error message for every manifest entry, whereas before, with sync processing, only the first one failed, and then we bailed out and went no further through the manifest?
and even so, swallowing the exception may hide issues (e.g. from the automatic troubleshooter). why not allow them to percolate as `j.u.concurrent.ExecutionException`s? (and unwrap back to the underlying exception at a higher layer.)
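The percolation the reviewer suggests can be sketched in plain `java.util.concurrent` terms (the class and the failure message here are illustrative, not from the PR): let the task's exception escape the `Callable`, then unwrap the cause from the `ExecutionException` that `Future.get()` rethrows:

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class UnwrapDemo {
    // Submit a failing planning task; instead of catching and logging inside
    // the task body, let the exception escape and unwrap it from the
    // ExecutionException that Future.get() rethrows.
    static String runAndUnwrap() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<Void> future = pool.submit(() -> {
            throw new IOException("target FS unreachable"); // simulated FS failure
        });
        try {
            future.get(); // rethrows the task's failure as ExecutionException
            return null;
        } catch (ExecutionException e) {
            // unwrap back to the underlying exception at a higher layer
            return e.getCause().getMessage();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndUnwrap());
    }
}
```

With this shape, the first `get()` that fails surfaces the original `IOException` to the caller, rather than leaving a per-entry error in the log.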
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java:
##########
@@ -375,6 +377,35 @@ public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecur
     return ownerAndPermissions;
   }
+  /**
+   * Compute the correct {@link OwnerAndPermission} obtained from replicating source owner and permissions and applying
+   * the {@link PreserveAttributes} rules for fromPath and every ancestor up to but excluding toPath.
+   * Use permissionMap as a cache to reduce the call to hdfs
+   *
+   * @return A list of the computed {@link OwnerAndPermission}s starting from fromPath, up to but excluding toPath.
+   * @throws IOException if toPath is not an ancestor of fromPath.
+   */
+  public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration, Map<String, OwnerAndPermission> permissionMap) throws IOException {
+
+    if (!PathUtils.isAncestor(toPath, fromPath)) {
+      throw new IOException(String.format("toPath %s must be an ancestor of fromPath %s.", toPath, fromPath));
+    }
+
+    List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
+    Path currentPath = fromPath;
+
+    while (currentPath.getParent() != null && PathUtils.isAncestor(toPath, currentPath.getParent())) {
+      if (!permissionMap.containsKey(currentPath.toString())) {
+        permissionMap.put(currentPath.toString(), resolveReplicatedOwnerAndPermission(sourceFs, currentPath, copyConfiguration));
Review Comment:
a guava `LoadingCache<K, V>` would better encapsulate the memoization here, e.g. have the caller implement `CacheLoader<K, V>.load` and pass in a `LoadingCache`:
https://guava.dev/releases/20.0/api/docs/com/google/common/cache/CacheLoader.html#load-K-
that would mean reworking this static method to take a `Function<K, V>`.
even better, however, would be to create a class for `OwnerAndPermission` resolution, which would get constructed w/ the `FileSystem` and `CopyConfiguration`, to keep them unchanged throughout.
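The rework being suggested can be sketched with a JDK-only stand-in (`ConcurrentHashMap.computeIfAbsent` playing the role of Guava's `LoadingCache`, and a fabricated resolver in place of `resolveReplicatedOwnerAndPermission`, since the real one needs a `FileSystem` and `CopyConfiguration`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class PermissionCacheSketch {
    // Counts underlying resolver invocations so the memoization is observable.
    static final AtomicInteger resolverCalls = new AtomicInteger();

    // Stand-in for resolveReplicatedOwnerAndPermission(sourceFs, path, config);
    // it just fabricates a value per path.
    static String resolve(String path) {
        resolverCalls.incrementAndGet();
        return "owner-and-permission-for:" + path;
    }

    // The static method takes the resolver as a Function, as the reviewer
    // suggests for the LoadingCache rework; the map does the memoization,
    // so repeated lookups of the same ancestor path hit the cache.
    static String cachedResolve(Map<String, String> cache, String path,
                                Function<String, String> resolver) {
        return cache.computeIfAbsent(path, resolver);
    }

    public static void main(String[] args) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        cachedResolve(cache, "/data/a", PermissionCacheSketch::resolve);
        cachedResolve(cache, "/data/a", PermissionCacheSketch::resolve); // cache hit, no resolver call
        cachedResolve(cache, "/data/b", PermissionCacheSketch::resolve);
        System.out.println(resolverCalls.get()); // two distinct paths -> two resolver calls
    }
}
```

`computeIfAbsent` also avoids the check-then-put race in the quoted `containsKey`/`put` pair, which matters here since `permissionMap` is shared across planning threads.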
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
               + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(),
           ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize,
+          ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              //todo: We can use fileSet to partition the data in case of some softbound issue
+              //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+              Path fileToCopy = new Path(file.fileName);
+              if (fs.exists(fileToCopy)) {
+                boolean existOnTarget = false;
+                try {
+                  existOnTarget = targetFs.exists(fileToCopy);
+                } catch (IOException e) {
+                  e.printStackTrace();
+                }
+                FileStatus srcFile = fs.getFileStatus(fileToCopy);
+                OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+                if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                  CopyableFile.Builder copyableFileBuilder =
+                      CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
+                          .fileSet(datasetURN())
+                          .datasetOutputPath(fileToCopy.toString())
+                          .ancestorsOwnerAndPermission(CopyableFile
+                              .resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                                  new Path(commonFilesParent), configuration, permissionMap))
+                          .destinationOwnerAndPermission(replicatedPermission);
+                  CopyableFile copyableFile = copyableFileBuilder.build();
+                  copyableFile.setFsDatasets(fs, targetFs);
+                  copyEntities.add(copyableFile);
+                  if (existOnTarget && srcFile.isFile()) {
+                    // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
+                    // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+                    toDelete.add(targetFs.getFileStatus(fileToCopy));
+                  }
+                }
+              } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
                 toDelete.add(targetFs.getFileStatus(fileToCopy));
               }
+            } catch (IOException e) {
+              log.error("meet exception:", e);
             }
-        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
-          toDelete.add(targetFs.getFileStatus(fileToCopy));
-        }
+          }}
+        );
+        pendingTask.add(future);
+      }
+      for (Future f: pendingTask) {
+        f.get();
       }
       if (!toDelete.isEmpty()) {
         //todo: add support sync for empty dir
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
       }
+      log.info(String.format("Calculate workunits take %s milliseconds", System.currentTimeMillis() - startTime));
Review Comment:
nit: probably should be past tense, so:
```
"WorkUnit calculation took %s ms"
```
or
```
"Calculating WorkUnits took %s ms"
```
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
               + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(),
           ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize,
+          ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              //todo: We can use fileSet to partition the data in case of some softbound issue
+              //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+              Path fileToCopy = new Path(file.fileName);
+              if (fs.exists(fileToCopy)) {
+                boolean existOnTarget = false;
+                try {
+                  existOnTarget = targetFs.exists(fileToCopy);
+                } catch (IOException e) {
+                  e.printStackTrace();
+                }
+                FileStatus srcFile = fs.getFileStatus(fileToCopy);
+                OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+                if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                  CopyableFile.Builder copyableFileBuilder =
+                      CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
+                          .fileSet(datasetURN())
+                          .datasetOutputPath(fileToCopy.toString())
+                          .ancestorsOwnerAndPermission(CopyableFile
+                              .resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                                  new Path(commonFilesParent), configuration, permissionMap))
+                          .destinationOwnerAndPermission(replicatedPermission);
+                  CopyableFile copyableFile = copyableFileBuilder.build();
+                  copyableFile.setFsDatasets(fs, targetFs);
+                  copyEntities.add(copyableFile);
+                  if (existOnTarget && srcFile.isFile()) {
+                    // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
+                    // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+                    toDelete.add(targetFs.getFileStatus(fileToCopy));
+                  }
+                }
+              } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
                 toDelete.add(targetFs.getFileStatus(fileToCopy));
               }
+            } catch (IOException e) {
+              log.error("meet exception:", e);
             }
-        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
-          toDelete.add(targetFs.getFileStatus(fileToCopy));
-        }
+          }}
+        );
+        pendingTask.add(future);
+      }
+      for (Future f: pendingTask) {
+        f.get();
       }
Review Comment:
NBD as presently formulated... but if you did want to detect system-level issues, such as a fatal problem w/ the target FS, at the beginning rather than pointlessly creating thousands of futures, you could use a `CompletionService` to poll for results in between adding additional ones:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html
just maintain a count of how many futures you're expecting, so you know when they're all done.
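The `ExecutorCompletionService` pattern the reviewer links can be sketched roughly like this (the class and the placeholder task bodies are illustrative): submit tasks while keeping a count, then `take()` results in completion order, so a real caller could bail out on the first failure instead of queuing everything up front:

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceSketch {
    // Submit n placeholder planning tasks, then take() each result as it
    // completes; returns the sum of all task results.
    static int runAll(int n) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);

        int submitted = 0;             // count of futures we expect back
        for (int i = 0; i < n; i++) {
            final int id = i;
            ecs.submit(() -> id * id); // placeholder for per-file planning work
            submitted++;
        }

        int sum = 0;
        for (int i = 0; i < submitted; i++) {
            try {
                // take() blocks until *some* task finishes, in completion order;
                // a real caller could interleave this with further submissions
                // and stop early on the first system-level failure
                sum += ecs.take().get();
            } catch (ExecutionException e) {
                pool.shutdownNow();    // bail out rather than drain the rest
                throw new RuntimeException(e.getCause());
            }
        }
        pool.shutdown();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAll(8)); // prints 140 (0 + 1 + 4 + ... + 49)
    }
}
```

The count of submitted tasks is what tells the drain loop when all results have arrived, exactly as the comment suggests.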
Issue Time Tracking
-------------------
Worklog Id: (was: 858821)
Time Spent: 20m (was: 10m)
> Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs
> -----------------------------------------------------------------------
>
> Key: GOBBLIN-1824
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1824
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Optimizing Permission Calculation and Introducing Multithreading in
> Manifest-Based DistCp Work Planning
--
This message was sent by Atlassian Jira
(v8.20.10#820010)