[ https://issues.apache.org/jira/browse/GOBBLIN-1707?focusedWorklogId=814854&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814854 ]
ASF GitHub Bot logged work on GOBBLIN-1707:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Oct/22 01:10
            Start Date: 08/Oct/22 01:10
    Worklog Time Spent: 10m
      Work Description: phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990566896


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {

Review Comment:
   sure thing.  I added this:
   > This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit all manifest files and listed data files, already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()} from the n-th or prior elements.  Given the order of the {@link Iterator<IcebergSnapshotInfo>} returned, this mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from which it first becomes reachable.
   >
   > Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).


Issue Time Tracking
-------------------

    Worklog Id:     (was: 814854)
    Time Spent: 4h  (was: 3h 50m)

> Add Iceberg support to DistCp
> -----------------------------
>
>                 Key: GOBBLIN-1707
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1707
>             Project: Apache Gobblin
>          Issue Type: Task
>          Components: gobblin-core
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> Add capability for iceberg copy/replication to distcp. Support incremental
> copy (only of delta changes since last time) in addition to full copy on
> first time.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)