Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1810287691


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +227,67 @@ protected void registerIcebergTable(TableMetadata 
srcMetadata, TableMetadata dst
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves the data files of the current snapshot whose partition matches the specified filter predicate.
+   * <p>
+   *   Only data manifests are scanned; delete manifests are not consulted yet.
+   * </p>
+   *
+   * @param icebergPartitionFilterPredicate the predicate evaluated against each data file's partition
+   * @return a mutable list of copies of the matching data files; empty when the table has no current snapshot
+   * @throws TableNotFoundException if error occurred while accessing the table metadata
+   * @throws RuntimeException if error occurred while reading a manifest file
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate)
+      throws TableNotFoundException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    List<DataFile> knownDataFiles = new ArrayList<>();
+    // A table without any committed snapshot has no current snapshot (null), hence no data files to match
+    if (currentSnapshot == null) {
+      log.info("~{}~ has no current snapshot - no iceberg datafiles to match", tableId);
+      return knownDataFiles;
+    }
+    long currentSnapshotId = currentSnapshot.snapshotId();
+    //TODO: Add support for deleteManifests as well later
+    // Currently supporting dataManifests only
+    List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
+    // Log the scan scope up front; the matched-file count is only meaningful once manifests have been read
+    log.info("~{}~ for snapshot '{}' - '{}' data manifest files to scan", tableId, currentSnapshotId,
+        dataManifestFiles.size());
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
+          CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
+        dataFiles.forEachRemaining(dataFile -> {
+          if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+            // copy() because manifest readers may reuse DataFile instances while iterating
+            knownDataFiles.add(dataFile.copy());
+          }
+        });
+        // Cumulative progress: total matching data files collected so far, across all manifests read
+        log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId,
+            knownDataFiles.size());
+      } catch (IOException e) {
+        String errMsg = String.format("~%s~ for snapshot '%d' - Failed to read manifest file: %s", tableId,
+            currentSnapshotId, manifestFile.path());
+        log.error(errMsg, e);
+        throw new RuntimeException(errMsg, e);
+      }
+    }
+    return knownDataFiles;
+  }
+
+  /**
+   * Overwrite partition data files in the table for the specified partition 
col name & partition value.
+   * <p>
+   *   Overwrite partition replaces the partition using the expression filter 
provided.
+   * </p>
+   * @param dataFiles the list of data files to replace partitions with
+   * @param partitionColName the partition column name whose data files are to 
be replaced
+   * @param partitionValue  the partition column value on which data files 
will be replaced
+   */
+  protected void overwritePartition(List<DataFile> dataFiles, String 
partitionColName, String partitionValue)
+      throws TableNotFoundException {
+    if (dataFiles.isEmpty()) {
+      return;
+    }
+    log.info("~{}~ SnapshotId before overwrite: {}", tableId, 
accessTableMetadata().currentSnapshot().snapshotId());
+    OverwriteFiles overwriteFiles = this.table.newOverwrite();
+    overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, 
partitionValue));
+    dataFiles.forEach(overwriteFiles::addFile);
+    overwriteFiles.commit();
+    this.tableOps.refresh();
+    log.info("~{}~ SnapshotId after overwrite: {}", tableId, 
accessTableMetadata().currentSnapshot().snapshotId());

Review Comment:
   Thanks for this suggestion - added it as a note in a comment before that line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to