Blazer-007 commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1778488116
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java: ########## @@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst this.tableOps.commit(dstMetadata, srcMetadata); } } + + /** + * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate. + * + * @param icebergPartitionFilterPredicate the predicate to filter partitions + * @return a list of data files that match the partition filter predicate + * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files + */ + public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException { + TableMetadata tableMetadata = accessTableMetadata(); + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io()); + List<DataFile> dataFileList = new ArrayList<>(); + for (ManifestFile manifestFile : dataManifestFiles) { + ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io()); + CloseableIterator<DataFile> dataFiles = manifestReader.iterator(); + dataFiles.forEachRemaining(dataFile -> { + if (icebergPartitionFilterPredicate.test(dataFile.partition())) { + dataFileList.add(dataFile.copy()); + } + }); + } + return dataFileList; + } + + /** + * Replaces partitions in the table with the specified list of data files. 
+ * + * @param dataFiles the list of data files to replace partitions with + */ + protected void replacePartitions(List<DataFile> dataFiles) { + if (dataFiles.isEmpty()) { + return; + } + ReplacePartitions replacePartitions = this.table.newReplacePartitions(); + dataFiles.forEach(replacePartitions::addFile); + replacePartitions.commit(); Review Comment: I have now changed this to use the Overwrite API, because while testing against a somewhat complex partition the current approach was giving unusual results. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org