phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1175911613


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> 
getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", 
manifestPath.toString(), fs.getUri().toString(), 
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = 
Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = 
Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, 
ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new 
ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some 
softbound issue
-        //todo: After partition, change this to directly return iterator so 
that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, 
fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, 
fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we 
won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite 
destination file during rename, instead of relying on this delete step which is 
needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some 
softbound issue
+            //todo: After partition, change this to directly return iterator 
so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+              FileStatus srcFile = fs.getFileStatus(fileToCopy);
+              OwnerAndPermission replicatedPermission = 
CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+              if (!existOnTarget || shouldCopy(targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                CopyableFile.Builder copyableFileBuilder =
+                    CopyableFile.fromOriginAndDestination(fs, srcFile, 
fileToCopy, configuration)
+                        .fileSet(datasetURN())
+                        .datasetOutputPath(fileToCopy.toString())
+                        .ancestorsOwnerAndPermission(CopyableFile
+                            
.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, 
fileToCopy.getParent(),
+                                new Path(commonFilesParent), configuration, 
permissionMap))
+                        .destinationOwnerAndPermission(replicatedPermission);
+                CopyableFile copyableFile = copyableFileBuilder.build();
+                copyableFile.setFsDatasets(fs, targetFs);
+                copyEntities.add(copyableFile);
+                if (existOnTarget && srcFile.isFile()) {
+                  // this is to match the existing publishing behavior where 
we won't rewrite the target when it's already existed
+                  // todo: Change the publish behavior to support overwrite 
destination file during rename, instead of relying on this delete step which is 
needed if we want to support task level publish
+                  toDelete.add(targetFs.getFileStatus(fileToCopy));
+                }
+              }
+            } else if (deleteFileThatNotExistOnSource && 
targetFs.exists(fileToCopy)){
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
+          } catch (IOException e) {
+            log.error("meet exception:", e);
           }
-        } else if (this.deleteFileThatNotExistOnSource && 
targetFs.exists(fileToCopy)){
-          toDelete.add(targetFs.getFileStatus(fileToCopy));
-        }
+        }}
+        );
+        pendingTask.add(future);
+      }
+      for (Future f: pendingTask) {
+        f.get();
       }

Review Comment:
   Not a big deal as presently formulated... but if you did want to detect 
system-level issues, such as a fatal problem with the target FS, at the 
beginning rather than pointlessly creating thousands of futures, you could use 
a `CompletionService` to poll for results in between submitting additional tasks:
   
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html
   
   Just maintain a count of how many futures you're expecting, so you know when 
they have all completed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to