[ https://issues.apache.org/jira/browse/GOBBLIN-1707?focusedWorklogId=814854&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814854 ]

ASF GitHub Bot logged work on GOBBLIN-1707:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Oct/22 01:10
            Start Date: 08/Oct/22 01:10
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990566896


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
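The hunk adds `WrappedIOException` but does not show where it is thrown. One plausible use, assumed here and not confirmed by the diff, is to carry a checked `IOException` (e.g. from a `FileSystem` lookup) out of a lambda such as the one passed to `Iterators.transform`, whose functional interface cannot declare checked exceptions. The caller then restores it with `rethrowWrapped()`. This sketch uses only the JDK: `Stream.filter` stands in for Guava, and `PathChecker` and `absentFromTarget` are hypothetical names. It differs from the diff in one respect: the constructor also passes the exception to `super(...)` as the cause, which the Lombok-generated `@RequiredArgsConstructor` constructor does not do.

```java
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

final class WrappedIOExceptionSketch {
  /** Same shape as the diff's class, with Lombok's @RequiredArgsConstructor/@Getter written out by hand. */
  static final class WrappedIOException extends RuntimeException {
    private final IOException wrappedException;

    WrappedIOException(IOException wrappedException) {
      super(wrappedException); // unlike the Lombok constructor, also records the cause
      this.wrappedException = wrappedException;
    }

    IOException getWrappedException() {
      return wrappedException;
    }

    void rethrowWrapped() throws IOException {
      throw wrappedException;
    }
  }

  /** Hypothetical checked-exception-throwing lookup, standing in for a FileSystem existence check. */
  interface PathChecker {
    boolean isPresent(String path) throws IOException;
  }

  /** Returns the paths absent from the target; an IOException raised inside the lambda surfaces unchanged. */
  static List<String> absentFromTarget(List<String> paths, PathChecker checker) throws IOException {
    try {
      return paths.stream()
          .filter(p -> {
            try {
              return !checker.isPresent(p);
            } catch (IOException e) {
              // a Predicate cannot throw a checked exception, so carry it out unchecked
              throw new WrappedIOException(e);
            }
          })
          .collect(Collectors.toList());
    } catch (WrappedIOException wrapped) {
      wrapped.rethrowWrapped(); // restore the original checked exception for callers
      throw new IllegalStateException("unreachable: rethrowWrapped() always throws");
    }
  }
}
```

The wrapper is caught and unwrapped at the method boundary, so no unchecked exception escapes. This keeps the method's `throws IOException` contract accurate for callers.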

Review Comment:
   sure thing.  I added this:
   >  This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit all manifest files and listed data files already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()} from the n-th or prior elements.  Given the order of the {@link Iterator<IcebergSnapshotInfo>} returned, this mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from which it first becomes reachable.
   >
   >  Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).
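The ordering described in that Javadoc can be shown with a small model. This is a sketch under stated assumptions, not the PR's implementation:

- `Snapshot`, `Delta`, and `incrementalDeltas` are hypothetical names.
- Snapshots arrive oldest first.
- Each snapshot's manifests are given as a map from manifest path to data-file paths.

Each manifest and data file is reported only with the oldest snapshot that first reaches it. Only the last (current) snapshot carries a metadata path. The sketch uses records (Java 16+) for brevity.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class IncrementalSnapshotSketch {
  /** Hypothetical simplified snapshot: id, manifest-list path, and each manifest mapped to its data files. */
  record Snapshot(long id, String manifestListPath, Map<String, List<String>> manifests) {}

  /** Hypothetical per-snapshot result: only the manifests/data files first reachable from this snapshot. */
  record Delta(long snapshotId, String manifestListPath, Map<String, List<String>> newManifests,
      Optional<String> metadataPath) {}

  /** `oldestFirst` is ordered oldest to newest; its last element is the current snapshot. */
  static List<Delta> incrementalDeltas(List<Snapshot> oldestFirst, String currentMetadataPath) {
    Set<String> seenManifests = new HashSet<>();
    Set<String> seenDataFiles = new HashSet<>();
    List<Delta> deltas = new ArrayList<>();
    for (int i = 0; i < oldestFirst.size(); i++) {
      Snapshot snap = oldestFirst.get(i);
      Map<String, List<String>> fresh = new LinkedHashMap<>();
      for (Map.Entry<String, List<String>> m : snap.manifests().entrySet()) {
        if (!seenManifests.add(m.getKey())) {
          continue; // manifest already reported with an older snapshot
        }
        List<String> newData = new ArrayList<>();
        for (String dataFile : m.getValue()) {
          if (seenDataFiles.add(dataFile)) {
            newData.add(dataFile); // data file not reachable from any older snapshot
          }
        }
        fresh.put(m.getKey(), newData);
      }
      // only the final (current) snapshot carries a metadata.json path
      Optional<String> metadataPath =
          (i == oldestFirst.size() - 1) ? Optional.of(currentMetadataPath) : Optional.empty();
      deltas.add(new Delta(snap.id(), snap.manifestListPath(), fresh, metadataPath));
    }
    return deltas;
  }
}
```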
   



Issue Time Tracking
-------------------

    Worklog Id:     (was: 814854)
    Time Spent: 4h  (was: 3h 50m)

> Add Iceberg support to DistCp
> -----------------------------
>
>                 Key: GOBBLIN-1707
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1707
>             Project: Apache Gobblin
>          Issue Type: Task
>          Components: gobblin-core
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> Add capability for Iceberg copy/replication to DistCp.  Support incremental
> copy (only of delta changes since the last run) in addition to a full copy
> the first time.
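The `ALGO` comment in the diff explains how incremental copy can avoid rescanning the whole table. If a file is already present at the destination, its whole subtree was already copied. If a file is absent, the walk must still descend, because some of its children may have been copied earlier under a predecessor it replaced.

Here is a minimal sketch of that rule. It simplifies the PR's `FileSystem`-based checks in two ways: the file tree is an in-memory map, and target presence is a `Set`. `ShortCircuitCopySketch` and `filesToCopy` are hypothetical names. On a first run the target is empty, so the same walk yields a full copy.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ShortCircuitCopySketch {
  /**
   * `children` maps each file to the files it references
   * (metadata.json -> manifest-list -> manifest -> data). Returns files to copy, parents before children.
   */
  static List<String> filesToCopy(String root, Map<String, List<String>> children, Set<String> onTarget) {
    Set<String> toCopy = new LinkedHashSet<>();
    collect(root, children, onTarget, toCopy);
    return new ArrayList<>(toCopy);
  }

  private static void collect(String path, Map<String, List<String>> children,
      Set<String> onTarget, Set<String> toCopy) {
    if (onTarget.contains(path)) {
      return; // present at dest: immutability plus atomic commits imply its subtree was already copied
    }
    if (!toCopy.add(path)) {
      return; // already reached via another parent during this walk
    }
    // absent at dest: descend anyway, since some children may have been copied under a replaced predecessor
    for (String child : children.getOrDefault(path, List.of())) {
      collect(child, children, onTarget, toCopy);
    }
  }
}
```

In the second run of the test below, `m1.avro` is already at the destination, so its data file is skipped without being checked.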



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
