phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1175911613
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>>
getFileSetIterator(FileSystem targetFs, Cop
+ "%s, you can specify multi locations split by '',",
manifestPath.toString(), fs.getUri().toString(),
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
}
CopyManifest.CopyableUnitIterator manifests = null;
- List<CopyEntity> copyEntities = Lists.newArrayList();
- List<FileStatus> toDelete = Lists.newArrayList();
+ List<CopyEntity> copyEntities =
Collections.synchronizedList(Lists.newArrayList());
+ List<FileStatus> toDelete =
Collections.synchronizedList(Lists.newArrayList());
//todo: put permission preserve logic here?
+ ExecutorService queueExecutor = null;
try {
+ queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize,
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+ long startTime = System.currentTimeMillis();
manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+ List<Future> pendingTask = new ArrayList<>();
+ Map<String, OwnerAndPermission> permissionMap = new
ConcurrentHashMap<>();
while (manifests.hasNext()) {
- //todo: We can use fileSet to partition the data in case of some
softbound issue
- //todo: After partition, change this to directly return iterator so
that we can save time if we meet resources limitation
CopyManifest.CopyableUnit file = manifests.next();
- Path fileToCopy = new Path(file.fileName);
- if (this.fs.exists(fileToCopy)) {
- boolean existOnTarget = targetFs.exists(fileToCopy);
- FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
- if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile,
targetFs.getFileStatus(fileToCopy), configuration)) {
- CopyableFile copyableFile =
- CopyableFile.fromOriginAndDestination(this.fs, srcFile,
fileToCopy, configuration)
- .fileSet(datasetURN())
- .datasetOutputPath(fileToCopy.toString())
- .ancestorsOwnerAndPermission(CopyableFile
-
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs,
fileToCopy.getParent(),
- new Path(this.commonFilesParent), configuration))
- .build();
- copyableFile.setFsDatasets(this.fs, targetFs);
- copyEntities.add(copyableFile);
- if (existOnTarget && srcFile.isFile()) {
- // this is to match the existing publishing behavior where we
won't rewrite the target when it's already existed
- // todo: Change the publish behavior to support overwrite
destination file during rename, instead of relying on this delete step which is
needed if we want to support task level publish
+ Future future = queueExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try{
+ //todo: We can use fileSet to partition the data in case of some
softbound issue
+ //todo: After partition, change this to directly return iterator
so that we can save time if we meet resources limitation
+ Path fileToCopy = new Path(file.fileName);
+ if (fs.exists(fileToCopy)) {
+ boolean existOnTarget = false;
+ try {
+ existOnTarget = targetFs.exists(fileToCopy);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ FileStatus srcFile = fs.getFileStatus(fileToCopy);
+ OwnerAndPermission replicatedPermission =
CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+ if (!existOnTarget || shouldCopy(targetFs, srcFile,
targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+ CopyableFile.Builder copyableFileBuilder =
+ CopyableFile.fromOriginAndDestination(fs, srcFile,
fileToCopy, configuration)
+ .fileSet(datasetURN())
+ .datasetOutputPath(fileToCopy.toString())
+ .ancestorsOwnerAndPermission(CopyableFile
+
.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs,
fileToCopy.getParent(),
+ new Path(commonFilesParent), configuration,
permissionMap))
+ .destinationOwnerAndPermission(replicatedPermission);
+ CopyableFile copyableFile = copyableFileBuilder.build();
+ copyableFile.setFsDatasets(fs, targetFs);
+ copyEntities.add(copyableFile);
+ if (existOnTarget && srcFile.isFile()) {
+ // this is to match the existing publishing behavior where
we won't rewrite the target when it's already existed
+ // todo: Change the publish behavior to support overwrite
destination file during rename, instead of relying on this delete step which is
needed if we want to support task level publish
+ toDelete.add(targetFs.getFileStatus(fileToCopy));
+ }
+ }
+ } else if (deleteFileThatNotExistOnSource &&
targetFs.exists(fileToCopy)){
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
+ } catch (IOException e) {
+ log.error("meet exception:", e);
}
- } else if (this.deleteFileThatNotExistOnSource &&
targetFs.exists(fileToCopy)){
- toDelete.add(targetFs.getFileStatus(fileToCopy));
- }
+ }}
+ );
+ pendingTask.add(future);
+ }
+ for (Future f: pendingTask) {
+ f.get();
}
Review Comment:
NBD as presently formulated... but if you did want to detect system-level
issues, such as a fatal problem w/ the target FS, early on rather than
pointlessly creating thousands of futures, you could use a `CompletionService`
to poll for completed results in between submitting additional ones:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html
Just maintain a count of how many futures you're expecting, so you know when
they're all done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]