[ 
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939414&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939414
 ]

ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Oct/24 09:00
            Start Date: 22/Oct/24 09:00
    Worklog Time Spent: 10m 
      Work Description: Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1810287691


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +227,67 @@ protected void registerIcebergTable(TableMetadata 
srcMetadata, TableMetadata dst
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the 
specified partition filter predicate.
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions
+   * @return a list of data files that match the partition filter predicate
+   * @throws TableNotFoundException if error occurred while accessing the 
table metadata
+   * @throws RuntimeException if error occurred while reading the manifest file
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> 
icebergPartitionFilterPredicate)
+      throws TableNotFoundException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    long currentSnapshotId = currentSnapshot.snapshotId();
+    List<DataFile> knownDataFiles = new ArrayList<>();
+    log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", 
tableId, currentSnapshotId,
+        knownDataFiles.size());
+    //TODO: Add support for deleteManifests as well later
+    // Currently supporting dataManifests only
+    List<ManifestFile> dataManifestFiles = 
currentSnapshot.dataManifests(this.tableOps.io());
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      try (ManifestReader<DataFile> manifestReader = 
ManifestFiles.read(manifestFile, this.tableOps.io());
+          CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
+        dataFiles.forEachRemaining(dataFile -> {
+          if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+            knownDataFiles.add(dataFile.copy());
+          }
+        });
+        log.info("~{}~ for snapshot '{}' - '{}' total known iceberg 
datafiles", tableId, currentSnapshotId,
+            knownDataFiles.size());
+      } catch (IOException e) {
+        String errMsg = String.format("~%s~ for snapshot '%d' - Failed to read 
manifest file: %s", tableId,
+            currentSnapshotId, manifestFile.path());
+        log.error(errMsg, e);
+        throw new RuntimeException(errMsg, e);
+      }
+    }
+    return knownDataFiles;
+  }
+
+  /**
+   * Overwrite partition data files in the table for the specified partition 
col name & partition value.
+   * <p>
+   *   Overwrite partition replaces the partition using the expression filter 
provided.
+   * </p>
+   * @param dataFiles the list of data files to replace partitions with
+   * @param partitionColName the partition column name whose data files are to 
be replaced
+   * @param partitionValue  the partition column value on which data files 
will be replaced
+   */
+  protected void overwritePartition(List<DataFile> dataFiles, String 
partitionColName, String partitionValue)
+      throws TableNotFoundException {
+    if (dataFiles.isEmpty()) {
+      return;
+    }
+    log.info("~{}~ SnapshotId before overwrite: {}", tableId, 
accessTableMetadata().currentSnapshot().snapshotId());
+    OverwriteFiles overwriteFiles = this.table.newOverwrite();
+    overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, 
partitionValue));
+    dataFiles.forEach(overwriteFiles::addFile);
+    overwriteFiles.commit();
+    this.tableOps.refresh();
+    log.info("~{}~ SnapshotId after overwrite: {}", tableId, 
accessTableMetadata().currentSnapshot().snapshotId());

Review Comment:
   Thanks for this suggestion - Added as a note in comment before the line





Issue Time Tracking
-------------------

    Worklog Id:     (was: 939414)
    Time Spent: 8h 10m  (was: 8h)

> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
>                 Key: GOBBLIN-2159
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 8h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to