[ https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939414&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939414 ]
ASF GitHub Bot logged work on GOBBLIN-2159: ------------------------------------------- Author: ASF GitHub Bot Created on: 22/Oct/24 09:00 Start Date: 22/Oct/24 09:00 Worklog Time Spent: 10m Work Description: Blazer-007 commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1810287691 ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java: ########## @@ -217,4 +227,67 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst this.tableOps.commit(dstMetadata, srcMetadata); } } + + /** + * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate. + * + * @param icebergPartitionFilterPredicate the predicate to filter partitions + * @return a list of data files that match the partition filter predicate + * @throws TableNotFoundException if error occurred while accessing the table metadata + * @throws RuntimeException if error occurred while reading the manifest file + */ + public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) + throws TableNotFoundException { + TableMetadata tableMetadata = accessTableMetadata(); + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + long currentSnapshotId = currentSnapshot.snapshotId(); + List<DataFile> knownDataFiles = new ArrayList<>(); + log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId, + knownDataFiles.size()); + //TODO: Add support for deleteManifests as well later + // Currently supporting dataManifests only + List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io()); + for (ManifestFile manifestFile : dataManifestFiles) { + try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io()); + CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) { + 
dataFiles.forEachRemaining(dataFile -> { + if (icebergPartitionFilterPredicate.test(dataFile.partition())) { + knownDataFiles.add(dataFile.copy()); + } + }); + log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId, + knownDataFiles.size()); + } catch (IOException e) { + String errMsg = String.format("~%s~ for snapshot '%d' - Failed to read manifest file: %s", tableId, + currentSnapshotId, manifestFile.path()); + log.error(errMsg, e); + throw new RuntimeException(errMsg, e); + } + } + return knownDataFiles; + } + + /** + * Overwrite partition data files in the table for the specified partition col name & partition value. + * <p> + * Overwrite partition replaces the partition using the expression filter provided. + * </p> + * @param dataFiles the list of data files to replace partitions with + * @param partitionColName the partition column name whose data files are to be replaced + * @param partitionValue the partition column value on which data files will be replaced + */ + protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue) + throws TableNotFoundException { + if (dataFiles.isEmpty()) { + return; + } + log.info("~{}~ SnapshotId before overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId()); + OverwriteFiles overwriteFiles = this.table.newOverwrite(); + overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue)); + dataFiles.forEach(overwriteFiles::addFile); + overwriteFiles.commit(); + this.tableOps.refresh(); + log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId()); Review Comment: Thanks for the suggestion - I've added it as a note in a comment just before that line. Issue Time Tracking ------------------- Worklog Id: (was: 939414) Time Spent: 8h 10m (was: 8h) > Support Partition Based Copy in Iceberg Distcp > ---------------------------------------------- > > Key: 
GOBBLIN-2159 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2159 > Project: Apache Gobblin > Issue Type: Task > Reporter: Vivek Rai > Priority: Major > Time Spent: 8h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)