phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990566896


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem 
targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it 
somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws 
IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem 
targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless 
scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = 
icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> 
isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new 
Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' 
and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: 
MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = 
icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead 
use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", 
dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), 
snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of 
`FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: 
'{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, 
snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: 
metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, 
although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit 
each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by 
path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior 
copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, 
absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is 
possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot 
compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied 
prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file 
now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, 
copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : 
snapshotInfo.getManifestFiles()) {

Review Comment:
   sure thing.  I added this:
   >  This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the 
(n+1)-th element of the iterator will omit
   >  all manifest files and listed data files, already reflected in a {@link 
IcebergSnapshotInfo#getManifestFiles()}
   >  from the n-th or prior elements.  Given the order of the {@link 
Iterator<IcebergSnapshotInfo>} returned, this
   >  mirrors the snapshot-to-file dependencies: each file is returned exactly 
once with the (oldest) snapshot from
   >  which it first becomes reachable.
   >  
   >  Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present 
(for the snapshot it itself deems current).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to