[ https://issues.apache.org/jira/browse/GOBBLIN-1824?focusedWorklogId=858822&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-858822 ]

ASF GitHub Bot logged work on GOBBLIN-1824:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Apr/23 01:46
            Start Date: 25/Apr/23 01:46
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1175911613


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+            //todo: We can use fileSet to partition the data in case of some softbound issue
+            //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+              FileStatus srcFile = fs.getFileStatus(fileToCopy);
+              OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+              if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                CopyableFile.Builder copyableFileBuilder =
+                    CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
+                        .fileSet(datasetURN())
+                        .datasetOutputPath(fileToCopy.toString())
+                        .ancestorsOwnerAndPermission(CopyableFile
+                            .resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                                new Path(commonFilesParent), configuration, permissionMap))
+                        .destinationOwnerAndPermission(replicatedPermission);
+                CopyableFile copyableFile = copyableFileBuilder.build();
+                copyableFile.setFsDatasets(fs, targetFs);
+                copyEntities.add(copyableFile);
+                if (existOnTarget && srcFile.isFile()) {
+                  // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
+                  // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+                  toDelete.add(targetFs.getFileStatus(fileToCopy));
+                }
+              }
+            } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
+          } catch (IOException e) {
+            log.error("meet exception:", e);
           }
-        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
-          toDelete.add(targetFs.getFileStatus(fileToCopy));
-        }
+        }}
+        );
+        pendingTask.add(future);
+      }
+      for (Future f: pendingTask) {
+        f.get();
       }

Review Comment:
   NBD as presently formulated... but if you did want to detect system-level issues, such as a fatal problem w/ the target FS, at the beginning rather than pointlessly creating thousands of futures, you could use a `CompletionService` to poll for results in between adding additional ones:
   
   https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html
   
   just maintain a count of how many futures you're expecting, so you know when they're all done.
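   The suggested pattern could look roughly like the sketch below. This is a hypothetical standalone example, not Gobblin code: `planAll`, the pool size, and the integer stand-in tasks are all illustrative. The key idea is draining completed futures between submissions so a failure surfaces early, and counting submissions so you know when everything has finished.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceSketch {
  // Submits taskCount trivial tasks (stand-ins for per-file planning work)
  // and returns how many completed. A task failure propagates as an
  // ExecutionException from get(), possibly before all tasks are enqueued.
  public static int planAll(int taskCount) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
    int submitted = 0;
    int completed = 0;
    try {
      for (int i = 0; i < taskCount; i++) {
        final int id = i;
        cs.submit(() -> id); // stand-in for one file-planning task
        submitted++;
        // Drain any already-finished tasks between submissions; a fatal
        // error (e.g. a dead target FS) fails fast here instead of after
        // thousands of futures have been created.
        Future<Integer> done;
        while ((done = cs.poll()) != null) {
          done.get();
          completed++;
        }
      }
      // Wait for the remainder; the submitted count tells us when all
      // outstanding futures have been consumed.
      while (completed < submitted) {
        cs.take().get();
        completed++;
      }
    } finally {
      pool.shutdown();
    }
    return completed;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(planAll(100));
  }
}
```

   Compared to collecting every `Future` in a list and calling `get()` at the end (as the diff above does), this keeps the same total work but bounds how far ahead submission can run past a failure.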





Issue Time Tracking
-------------------

    Worklog Id:     (was: 858822)
    Time Spent: 0.5h  (was: 20m)

> Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs
> -----------------------------------------------------------------------
>
>                 Key: GOBBLIN-1824
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1824
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Optimizing Permission Calculation and Introducing Multithreading in 
> Manifest-Based DistCp Work Planning



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
