Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1774541066


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata 
srcMetadata, TableMetadata dst
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the 
specified partition filter predicate.
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions
+   * @return a list of data files that match the partition filter predicate
+   * @throws IOException if an I/O error occurs while accessing the table 
metadata or reading manifest files
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> 
icebergPartitionFilterPredicate) throws IOException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    List<ManifestFile> dataManifestFiles = 
currentSnapshot.dataManifests(this.tableOps.io());
+    List<DataFile> dataFileList = new ArrayList<>();
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      ManifestReader<DataFile> manifestReader = 
ManifestFiles.read(manifestFile, this.tableOps.io());
+      CloseableIterator<DataFile> dataFiles = manifestReader.iterator();
+      dataFiles.forEachRemaining(dataFile -> {
+        if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+          dataFileList.add(dataFile.copy());
+        }
+      });
+    }
+    return dataFileList;
+  }
+
+  /**
+   * Replaces partitions in the table with the specified list of data files.
+   *
+   * @param dataFiles the list of data files to replace partitions with
+   */
+  protected void replacePartitions(List<DataFile> dataFiles) {
+    if (dataFiles.isEmpty()) {
+      return;
+    }
+    ReplacePartitions replacePartitions = this.table.newReplacePartitions();
+    dataFiles.forEach(replacePartitions::addFile);
+    replacePartitions.commit();

Review Comment:
   Yes, it deletes partition that it gets from datafile using 
datafile.partition().
   Will add a note too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to