Blazer-007 commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1810287691
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java: ########## @@ -217,4 +227,67 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst this.tableOps.commit(dstMetadata, srcMetadata); } } + + /** + * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate. + * + * @param icebergPartitionFilterPredicate the predicate to filter partitions + * @return a list of data files that match the partition filter predicate + * @throws TableNotFoundException if error occurred while accessing the table metadata + * @throws RuntimeException if error occurred while reading the manifest file + */ + public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) + throws TableNotFoundException { + TableMetadata tableMetadata = accessTableMetadata(); + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + long currentSnapshotId = currentSnapshot.snapshotId(); + List<DataFile> knownDataFiles = new ArrayList<>(); + log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId, + knownDataFiles.size()); + //TODO: Add support for deleteManifests as well later + // Currently supporting dataManifests only + List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io()); + for (ManifestFile manifestFile : dataManifestFiles) { + try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io()); + CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) { + dataFiles.forEachRemaining(dataFile -> { + if (icebergPartitionFilterPredicate.test(dataFile.partition())) { + knownDataFiles.add(dataFile.copy()); + } + }); + log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId, + knownDataFiles.size()); + } catch (IOException e) { + String errMsg = 
String.format("~%s~ for snapshot '%d' - Failed to read manifest file: %s", tableId, + currentSnapshotId, manifestFile.path()); + log.error(errMsg, e); + throw new RuntimeException(errMsg, e); + } + } + return knownDataFiles; + } + + /** + * Overwrite partition data files in the table for the specified partition col name & partition value. + * <p> + * Overwrite partition replaces the partition using the expression filter provided. + * </p> + * @param dataFiles the list of data files to replace partitions with + * @param partitionColName the partition column name whose data files are to be replaced + * @param partitionValue the partition column value on which data files will be replaced + */ + protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue) + throws TableNotFoundException { + if (dataFiles.isEmpty()) { + return; + } + log.info("~{}~ SnapshotId before overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId()); + OverwriteFiles overwriteFiles = this.table.newOverwrite(); + overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue)); + dataFiles.forEach(overwriteFiles::addFile); + overwriteFiles.commit(); + this.tableOps.refresh(); + log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId()); Review Comment: Thanks for this suggestion; I added it as a note in a comment before that line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org