phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1175884690


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> 
getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", 
manifestPath.toString(), fs.getUri().toString(), 
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = 
Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = 
Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, 
ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new 
ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some 
softbound issue
-        //todo: After partition, change this to directly return iterator so 
that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, 
fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, 
fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we 
won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite 
destination file during rename, instead of relying on this delete step which is 
needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some 
softbound issue
+            //todo: After partition, change this to directly return iterator 
so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }

Review Comment:
   a. what sorts of `IOException` are swallowed here?
   b. is it appropriate to print every one (e.g. rather than logging at a 
particular log level that could be adjusted by config)



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> 
getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", 
manifestPath.toString(), fs.getUri().toString(), 
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = 
Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = 
Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, 
ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new 
ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some 
softbound issue
-        //todo: After partition, change this to directly return iterator so 
that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, 
fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, 
fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we 
won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite 
destination file during rename, instead of relying on this delete step which is 
needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some 
softbound issue
+            //todo: After partition, change this to directly return iterator 
so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+              FileStatus srcFile = fs.getFileStatus(fileToCopy);
+              OwnerAndPermission replicatedPermission = 
CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+              if (!existOnTarget || shouldCopy(targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                CopyableFile.Builder copyableFileBuilder =
+                    CopyableFile.fromOriginAndDestination(fs, srcFile, 
fileToCopy, configuration)
+                        .fileSet(datasetURN())
+                        .datasetOutputPath(fileToCopy.toString())
+                        .ancestorsOwnerAndPermission(CopyableFile
+                            
.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, 
fileToCopy.getParent(),
+                                new Path(commonFilesParent), configuration, 
permissionMap))
+                        .destinationOwnerAndPermission(replicatedPermission);
+                CopyableFile copyableFile = copyableFileBuilder.build();
+                copyableFile.setFsDatasets(fs, targetFs);
+                copyEntities.add(copyableFile);
+                if (existOnTarget && srcFile.isFile()) {
+                  // this is to match the existing publishing behavior where 
we won't rewrite the target when it's already existed
+                  // todo: Change the publish behavior to support overwrite 
destination file during rename, instead of relying on this delete step which is 
needed if we want to support task level publish
+                  toDelete.add(targetFs.getFileStatus(fileToCopy));
+                }
+              }
+            } else if (deleteFileThatNotExistOnSource && 
targetFs.exists(fileToCopy)){
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
+          } catch (IOException e) {
+            log.error("meet exception:", e);

Review Comment:
   in the case of a general FS misconfiguration (e.g. issue w/ target FS), does 
this mean we'll now log one error message for every manifest entry, whereas 
before, with sync processing, only the first one failed, and then we bail out 
and go no further through the manifest?
   
   and even so, swallowing the exception may hide issues (e.g. from the 
automatic troubleshooter).  why not allow them to percolate as 
`j.u.concurrent.ExecutionException`s? (and unwrap back to the underlying 
exception at a higher layer.)



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java:
##########
@@ -375,6 +377,35 @@ public static List<OwnerAndPermission> 
resolveReplicatedOwnerAndPermissionsRecur
     return ownerAndPermissions;
   }
 
+  /**
+   * Compute the correct {@link OwnerAndPermission} obtained from replicating 
source owner and permissions and applying
+   * the {@link PreserveAttributes} rules for fromPath and every ancestor up 
to but excluding toPath.
+   * Use permissionMap as a cache to reduce the call to hdfs
+   *
+   * @return A list of the computed {@link OwnerAndPermission}s starting from 
fromPath, up to but excluding toPath.
+   * @throws IOException if toPath is not an ancestor of fromPath.
+   */
+  public static List<OwnerAndPermission> 
resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, 
Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration, Map<String, 
OwnerAndPermission> permissionMap) throws IOException {
+
+    if (!PathUtils.isAncestor(toPath, fromPath)) {
+      throw new IOException(String.format("toPath %s must be an ancestor of 
fromPath %s.", toPath, fromPath));
+    }
+
+    List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
+    Path currentPath = fromPath;
+
+    while (currentPath.getParent() != null && PathUtils.isAncestor(toPath, 
currentPath.getParent())) {
+      if (!permissionMap.containsKey(currentPath.toString())) {
+        permissionMap.put(currentPath.toString(), 
resolveReplicatedOwnerAndPermission(sourceFs, currentPath, copyConfiguration));

Review Comment:
   a guava `LoadingCache<K, V>` would better encapsulate the memoization here.
   
   e.g. have the caller implement `CacheLoader<K,V>.load` and pass in a 
`LoadingCache`; e.g.:
   
https://guava.dev/releases/20.0/api/docs/com/google/common/cache/CacheLoader.html#load-K-
   
   that would mean reworking this static method to take a `Function<K, V>`.  
even better however would be to create a class for `OwnerAndPermission` 
resolution, which would get constructed w/ the `FileSystem` and 
`CopyConfiguration`, to keep them unchanged throughout.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> 
getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", 
manifestPath.toString(), fs.getUri().toString(), 
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = 
Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = 
Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, 
ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new 
ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some 
softbound issue
-        //todo: After partition, change this to directly return iterator so 
that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, 
fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, 
fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we 
won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite 
destination file during rename, instead of relying on this delete step which is 
needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some 
softbound issue
+            //todo: After partition, change this to directly return iterator 
so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+              FileStatus srcFile = fs.getFileStatus(fileToCopy);
+              OwnerAndPermission replicatedPermission = 
CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+              if (!existOnTarget || shouldCopy(targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                CopyableFile.Builder copyableFileBuilder =
+                    CopyableFile.fromOriginAndDestination(fs, srcFile, 
fileToCopy, configuration)
+                        .fileSet(datasetURN())
+                        .datasetOutputPath(fileToCopy.toString())
+                        .ancestorsOwnerAndPermission(CopyableFile
+                            
.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, 
fileToCopy.getParent(),
+                                new Path(commonFilesParent), configuration, 
permissionMap))
+                        .destinationOwnerAndPermission(replicatedPermission);
+                CopyableFile copyableFile = copyableFileBuilder.build();
+                copyableFile.setFsDatasets(fs, targetFs);
+                copyEntities.add(copyableFile);
+                if (existOnTarget && srcFile.isFile()) {
+                  // this is to match the existing publishing behavior where 
we won't rewrite the target when it's already existed
+                  // todo: Change the publish behavior to support overwrite 
destination file during rename, instead of relying on this delete step which is 
needed if we want to support task level publish
+                  toDelete.add(targetFs.getFileStatus(fileToCopy));
+                }
+              }
+            } else if (deleteFileThatNotExistOnSource && 
targetFs.exists(fileToCopy)){
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
+          } catch (IOException e) {
+            log.error("meet exception:", e);
           }
-        } else if (this.deleteFileThatNotExistOnSource && 
targetFs.exists(fileToCopy)){
-          toDelete.add(targetFs.getFileStatus(fileToCopy));
-        }
+        }}
+        );
+        pendingTask.add(future);
+      }
+      for (Future f: pendingTask) {
+        f.get();
       }
       if (!toDelete.isEmpty()) {
         //todo: add support sync for empty dir
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, 
this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), 
step, 1));
       }
+      log.info(String.format("Calculate workunits take %s milliseconds", 
System.currentTimeMillis() - startTime));

Review Comment:
   nit: probably should be past tense, so:
   ```
   "WorkUnit calculation took %s ms"
   ```
   or
   ```
   "Calculating WorkUnits took %s ms"
   ```



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> 
getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", 
manifestPath.toString(), fs.getUri().toString(), 
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = 
Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = 
Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, 
ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new 
ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some 
softbound issue
-        //todo: After partition, change this to directly return iterator so 
that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, 
fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, 
fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we 
won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite 
destination file during rename, instead of relying on this delete step which is 
needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some 
softbound issue
+            //todo: After partition, change this to directly return iterator 
so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+              FileStatus srcFile = fs.getFileStatus(fileToCopy);
+              OwnerAndPermission replicatedPermission = 
CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+              if (!existOnTarget || shouldCopy(targetFs, srcFile, 
targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                CopyableFile.Builder copyableFileBuilder =
+                    CopyableFile.fromOriginAndDestination(fs, srcFile, 
fileToCopy, configuration)
+                        .fileSet(datasetURN())
+                        .datasetOutputPath(fileToCopy.toString())
+                        .ancestorsOwnerAndPermission(CopyableFile
+                            
.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, 
fileToCopy.getParent(),
+                                new Path(commonFilesParent), configuration, 
permissionMap))
+                        .destinationOwnerAndPermission(replicatedPermission);
+                CopyableFile copyableFile = copyableFileBuilder.build();
+                copyableFile.setFsDatasets(fs, targetFs);
+                copyEntities.add(copyableFile);
+                if (existOnTarget && srcFile.isFile()) {
+                  // this is to match the existing publishing behavior where 
we won't rewrite the target when it's already existed
+                  // todo: Change the publish behavior to support overwrite 
destination file during rename, instead of relying on this delete step which is 
needed if we want to support task level publish
+                  toDelete.add(targetFs.getFileStatus(fileToCopy));
+                }
+              }
+            } else if (deleteFileThatNotExistOnSource && 
targetFs.exists(fileToCopy)){
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
+          } catch (IOException e) {
+            log.error("meet exception:", e);
           }
-        } else if (this.deleteFileThatNotExistOnSource && 
targetFs.exists(fileToCopy)){
-          toDelete.add(targetFs.getFileStatus(fileToCopy));
-        }
+        }}
+        );
+        pendingTask.add(future);
+      }
+      for (Future f: pendingTask) {
+        f.get();
       }

Review Comment:
   NBD as presently formulated... but if you did want to detect system-level 
issues, such as a fatal problem w/ the target FS, in the beginning rather than 
pointlessly creating thousands of futures, you could use a `CompletionService` 
to poll for results in between when adding additional ones:
   
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html
   just maintain a count of how many futures you're expecting, so you know when 
they're all done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to