[ https://issues.apache.org/jira/browse/GOBBLIN-1824?focusedWorklogId=858822&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-858822 ]
ASF GitHub Bot logged work on GOBBLIN-1824:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Apr/23 01:46
Start Date: 25/Apr/23 01:46
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1175911613
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
+ "%s, you can specify multi locations split by '',",
manifestPath.toString(), fs.getUri().toString(),
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
}
CopyManifest.CopyableUnitIterator manifests = null;
- List<CopyEntity> copyEntities = Lists.newArrayList();
- List<FileStatus> toDelete = Lists.newArrayList();
+ List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+ List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
//todo: put permission preserve logic here?
+ ExecutorService queueExecutor = null;
try {
+ queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+ long startTime = System.currentTimeMillis();
manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+ List<Future> pendingTask = new ArrayList<>();
+ Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
while (manifests.hasNext()) {
- //todo: We can use fileSet to partition the data in case of some softbound issue
- //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
CopyManifest.CopyableUnit file = manifests.next();
- Path fileToCopy = new Path(file.fileName);
- if (this.fs.exists(fileToCopy)) {
- boolean existOnTarget = targetFs.exists(fileToCopy);
- FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
- if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
- CopyableFile copyableFile =
- CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
- .fileSet(datasetURN())
- .datasetOutputPath(fileToCopy.toString())
- .ancestorsOwnerAndPermission(CopyableFile
- .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
- new Path(this.commonFilesParent), configuration))
- .build();
- copyableFile.setFsDatasets(this.fs, targetFs);
- copyEntities.add(copyableFile);
- if (existOnTarget && srcFile.isFile()) {
- // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
- // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+ Future future = queueExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try{
+ //todo: We can use fileSet to partition the data in case of some softbound issue
+ //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+ Path fileToCopy = new Path(file.fileName);
+ if (fs.exists(fileToCopy)) {
+ boolean existOnTarget = false;
+ try {
+ existOnTarget = targetFs.exists(fileToCopy);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ FileStatus srcFile = fs.getFileStatus(fileToCopy);
+ OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+ if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+ CopyableFile.Builder copyableFileBuilder =
+ CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
+ .fileSet(datasetURN())
+ .datasetOutputPath(fileToCopy.toString())
+ .ancestorsOwnerAndPermission(CopyableFile
+ .resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+ new Path(commonFilesParent), configuration, permissionMap))
+ .destinationOwnerAndPermission(replicatedPermission);
+ CopyableFile copyableFile = copyableFileBuilder.build();
+ copyableFile.setFsDatasets(fs, targetFs);
+ copyEntities.add(copyableFile);
+ if (existOnTarget && srcFile.isFile()) {
+ // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
+ // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+ toDelete.add(targetFs.getFileStatus(fileToCopy));
+ }
+ }
+ } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
+ } catch (IOException e) {
+ log.error("meet exception:", e);
}
- } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
- toDelete.add(targetFs.getFileStatus(fileToCopy));
- }
+ }}
+ );
+ pendingTask.add(future);
+ }
+ for (Future f: pendingTask) {
+ f.get();
}
Review Comment:
NBD as presently formulated... but if you did want to detect system-level issues, such as a fatal problem with the target FS, early, rather than pointlessly creating thousands of futures, you could use a `CompletionService` to poll for results in between submitting additional tasks:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html
Just maintain a count of how many futures you're expecting, so you know when they're all done.
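A minimal sketch of the suggested pattern, in plain Java with no Gobblin dependencies. `CompletionServiceSketch`, `planAll`, and `plan` are hypothetical stand-ins for the per-file planning loop in the diff, and the sketch assumes each task lets its failure propagate (the diff's `Runnable` instead catches `IOException` and only logs it, so nothing would surface to the caller). Between submissions it drains already-finished futures with the non-blocking `poll()`, so a failure that has already happened stops further submissions; the `pending` counter says how many results to `take()` at the end.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of fail-fast planning with an ExecutorCompletionService.
class CompletionServiceSketch {

  static List<String> planAll(List<String> files, int threads)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CompletionService<String> completion = new ExecutorCompletionService<>(pool);
    List<String> results = new ArrayList<>(); // touched only by this thread
    int pending = 0; // submitted but not yet retrieved
    try {
      for (String file : files) {
        completion.submit(() -> plan(file));
        pending++;
        // Non-blocking: collect whatever has already finished. get() rethrows a
        // task's failure as ExecutionException, which aborts further submissions.
        Future<String> done;
        while ((done = completion.poll()) != null) {
          pending--;
          results.add(done.get());
        }
      }
      // Block for the remaining tasks; the counter says how many to wait for.
      while (pending > 0) {
        results.add(completion.take().get());
        pending--;
      }
      return results;
    } finally {
      // Best effort: drops queued tasks and interrupts running ones.
      pool.shutdownNow();
    }
  }

  // Hypothetical per-file planning step; an empty name simulates an FS error.
  static String plan(String file) throws IOException {
    if (file.isEmpty()) {
      throw new IOException("cannot stat empty path");
    }
    return file.toUpperCase();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(planAll(List.of("a", "b", "c"), 2).size()); // prints 3
  }
}
```

Results arrive in completion order, not submission order. Returning values from `Callable`s also removes the need for `Collections.synchronizedList` wrappers, since only the submitting thread ever touches `results`.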
Issue Time Tracking
-------------------
Worklog Id: (was: 858822)
Time Spent: 0.5h (was: 20m)
> Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs
> -----------------------------------------------------------------------
>
> Key: GOBBLIN-1824
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1824
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Optimizing Permission Calculation and Introducing Multithreading in Manifest-Based DistCp Work Planning
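As far as the diff shows, the permission half of this change comes from sharing one `ConcurrentHashMap` (`permissionMap`) across planning threads, so that ancestor directories common to many manifest entries are resolved only once. The following is a minimal sketch of that memoization idea, not Gobblin's `resolveReplicatedOwnerAndPermissionsRecursivelyWithCache`. It assumes a hypothetical `AncestorPermissionCache` class, a simplified `OwnerAndPerm` holder in place of Gobblin's `OwnerAndPermission`, and a caller-supplied `lookup` function standing in for the file-system status call. It also assumes the walk stops just before the common parent directory.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch: memoizes per-directory owner/permission lookups.
class AncestorPermissionCache {

  // Simplified stand-in for Gobblin's OwnerAndPermission.
  static final class OwnerAndPerm {
    final String owner;
    final String permission;

    OwnerAndPerm(String owner, String permission) {
      this.owner = owner;
      this.permission = permission;
    }
  }

  // Shared across planning threads; keyed by directory path string.
  private final Map<String, OwnerAndPerm> cache = new ConcurrentHashMap<>();
  // Expensive lookup, e.g. a remote file-status call (assumed).
  private final Function<Path, OwnerAndPerm> lookup;

  AncestorPermissionCache(Function<Path, OwnerAndPerm> lookup) {
    this.lookup = lookup;
  }

  /** Permissions for dir and each ancestor, nearest first, stopping before stopAt. */
  List<OwnerAndPerm> resolveAncestors(Path dir, Path stopAt) {
    List<OwnerAndPerm> result = new ArrayList<>();
    for (Path p = dir; p != null && !p.equals(stopAt); p = p.getParent()) {
      // computeIfAbsent applies lookup at most once per key, even under contention.
      // Iterating (rather than recursing inside the mapping function) avoids
      // updating the map from within computeIfAbsent, which its contract forbids.
      result.add(cache.computeIfAbsent(p.toString(), key -> lookup.apply(Paths.get(key))));
    }
    return result;
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    AncestorPermissionCache resolver = new AncestorPermissionCache(p -> {
      calls.incrementAndGet();
      return new OwnerAndPerm("hdfs", "rwxr-xr-x");
    });
    Path root = Paths.get("/data");
    resolver.resolveAncestors(Paths.get("/data/x/y"), root); // looks up /data/x/y and /data/x
    resolver.resolveAncestors(Paths.get("/data/x/z"), root); // /data/x is already cached
    System.out.println(calls.get()); // prints 3
  }
}
```

With N files under the same directory tree, the shared ancestors cost one lookup each instead of one per file, which is where the savings would come from when many manifest entries share parents.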