[ https://issues.apache.org/jira/browse/GOBBLIN-1707?focusedWorklogId=814828&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814828 ]
ASF GitHub Bot logged work on GOBBLIN-1707: ------------------------------------------- Author: ASF GitHub Bot Created on: 07/Oct/22 22:33 Start Date: 07/Oct/22 22:33 Worklog Time Spent: 10m Work Description: phet commented on code in PR #3575: URL: https://github.com/apache/gobblin/pull/3575#discussion_r990533651 ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java: ########## @@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati return copyEntities; } + /** Not intended to escape this class... yet `public` visibility in case it somehow does */ + @RequiredArgsConstructor + public static class WrappedIOException extends RuntimeException { + @Getter + private final IOException wrappedException; + + public void rethrowWrapped() throws IOException { + throw wrappedException; + } + } + /** * Finds all files of the Iceberg's current snapshot * Returns a map of path, file status for each file that needs to be copied */ - protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException { - Map<Path, FileStatus> result = Maps.newHashMap(); + protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException { + Map<Path, FileStatus> results = Maps.newHashMap(); IcebergTable icebergTable = this.getIcebergTable(); + // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg + IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly(); + if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) && + isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) { + log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target", + dbName, 
inputTableName, currentSnapshotOverview.getSnapshotId(), + currentSnapshotOverview.getManifestListPath(), + currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>")); + return results; + } Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator(); Iterator<String> filePathsIterator = Iterators.concat( Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> { - // TODO: decide: is it too much to print for every snapshot--instead use `.debug`? - log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName, - snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>")); - return snapshotInfo.getAllPaths().iterator(); + // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()` + String manListPath = snapshotInfo.getManifestListPath(); + log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName, + snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>")); + // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data; + // most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend + // also on incremental copy being run always atomically: to commit each iceberg only upon its full success. + // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is + // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date. + // hence, its entire subtree may be short-circuited. nevertheless, absence of a file at dest cannot imply + // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some + // metadata files would have been replaced (e.g. during snapshot compaction). 
in such instances, at least + // some of the children pointed to within could have been copied prior, when they previously appeared as a + // child of the current file's predecessor (which this new meta file now replaces). + if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) { + List<String> missingPaths = snapshotInfo.getSnapshotApexPaths(); + for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) { + if (!isPathPresentOnTarget(new Path(mfi.getManifestFilePath()), targetFs, copyConfig)) { + missingPaths.add(mfi.getManifestFilePath()); + mfi.getListedFilePaths().stream().filter(p -> + !isPathPresentOnTarget(new Path(p), targetFs, copyConfig) + ).forEach(missingPaths::add); + } + } + return missingPaths.iterator(); + } else { + log.info("{}.{} - snapshot '{}' already present on target... skipping (with contents)", + dbName, inputTableName, snapshotInfo.getSnapshotId()); + // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list + Optional<String> nonReplicatedMetadataPath = snapshotInfo.getMetadataPath().filter(p -> + !isPathPresentOnTarget(new Path(p), targetFs, copyConfig)); + log.info("{}.{} - metadata is {}already present on target", dbName, inputTableName, nonReplicatedMetadataPath.isPresent() ? 
"NOT " : ""); + return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator()); + } }) ); Iterable<String> filePathsIterable = () -> filePathsIterator; - // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would - // likely benefit from parallelism - for (String pathString : filePathsIterable) { - Path path = new Path(pathString); - result.put(path, this.sourceFs.getFileStatus(path)); + try { + // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely + // to benefit from parallelism + for (String pathString : filePathsIterable) { + try { + Path path = new Path(pathString); + results.put(path, this.sourceFs.getFileStatus(path)); + } catch (FileNotFoundException fnfe) { + if (!shouldTolerateMissingSourceFiles) { + throw fnfe; + } else { + // log, but otherwise swallow... to continue on + log.warn("MIA source file... did premature deletion subvert time-travel or maybe metadata read interleaved with delete?", fnfe); + } + } + } + } catch (WrappedIOException wrapper) { + wrapper.rethrowWrapped(); + } + return results; + } + + /** @returns whether `path` is present on `targetFs`, tunneling checked exceptions and caching results throughout */ + protected static boolean isPathPresentOnTarget(Path path, FileSystem targetFs, CopyConfiguration copyConfig) { + try { + // omit considering timestamp (or other markers of freshness), as files should be immutable Review Comment: it's technically possible to change files in place, but to do so, breaks the iceberg's repeatability. it's not something we should ever encourage... instead write new files and create a snapshot w/ those that substitutes out and hence replaces the original ones! 
For distcp specifically, the real issue with in-place modifications to data files is that every delta copy must devolve into a full comparison of the `FileStatus` (between source and dest) for the entire iceberg table. That's a huge amount of effort in some cases... all because of misbehaving writers/updaters. I suggest that if we do learn we're working with writers that do this, we later return here to add the necessary complexity. It's likely something we'll control via configuration, so it's not always on. Issue Time Tracking ------------------- Worklog Id: (was: 814828) Time Spent: 2h 50m (was: 2h 40m) > Add Iceberg support to DistCp > ----------------------------- > > Key: GOBBLIN-1707 > URL: https://issues.apache.org/jira/browse/GOBBLIN-1707 > Project: Apache Gobblin > Issue Type: Task > Components: gobblin-core > Reporter: Kip Kohn > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > > Add capability for iceberg copy/replication to distcp. Support incremental > copy (only of delta changes since last time) in addition to full copy on > first time. -- This message was sent by Atlassian Jira (v8.20.10#820010)