phet commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1773860832


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate.
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions
+   * @return a list of data files that match the partition filter predicate
+   * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());

Review Comment:
   please add a comment to document why `dataManifests`, rather than 
`Snapshot::allManifests`
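   
   for instance (a hedged sketch of such a comment; the rationale assumes the Iceberg API's distinction that `allManifests` also returns delete-file manifests):
   ```
   // dataManifests rather than Snapshot::allManifests: we only replicate data files,
   // and allManifests would also return delete-file manifests, whose entries are not
   // DataFiles readable via ManifestFiles.read
   List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
   ```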



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate.
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions
+   * @return a list of data files that match the partition filter predicate
+   * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
+    List<DataFile> dataFileList = new ArrayList<>();
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
+      CloseableIterator<DataFile> dataFiles = manifestReader.iterator();

Review Comment:
   `try` (with resources) to `.close()` the `CloseableIterator`s?
   
   ...also, does the `ManifestReader` need closing?
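   
   a hedged sketch of that shape (assuming, per the Iceberg API, that both `ManifestReader` and `CloseableIterator` are `Closeable`, letting their `close()`'s `IOException` propagate via the method's existing `throws`):
   ```
   for (ManifestFile manifestFile : dataManifestFiles) {
     // both resources close even if the predicate throws mid-iteration
     try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
         CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
       dataFiles.forEachRemaining(dataFile -> {
         if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
           dataFileList.add(dataFile.copy());
         }
       });
     }
   }
   ```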



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Finder class for locating and creating partitioned Iceberg datasets.
+ * <p>
+ * This class extends {@link IcebergDatasetFinder} and provides functionality to create
+ * {@link IcebergPartitionDataset} instances based on the specified source and destination Iceberg catalogs.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder {
+  public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) {
+    super(sourceFs, properties);
+  }
+
+  /**
+   * Finds the {@link IcebergPartitionDataset}s in the file system using the Iceberg Catalog. Both Iceberg database name and table
+   * name are mandatory based on the current implementation.
+   * <p>
+   * Overridden to verify that source and destination db & table names are each passed in the properties as separate values
+   * </p>
+   * @return List of {@link IcebergPartitionDataset}s in the file system.
+   * @throws IOException if there is an error while finding the datasets.
+   */
+  @Override
+  public List<IcebergDataset> findDatasets() throws IOException {
+    String srcDbName = getLocationQualifiedProperty(this.properties, CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY);
+    String destDbName = getLocationQualifiedProperty(this.properties, CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY);
+    String srcTableName = getLocationQualifiedProperty(this.properties, CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY);
+    String destTableName = getLocationQualifiedProperty(this.properties, CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY);
+
+    if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(destDbName) || StringUtils.isBlank(srcTableName)
+        || StringUtils.isBlank(destTableName)) {
+      String errorMsg = String.format("Missing (at least some) IcebergDataset properties - source: ('%s' and '%s') and destination: ('%s' and '%s') ",
+          srcDbName, srcTableName, destDbName, destTableName);
+      log.error(errorMsg);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    IcebergCatalog srcIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
+    IcebergCatalog destIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+
+    return Collections.singletonList(createIcebergDataset(
+        srcIcebergCatalog, srcDbName, srcTableName,
+        destIcebergCatalog, destDbName, destTableName,
+        this.properties, this.sourceFs
+    ));
+  }
+
+  /**
+   * Creates an {@link IcebergPartitionDataset} instance for the specified source and destination Iceberg tables.
+   */
+  @Override
+  protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog destinationIcebergCatalog, String destDbName, String destTableName, Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName, srcTableName);
+    Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
+        String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName, srcTableName));
+    IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName);
+    Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
+        String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName));
+//    TODO: Add Validator for source and destination tables later
+//    TableMetadata srcTableMetadata = srcIcebergTable.accessTableMetadata();
+//    TableMetadata destTableMetadata = destIcebergTable.accessTableMetadata();
+//    IcebergTableMetadataValidator.validateSourceAndDestinationTablesMetadata(srcTableMetadata, destTableMetadata);
+    return new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties));

Review Comment:
   the only difference in this overridden method is to name `IcebergPartitionDataset` instead of `IcebergDataset` - the rest is repeated unchanged from the base class.
   
   a simple refactoring would both make this crystal clear AND reduce duplicative code.  e.g. just add a method in `IcebergDatasetFinder`:
   ```
   protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs, boolean shouldIncludeMetadataPath) {
     return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
   }
   ```
   that then could be the ONLY `@Override` within `IcebergPartitionDatasetFinder`.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;
+
+/**
+ * Predicate implementation for filtering Iceberg partitions based on datetime values.
+ * <p>
+ *   This class filters partitions by checking if the partition datetime falls within a specified range.
+ * </p>
+ * <ul>
+ *   <li>The datetime partition column is expected to be a string column.</li>
+ *   <li>The datetime partition column values are expected to be in the format specified by the pattern in the properties.</li>
+ *   <li>The start and end dates are also specified in the properties.</li>
+ * </ul>
+ */
+public class IcebergDateTimePartitionFilterPredicate implements Predicate<StructLike> {
+
+  private static final List<String> supportedTransforms = ImmutableList.of("identity");
+  private static final String DATETIME_PARTITION_KEY = "partition.datetime";
+  private static final String DATETIME_PARTITION_PATTERN_KEY = DATETIME_PARTITION_KEY + ".pattern";
+  private static final String DATETIME_PARTITION_STARTDATE_KEY = DATETIME_PARTITION_KEY + ".startdate";
+  private static final String DATETIME_PARTITION_ENDDATE_KEY = DATETIME_PARTITION_KEY + ".enddate";
+  private final int partitionColumnIndex;
+  private final DateTimeFormatter dateTimeFormatter;
+  private final DateTime startDate;
+  private final DateTime endDate;
+
+  /**
+   * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the specified parameters.
+   *
+   * @param partitionColumnName the name of the partition column
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param properties the properties containing partition configuration
+   * @throws IllegalArgumentException if the partition column is not found or required properties are missing
+   */
+  public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, TableMetadata tableMetadata,
+      Properties properties) {
+
+    this.partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName,
+        tableMetadata, supportedTransforms);
+    Preconditions.checkArgument(this.partitionColumnIndex != -1,
+        String.format("Partition column %s not found", partitionColumnName));
+
+    String partitionPattern = IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_PATTERN_KEY);
+
+    String startDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_STARTDATE_KEY);
+
+    String endDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_ENDDATE_KEY);
+
+    Preconditions.checkArgument(StringUtils.isNotBlank(partitionPattern), "DateTime Partition pattern cannot be empty");
+    Preconditions.checkArgument(StringUtils.isNotBlank(startDateVal), "DateTime Partition start date cannot be empty");
+    Preconditions.checkArgument(StringUtils.isNotBlank(endDateVal), "DateTime Partition end date cannot be empty");
+
+    this.dateTimeFormatter = DateTimeFormat.forPattern(partitionPattern).withZone(DateTimeZone.UTC);
+    this.startDate = this.dateTimeFormatter.parseDateTime(startDateVal);
+    this.endDate = this.dateTimeFormatter.parseDateTime(endDateVal);
+  }
+
+  /**
+   * Check if the partition datetime falls within the specified range.
+   *
+   * @param partition the datetime partition to check
+   * @return {@code true} if the datetime partition value is within the range, otherwise {@code false}
+   */
+  @Override
+  public boolean test(StructLike partition) {
+    // Just a cautious check to avoid NPE, ideally partition shouldn't be null if table is partitioned
+    if (Objects.isNull(partition)) {
+      return false;
+    }
+
+    String partitionVal = partition.get(this.partitionColumnIndex, String.class);
+    if (StringUtils.isBlank(partitionVal)) {
+      return false;
+    }
+
+    DateTime partitionDateTime = this.dateTimeFormatter.parseDateTime(partitionVal);
+
+    if (partitionDateTime.isEqual(this.startDate) || partitionDateTime.isEqual(this.endDate)) {
+      return true;
+    }
+    return partitionDateTime.isAfter(this.startDate) && partitionDateTime.isBefore(this.endDate);
+  }

Review Comment:
   ```
   return pdt.isEqual(start)
       || pdt.isEqual(end)
       || (pdt.isAfter(start) && pdt.isBefore(end));
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    List<PartitionData> partitionDataList = Collections.nCopies(5, partitionData);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+
+    catalog.dropTable(testTableId);
+  }
+
+  @Test
+  public void testReplacePartitions() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, 1L);
+    List<PartitionData> partitionDataList = Arrays.asList(partitionData, partitionData2);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/id=2/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData3 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData3.set(0, 2L);
+    PartitionData partitionData4 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData4.set(0, 2L);
+    List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, partitionData4);
+
+    List<DataFile> dataFiles = getDataFiles(paths2, partitionDataList2);
+    // here, since partition data with value 2 doesn't exist yet, we expect it to get added to the table
+    icebergTable.replacePartitions(dataFiles);
+    List<String> expectedPaths = new ArrayList<>(paths);
+    expectedPaths.addAll(paths2);
+    verifyAnyOrder(expectedPaths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths3 = Arrays.asList(
+      "/path/tableName/data/id=1/file5.orc",
+      "/path/tableName/data/id=1/file6.orc"
+    );
+    // Reusing the same partition data to create data files with different paths
+    List<DataFile> dataFiles2 = getDataFiles(paths3, partitionDataList);
+    // here, since partition data with value 1 already exists, we expect it to get updated in the table with newer path
+    icebergTable.replacePartitions(dataFiles2);
+    List<String> updExpectedPaths = new ArrayList<>(paths2);
+    updExpectedPaths.addAll(paths3);
+    verifyAnyOrder(updExpectedPaths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    catalog.dropTable(testTableId);
+  }
+
+  private static void addPartitionDataFiles(Table table, List<String> paths, List<PartitionData> partitionDataList) {
+    Assert.assertEquals(paths.size(), partitionDataList.size());
+    for (int i = 0; i < paths.size(); i++) {
+      DataFile dataFile = createDataFileWithPartition(paths.get(i), partitionDataList.get(i));
+      table.newAppend().appendFile(dataFile).commit();

Review Comment:
   looks like you could implement in terms of the other:
   ```
   getDataFiles(paths, partitionDataList).stream().forEach(dataFile ->
       table.newAppend().appendFile(dataFile).commit()
   );
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    List<PartitionData> partitionDataList = Collections.nCopies(5, partitionData);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+
+    catalog.dropTable(testTableId);
+  }
+
+  @Test
+  public void testReplacePartitions() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, 1L);
+    List<PartitionData> partitionDataList = Arrays.asList(partitionData, partitionData2);

Review Comment:
   the `Collections.nCopies` above was clearer
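   
   i.e. (a minimal sketch of that form, since both entries hold equal values):
   ```
   PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
   partitionData.set(0, 1L);
   List<PartitionData> partitionDataList = Collections.nCopies(2, partitionData);
   ```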



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating {@link CopyEntity}
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds files specific to the partition and creates destination data files based on the source data files.
+   * Also updates the destination data files with the destination table write data location and adds a UUID to the file path
+   * to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
+      Path destPath = filePathsWithStatus.getDestPath();
+      FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus();
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Adding this check to avoid adding post publish step when there are no files to copy.
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createPostPublishStep(destDataFiles));
+    }

Review Comment:
   I agree this is one difference with `IcebergDataset::generateCopyEntities`, 
which always wants to add its post-publish step.  (but it shouldn't be hard to 
refactor to isolate this difference)
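   
   e.g. (a hedged sketch; `shouldAddPostPublishStep` is a hypothetical hook name, not from the PR - the base class would call it just before adding its step):
   ```
   // in the base class: always add the post-publish step
   protected boolean shouldAddPostPublishStep(List<CopyEntity> copyEntitiesSoFar) {
     return true;
   }
   
   // here, the lone override: skip the step when there's nothing to copy
   @Override
   protected boolean shouldAddPostPublishStep(List<CopyEntity> copyEntitiesSoFar) {
     return !copyEntitiesSoFar.isEmpty();
   }
   ```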



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating {@link CopyEntity}
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds files specific to the partition and creates destination data files based on the source data files.
+   * Also updates the destination data files with the destination table write data location and adds a UUID to the file path
+   * to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
+      Path destPath = filePathsWithStatus.getDestPath();
+      FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus();
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Adding this check to avoid adding post publish step when there are no files to copy.
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createPostPublishStep(destDataFiles));
+    }
+
+    log.info("~{}~ generated {} copy--entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws IcebergTable.TableNotFoundException {
+    List<DataFile> destDataFiles = new ArrayList<>();
+    if (srcDataFiles.isEmpty()) {
+      return destDataFiles;
+    }
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
+    PartitionSpec partitionSpec = destTableMetadata.spec();
+    String srcWriteDataLocation = srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    String destWriteDataLocation = destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) {
+      log.warn(
+          String.format("Either source or destination table does not have write data location : source table write data location : {%s} , destination table write data location : {%s}",
+              srcWriteDataLocation,
+              destWriteDataLocation
+          )
+      );
+    }
+    // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns null if the property is not set and
+    // doesn't respect the passed default value, so to avoid NPE in .replace() we are setting it to an empty string.
+    String prefixToBeReplaced = (srcWriteDataLocation != null) ? srcWriteDataLocation : "";
+    String prefixToReplaceWith = (destWriteDataLocation != null) ? destWriteDataLocation : "";
+    srcDataFiles.forEach(dataFile -> {
+      String curDestFilePath = dataFile.path().toString();
+      String newDestFilePath = curDestFilePath.replace(prefixToBeReplaced, prefixToReplaceWith);
+      String updatedDestFilePath = addUUIDToPath(newDestFilePath);
+      destDataFiles.add(DataFiles.builder(partitionSpec)
+          .copy(dataFile)
+          .withPath(updatedDestFilePath)
+          .build());
+    });
+    return destDataFiles;
+  }
+
+  private String addUUIDToPath(String filePathStr) {
+    Path filePath = new Path(filePathStr);
+    String fileDir = filePath.getParent().toString();
+    String fileName = filePath.getName();
+    String newFileName = UUID.randomUUID() + "-" + fileName;
+    return String.join("/", fileDir, newFileName);

Review Comment:
   usually best to avoid hard-coding path separators, like `/`:
   ```
   Path filePath = new Path(filePathStr);
   String fileName = filePath.getName();
   String newFileName = UUID.randomUUID() + "-" + fileName;
   return new Path(filePath.getParent(), newFileName).toString();
   ```



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;
+
+/**
+ * Predicate implementation for filtering Iceberg partitions based on datetime values.
+ * <p>
+ *   This class filters partitions by checking if the partition datetime falls within a specified range.
+ * </p>
+ * <ul>
+ *   <li>The datetime partition column is expected to be a string column.</li>
+ *   <li>The datetime partition column values are expected to be in the format specified by the pattern in the properties.</li>
+ *   <li>The start and end dates are also specified in the properties.</li>
+ * </ul>
+ */
+public class IcebergDateTimePartitionFilterPredicate implements Predicate<StructLike> {
+
+  private static final List<String> supportedTransforms = ImmutableList.of("identity");
+  private static final String DATETIME_PARTITION_KEY = "partition.datetime";
+  private static final String DATETIME_PARTITION_PATTERN_KEY = DATETIME_PARTITION_KEY + ".pattern";
+  private static final String DATETIME_PARTITION_STARTDATE_KEY = DATETIME_PARTITION_KEY + ".startdate";
+  private static final String DATETIME_PARTITION_ENDDATE_KEY = DATETIME_PARTITION_KEY + ".enddate";
+  private final int partitionColumnIndex;
+  private final DateTimeFormatter dateTimeFormatter;
+  private final DateTime startDate;
+  private final DateTime endDate;
+
+  /**
+   * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the specified parameters.
+   *
+   * @param partitionColumnName the name of the partition column
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param properties the properties containing partition configuration
+   * @throws IllegalArgumentException if the partition column is not found or required properties are missing
+   */
+  public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, TableMetadata tableMetadata,
+      Properties properties) {
+
+    this.partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName,
+        tableMetadata, supportedTransforms);
+    Preconditions.checkArgument(this.partitionColumnIndex != -1,
+        String.format("Partition column %s not found", partitionColumnName));
+
+    String partitionPattern = IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_PATTERN_KEY);
+
+    String startDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_STARTDATE_KEY);
+
+    String endDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_ENDDATE_KEY);
+
+    Preconditions.checkArgument(StringUtils.isNotBlank(partitionPattern), "DateTime Partition pattern cannot be empty");
+    Preconditions.checkArgument(StringUtils.isNotBlank(startDateVal), "DateTime Partition start date cannot be empty");
+    Preconditions.checkArgument(StringUtils.isNotBlank(endDateVal), "DateTime Partition end date cannot be empty");
+
+    this.dateTimeFormatter = DateTimeFormat.forPattern(partitionPattern).withZone(DateTimeZone.UTC);
+    this.startDate = this.dateTimeFormatter.parseDateTime(startDateVal);
+    this.endDate = this.dateTimeFormatter.parseDateTime(endDateVal);
+  }
+
+  /**
+   * Check if the partition datetime falls within the specified range.
+   *
+   * @param partition the datetime partition to check
+   * @return {@code true} if the datetime partition value is within the range, otherwise {@code false}
+   */
+  @Override
+  public boolean test(StructLike partition) {
+    // Just a cautious check to avoid NPE, ideally partition shouldn't be null if table is partitioned
+    if (Objects.isNull(partition)) {
+      return false;
+    }
+
+    String partitionVal = partition.get(this.partitionColumnIndex, String.class);

Review Comment:
   is the value guaranteed always to be a `String`?
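   
   (if the runtime value could instead surface as another `CharSequence` impl - e.g. Avro's `Utf8` - a hedged defensive form might be:)
   ```
   // requesting CharSequence tolerates non-String implementations, where
   // String.class would throw ClassCastException on, e.g., a Utf8 value
   CharSequence rawVal = partition.get(this.partitionColumnIndex, CharSequence.class);
   String partitionVal = (rawVal == null) ? null : rawVal.toString();
   ```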



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Finder class for locating and creating partitioned Iceberg datasets.
+ * <p>
+ * This class extends {@link IcebergDatasetFinder} and provides functionality to create
+ * {@link IcebergPartitionDataset} instances based on the specified source and destination Iceberg catalogs.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder {
+  public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) {
+    super(sourceFs, properties);
+  }
+
+  /**
+   * Finds the {@link IcebergPartitionDataset}s in the file system using the Iceberg Catalog. Both Iceberg database name and table
+   * name are mandatory based on the current implementation.
+   * <p>
+   * Overridden to verify that source and destination db & table names are each passed in the properties as separate values
+   * </p>
+   * @return List of {@link IcebergPartitionDataset}s in the file system.
+   * @throws IOException if there is an error while finding the datasets.
+   */
+  @Override
+  public List<IcebergDataset> findDatasets() throws IOException {
+    String srcDbName = getLocationQualifiedProperty(this.properties, CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY);
+    String destDbName = getLocationQualifiedProperty(this.properties, CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY);
+    String srcTableName = getLocationQualifiedProperty(this.properties, CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY);
+    String destTableName = getLocationQualifiedProperty(this.properties, CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY);

Review Comment:
   I see you deleted some handling of legacy prop names, but beyond that, this appears to be basically the same impl as from the base class.  so, why not just keep the base class version as-is, and only `@Override` `createIcebergDataset`?
   
   update: please see next comment...



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate.
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions
+   * @return a list of data files that match the partition filter predicate
+   * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
+    List<DataFile> dataFileList = new ArrayList<>();
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
+      CloseableIterator<DataFile> dataFiles = manifestReader.iterator();
+      dataFiles.forEachRemaining(dataFile -> {
+        if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+          dataFileList.add(dataFile.copy());
+        }
+      });
+    }
+    return dataFileList;
+  }
+
+  /**
+   * Replaces partitions in the table with the specified list of data files.
+   *
+   * @param dataFiles the list of data files to replace partitions with
+   */
+  protected void replacePartitions(List<DataFile> dataFiles) {
+    if (dataFiles.isEmpty()) {
+      return;
+    }
+    ReplacePartitions replacePartitions = this.table.newReplacePartitions();
+    dataFiles.forEach(replacePartitions::addFile);
+    replacePartitions.commit();

Review Comment:
   this API is not as I expected.  we don't indicate anywhere which partitions 
to replace.  does it basically replace the entirety of EVERY partition that any 
file we add belongs to?
   
   behavior like this could be worth documenting as a comment
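   
   (fwiw, the Iceberg `ReplacePartitions` API describes this as dynamic partition replacement: the partitions to replace are determined by the files added.  a hedged sketch of such a comment:)
   ```
   // Iceberg's newReplacePartitions() is a "dynamic overwrite": every partition that
   // any added file belongs to has ALL of its existing files replaced by the added
   // files; partitions not represented among the added files are left untouched
   ReplacePartitions replacePartitions = this.table.newReplacePartitions();
   ```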



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicate.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;
+
+/**
+ * Predicate implementation for filtering Iceberg partitions based on specified partition values.
+ * <p>
+ * This class filters partitions by checking if the partition value matches any of the specified values.
+ * </p>
+ */
+public class IcebergPartitionFilterPredicate implements Predicate<StructLike> {

Review Comment:
   naming seems more like a base class or even an interface, than a stand-alone 
class impl...
   
   how about `IcebergMatchesAnyPropNamePartitionFilterPredicate`.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateFactory.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.Properties;
+import java.util.function.Predicate;
+import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+
+/**
+ * Factory class for creating partition filter predicates for Iceberg tables.
+ */
+public class IcebergPartitionFilterPredicateFactory {
+  private static final String ICEBERG_PARTITION_TYPE_KEY = "partition.type";
+  private static final String DATETIME_PARTITION_TYPE = "datetime";
+
+  /**
+   * Creates a filter predicate for the given partition column name, table metadata, and properties.
+   *
+   * @param partitionColumnName the name of the partition column
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param properties the properties containing partition type information
+   * @return a {@link Predicate} for filtering partitions
+   */
+  public static Predicate<StructLike> getFilterPredicate(String partitionColumnName, TableMetadata tableMetadata,

Review Comment:
   others may want different logic, so they might wish to implement their own 
factory.  to enable that, make this an interface and thereby make the "factory 
method" non-`static`.  also, `create` is the name conventionally used for a 
factory method.
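   
   i.e. something like (a sketch of the interface form, reusing the existing parameters):
   ```
   public interface IcebergPartitionFilterPredicateFactory {
     /** factory method - each implementation decides which Predicate impl to hand back */
     Predicate<StructLike> create(String partitionColumnName, TableMetadata tableMetadata, Properties properties);
   }
   ```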



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;
+
+/**
+ * Predicate implementation for filtering Iceberg partitions based on datetime values.
+ * <p>
+ *   This class filters partitions by checking if the partition datetime falls within a specified range.
+ * </p>
+ * <ul>
+ *   <li>The datetime partition column is expected to be a string column.</li>
+ *   <li>The datetime partition column values are expected to be in the format specified by the pattern in the properties.</li>
+ *   <li>The start and end dates are also specified in the properties.</li>
+ * </ul>
+ */
+public class IcebergDateTimePartitionFilterPredicate implements Predicate<StructLike> {
+
+  private static final List<String> supportedTransforms = ImmutableList.of("identity");
+  private static final String DATETIME_PARTITION_KEY = "partition.datetime";
+  private static final String DATETIME_PARTITION_PATTERN_KEY = DATETIME_PARTITION_KEY + ".pattern";
+  private static final String DATETIME_PARTITION_STARTDATE_KEY = DATETIME_PARTITION_KEY + ".startdate";
+  private static final String DATETIME_PARTITION_ENDDATE_KEY = DATETIME_PARTITION_KEY + ".enddate";
+  private final int partitionColumnIndex;
+  private final DateTimeFormatter dateTimeFormatter;
+  private final DateTime startDate;
+  private final DateTime endDate;
+
+  /**
+   * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the 
specified parameters.
+   *
+   * @param partitionColumnName the name of the partition column
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param properties the properties containing partition configuration
+   * @throws IllegalArgumentException if the partition column is not found or 
required properties are missing
+   */
+  public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, 
TableMetadata tableMetadata,
+      Properties properties) {
+
+    this.partitionColumnIndex = 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName,
+        tableMetadata, supportedTransforms);
+    Preconditions.checkArgument(this.partitionColumnIndex != -1,
+        String.format("Partition column %s not found", partitionColumnName));
+
+    String partitionPattern = 
IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_PATTERN_KEY);
+
+    String startDateVal = 
IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_STARTDATE_KEY);
+
+    String endDateVal = 
IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_ENDDATE_KEY);
+
+    Preconditions.checkArgument(StringUtils.isNotBlank(partitionPattern), 
"DateTime Partition pattern cannot be empty");
+    Preconditions.checkArgument(StringUtils.isNotBlank(startDateVal), 
"DateTime Partition start date cannot be empty");
+    Preconditions.checkArgument(StringUtils.isNotBlank(endDateVal), "DateTime 
Partition end date cannot be empty");
+
+    this.dateTimeFormatter = 
DateTimeFormat.forPattern(partitionPattern).withZone(DateTimeZone.UTC);

Review Comment:
   is UTC part of the iceberg spec or just what we expect to be using?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicate.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;
+
+/**
+ * Predicate implementation for filtering Iceberg partitions based on 
specified partition values.
+ * <p>
+ * This class filters partitions by checking if the partition value matches 
any of the specified values.
+ * </p>
+ */
+public class IcebergPartitionFilterPredicate implements Predicate<StructLike> {
+  private static final List<String> supportedTransforms = 
ImmutableList.of("identity", "truncate");
+  private static final String ICEBERG_PARTITION_VALUES_KEY = 
"partition.values";
+  private final int partitionColumnIndex;
+  private final List<String> partitionValues;
+  private static final Splitter LIST_SPLITTER = 
Splitter.on(",").trimResults().omitEmptyStrings();
+
+  /**
+   * Constructs an {@code IcebergPartitionFilterPredicate} with the specified 
parameters.
+   *
+   * @param partitionColumnName the name of the partition column
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param properties the properties containing partition configuration
+   * @throws IllegalArgumentException if the partition column is not found or 
required properties are missing
+   */
+  public IcebergPartitionFilterPredicate(String partitionColumnName, 
TableMetadata tableMetadata,
+      Properties properties) {
+    this.partitionColumnIndex = 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName,
+        tableMetadata, supportedTransforms);
+    Preconditions.checkArgument(this.partitionColumnIndex != -1,
+        String.format("Partition column %s not found", partitionColumnName));
+
+    String partitionColumnValues =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, 
IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_VALUES_KEY);
+    Preconditions.checkArgument(StringUtils.isNotBlank(partitionColumnValues),
+        "Partition column values cannot be empty");
+
+    this.partitionValues = LIST_SPLITTER.splitToList(partitionColumnValues);

Review Comment:
   as discussed with the other class, a better encapsulated ctor might be:
   ```
    public IcebergMatchesAnyPropNamePartitionFilterPredicate(
       int partitionColumnIndex, 
       List<String> targetValues)
   ```
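   
   the caller (e.g. the dataset finder) would then own the prop lookup; a sketch, assuming it reuses the splitter and prop key currently in this class:
   ```
   List<String> targetValues = LIST_SPLITTER.splitToList(
       IcebergDatasetFinder.getLocationQualifiedProperty(properties,
           IcebergDatasetFinder.CatalogLocation.SOURCE, ICEBERG_PARTITION_VALUES_KEY));
   Predicate<StructLike> predicate =
       new IcebergMatchesAnyPropNamePartitionFilterPredicate(partitionColumnIndex, targetValues);
   ```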



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;
+
+/**
+ * Predicate implementation for filtering Iceberg partitions based on datetime 
values.
+ * <p>
+ *   This class filters partitions by checking if the partition datetime falls 
within a specified range.
+ * </p>
+ * <ul>
+ *   <li>The datetime partition column is expected to be a string column.</li>
+ *   <li>The datetime partition column values are expected to be in the format 
specified by the pattern in the properties.</li>
+ *   <li>The start and end dates are also specified in the properties.</li>
+ * </ul>
+ */
+public class IcebergDateTimePartitionFilterPredicate implements 
Predicate<StructLike> {
+
+  private static final List<String> supportedTransforms = 
ImmutableList.of("identity");
+  private static final String DATETIME_PARTITION_KEY = "partition.datetime";
+  private static final String DATETIME_PARTITION_PATTERN_KEY = 
DATETIME_PARTITION_KEY + ".pattern";
+  private static final String DATETIME_PARTITION_STARTDATE_KEY = 
DATETIME_PARTITION_KEY + ".startdate";
+  private static final String DATETIME_PARTITION_ENDDATE_KEY = 
DATETIME_PARTITION_KEY + ".enddate";
+  private final int partitionColumnIndex;
+  private final DateTimeFormatter dateTimeFormatter;
+  private final DateTime startDate;
+  private final DateTime endDate;
+
+  /**
+   * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the 
specified parameters.
+   *
+   * @param partitionColumnName the name of the partition column
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param properties the properties containing partition configuration
+   * @throws IllegalArgumentException if the partition column is not found or 
required properties are missing
+   */
+  public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, 
TableMetadata tableMetadata,
+      Properties properties) {
+
+    this.partitionColumnIndex = 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName,
+        tableMetadata, supportedTransforms);
+    Preconditions.checkArgument(this.partitionColumnIndex != -1,
+        String.format("Partition column %s not found", partitionColumnName));
+
+    String partitionPattern = 
IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_PATTERN_KEY);
+
+    String startDateVal = 
IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_STARTDATE_KEY);

Review Comment:
   all of this property introspection reaches beyond the class' own impl to 
depend on conventions other classes may have set up.  doing so really increases 
the complexity of the class, which complicates testability.
   
   much better would be to leave the property access to a layer that would call 
this class.  e.g. define a constructor like:
   ```
   public IcebergDateTimePartitionFilterPredicate(
       int partitionColumnIndex, 
       DateTimeFormatter dateTimeFormatter,
       DateTime startDate,
       DateTime endDate)
   ```
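   
   the calling layer would then resolve the props and do something like (sketch only):
   ```
   DateTimeFormatter formatter = DateTimeFormat.forPattern(partitionPattern).withZone(DateTimeZone.UTC);
   Predicate<StructLike> predicate = new IcebergDateTimePartitionFilterPredicate(
       partitionColumnIndex,
       formatter,
       formatter.parseDateTime(startDateVal),
       formatter.parseDateTime(endDateVal));
   ```
   that keeps the predicate a pure function of its fields, so tests can construct it directly.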



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.transforms.Transform;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/** Tests for {@link 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil}
 */
+public class IcebergPartitionFilterPredicateUtilTest {
+  private TableMetadata mockTableMetadata;
+  private final List<String> supportedTransforms = 
ImmutableList.of("supported1", "supported2");
+
+  private void setupMockData(String name, String transform) {

Review Comment:
   how about providing an `int n` param of how many columns to create, and this 
method would create `col1` w/ `transform1`, `col2` with `transform2`, `col3`, 
etc. (all the way up to `n`)?
   
   that could be used for your final test case
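   
   e.g., a sketch that just generalizes the existing single-column mocking (would need a `java.util.ArrayList` import):
   ```
   private void setupMockData(int n) {
     List<PartitionField> partitionFields = new ArrayList<>();
     for (int i = 1; i <= n; i++) {
       PartitionField field = Mockito.mock(PartitionField.class);
       Transform transform = Mockito.mock(Transform.class);
       Mockito.when(field.name()).thenReturn("col" + i);
       Mockito.when(transform.toString()).thenReturn("transform" + i);
       Mockito.when(field.transform()).thenReturn(transform);
       partitionFields.add(field);
     }
     PartitionSpec spec = Mockito.mock(PartitionSpec.class);
     Mockito.when(spec.fields()).thenReturn(partitionFields);
     Mockito.when(mockTableMetadata.spec()).thenReturn(spec);
   }
   ```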



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.Properties;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/** Tests for {@link 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicate}
 */
+public class IcebergPartitionFilterPredicateTest {
+
+  private TableMetadata mockTableMetadata;
+  private Properties mockProperties;
+  private MockedStatic<IcebergPartitionFilterPredicateUtil> 
icebergPartitionFilterPredicateUtilMockedStatic;
+  private static final String TEST_ICEBERG_PARTITION_VALUES_KEY = 
"iceberg.dataset.source.partition.values";
+  private static final String TEST_ICEBERG_PARTITION_VALUES_EXCEPTION_MESSAGE 
= "Partition column values cannot be empty";
+  private static final String TEST_ICEBERG_PARTITION_COLUMN = "col1";
+  private static final String TEST_ICEBERG_PARTITION_VALUES = "value1,value2";
+  private static final String TEST_ICEBERG_PARTITION_VALUES_2 = 
"value1,value3,value2,value4";
+  private static final String TEST_ICEBERG_PARTITION_VALUES_3 = "1,2,3,4";
+
+  @BeforeMethod
+  public void setup() {
+    mockTableMetadata = Mockito.mock(TableMetadata.class);
+    mockProperties = new Properties();
+    icebergPartitionFilterPredicateUtilMockedStatic = 
Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class);
+    icebergPartitionFilterPredicateUtilMockedStatic.when(
+            () -> 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(),
 Mockito.any(TableMetadata.class), Mockito.anyList()))
+        .thenReturn(0);
+  }
+
+  @AfterMethod
+  public void cleanup() {
+    icebergPartitionFilterPredicateUtilMockedStatic.close();
+  }
+
+  @Test
+  public void testPartitionColumnNotFound() {
+    icebergPartitionFilterPredicateUtilMockedStatic.when(
+            () -> 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(),
 Mockito.any(TableMetadata.class), Mockito.anyList()))
+        .thenReturn(-1);
+    IllegalArgumentException exception = 
Assert.expectThrows(IllegalArgumentException.class, () -> {
+      new IcebergPartitionFilterPredicate("nonexistentColumn", 
mockTableMetadata, mockProperties);
+    });
+    Assert.assertEquals(exception.getMessage(), "Partition column 
nonexistentColumn not found");
+  }
+
+  @Test
+  public void testPartitionColumnValuesEmpty() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, "");
+    verifyIllegalArgumentExceptionWithMessage();
+  }
+
+  @Test
+  public void testPartitionColumnValuesNULL() {
+    // Not setting values in mockProperties to test NULL value
+    verifyIllegalArgumentExceptionWithMessage();
+  }
+
+  @Test
+  public void testPartitionColumnValuesWhitespaces() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, "   ");
+    verifyIllegalArgumentExceptionWithMessage();
+  }
+
+  @Test
+  public void testPartitionValueNULL() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, 
TEST_ICEBERG_PARTITION_VALUES);
+    IcebergPartitionFilterPredicate predicate = new 
IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN,
+        mockTableMetadata, mockProperties);
+    // Just mocking, so that the partition value is NULL
+    Assert.assertFalse(predicate.test(Mockito.mock(StructLike.class)));
+  }
+
+  @Test
+  public void testWhenPartitionIsNull() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, 
TEST_ICEBERG_PARTITION_VALUES);
+    IcebergPartitionFilterPredicate predicate = new 
IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN,
+        mockTableMetadata, mockProperties);
+    Assert.assertFalse(predicate.test(null));
+  }
+
+  @Test
+  public void testPartitionValueMatch() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, 
TEST_ICEBERG_PARTITION_VALUES);
+
+    IcebergPartitionFilterPredicate predicate = new 
IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN,
+        mockTableMetadata, mockProperties);
+
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn("value1");
+
+    Assert.assertTrue(predicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValueMatch2() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, 
TEST_ICEBERG_PARTITION_VALUES_2);
+
+    IcebergPartitionFilterPredicate predicate = new 
IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN,
+        mockTableMetadata, mockProperties);
+
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn("value2");
+
+    Assert.assertTrue(predicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValueNoMatch() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, 
TEST_ICEBERG_PARTITION_VALUES);
+
+    IcebergPartitionFilterPredicate predicate = new 
IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN,
+        mockTableMetadata, mockProperties);
+
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn("value3");
+
+    Assert.assertFalse(predicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValuesAsInt() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, 
TEST_ICEBERG_PARTITION_VALUES_3);
+
+    IcebergPartitionFilterPredicate predicate = new 
IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN,
+        mockTableMetadata, mockProperties);
+
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn(3);
+    Assert.assertTrue(predicate.test(mockPartition));
+
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn(4);
+    Assert.assertTrue(predicate.test(mockPartition));
+
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn(10);
+    Assert.assertFalse(predicate.test(mockPartition));
+
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn(Integer.MAX_VALUE);
+    Assert.assertFalse(predicate.test(mockPartition));
+
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn(Integer.MIN_VALUE);
+    Assert.assertFalse(predicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValuesAsIntMaxMin() {

Review Comment:
   what do you feel it tests to exercise max and min int, in addition to the 
test case just above?



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.Properties;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/** Tests for {@link 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicate}
 */
+public class IcebergPartitionFilterPredicateTest {
+
+  private TableMetadata mockTableMetadata;
+  private Properties mockProperties;
+  private MockedStatic<IcebergPartitionFilterPredicateUtil> 
icebergPartitionFilterPredicateUtilMockedStatic;
+  private static final String TEST_ICEBERG_PARTITION_VALUES_KEY = 
"iceberg.dataset.source.partition.values";
+  private static final String TEST_ICEBERG_PARTITION_VALUES_EXCEPTION_MESSAGE 
= "Partition column values cannot be empty";
+  private static final String TEST_ICEBERG_PARTITION_COLUMN = "col1";
+  private static final String TEST_ICEBERG_PARTITION_VALUES = "value1,value2";
+  private static final String TEST_ICEBERG_PARTITION_VALUES_2 = 
"value1,value3,value2,value4";
+  private static final String TEST_ICEBERG_PARTITION_VALUES_3 = "1,2,3,4";

Review Comment:
   suggest naming to remind us what these hold:
   ```
   TEST_ICEBERG_PARTITION_VALUES_1_AND_2
   TEST_ICEBERG_PARTITION_VALUES_1_TO_4
   TEST_ICEBERG_PARTITION_VALUES_INTS_1_TO_4
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicateTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.Properties;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/** Tests for {@link 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergDateTimePartitionFilterPredicate}
 */
+public class IcebergDateTimePartitionFilterPredicateTest {
+  private static final String TEST_ICEBERG_PARTITION_DATETIME = "iceberg.dataset.source.partition.datetime";
+  private static final String TEST_ICEBERG_PARTITION_DATETIME_PATTERN = TEST_ICEBERG_PARTITION_DATETIME + ".pattern";
+  private static final String TEST_ICEBERG_PARTITION_DATETIME_STARTDATE = TEST_ICEBERG_PARTITION_DATETIME + ".startdate";
+  private static final String TEST_ICEBERG_PARTITION_DATETIME_ENDDATE = TEST_ICEBERG_PARTITION_DATETIME + ".enddate";
+  private static final String PARTITION_COLUMN_NAME = "partitionColumn";
+  private static final String PARTITION_PATTERN = "yyyy-MM-dd";
+  private static final String START_DATE = "2024-01-01";
+  private static final String END_DATE = "2024-12-31";
+  private TableMetadata mockTableMetadata;
+  private Properties mockProperties;
+  private StructLike mockPartition;
+  private IcebergDateTimePartitionFilterPredicate 
mockDateTimePartitionFilterPredicate;
+  private MockedStatic<IcebergPartitionFilterPredicateUtil> 
icebergPartitionFilterPredicateUtilMockedStatic;
+
+  @BeforeMethod
+  public void setup() {
+    mockTableMetadata = Mockito.mock(TableMetadata.class);
+    icebergPartitionFilterPredicateUtilMockedStatic = 
Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class);
+    icebergPartitionFilterPredicateUtilMockedStatic.when(
+            () -> 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(),
 Mockito.any(TableMetadata.class), Mockito.anyList()))
+        .thenReturn(0);
+
+    mockProperties = new Properties();
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_DATETIME_PATTERN, PARTITION_PATTERN);
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_DATETIME_STARTDATE, START_DATE);
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_DATETIME_ENDDATE, END_DATE);
+
+    mockDateTimePartitionFilterPredicate = new 
IcebergDateTimePartitionFilterPredicate(
+        PARTITION_COLUMN_NAME,
+        mockTableMetadata,
+        mockProperties
+    );
+
+    mockPartition = Mockito.mock(StructLike.class);
+  }
+
+  @AfterMethod
+  public void cleanup() {
+    icebergPartitionFilterPredicateUtilMockedStatic.close();
+  }
+
+  @Test
+  public void testWhenPartitionIsNull() {
+    Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(null));
+  }
+
+  @Test
+  public void testPartitionColumnNotFound() {
+    icebergPartitionFilterPredicateUtilMockedStatic.when(
+            () -> 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(),
 Mockito.any(TableMetadata.class), Mockito.anyList()))
+        .thenReturn(-1);
+    verifyIllegalArgumentExceptionWithMessage("Partition column 
partitionColumn not found");
+  }
+
+  @Test
+  public void testPartitionBeforeRange() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(String.class))).thenReturn("2023-12-31");
+    
Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionWithinRange() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(String.class))).thenReturn("2024-06-15");
+    
Assert.assertTrue(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionOnStartDate() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(String.class))).thenReturn(START_DATE);
+    
Assert.assertTrue(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionOnEndDate() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(String.class))).thenReturn(END_DATE);
+    
Assert.assertTrue(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionAfterRange() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(String.class))).thenReturn("2025-01-01");
+    
Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValueIsBlank() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(String.class))).thenReturn("");
+    
Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValueIsNull() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(String.class))).thenReturn(null);
+    
Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testMissingPartitionPattern() {
+    mockProperties.remove(TEST_ICEBERG_PARTITION_DATETIME_PATTERN);
+    verifyIllegalArgumentExceptionWithMessage("DateTime Partition pattern 
cannot be empty");
+  }
+
+  @Test
+  public void testInvalidPartitionPattern() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_DATETIME_PATTERN, "invalid-pattern");
+    verifyIllegalArgumentExceptionWithMessage("Illegal pattern");
+  }
+
+  @Test
+  public void testMissingStartDate() {
+    mockProperties.remove(TEST_ICEBERG_PARTITION_DATETIME_STARTDATE);
+    verifyIllegalArgumentExceptionWithMessage("DateTime Partition start date 
cannot be empty");
+  }

Review Comment:
   a lot of these are a distraction from the crux of what needs testing - the predicate's DateTime semantics.  as mentioned on the class itself, I suggest leaving prop wrangling to the caller, to streamline the predicate impl and its tests.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.TableMetadata;
+
+/**
+ * Utility class for creating and managing partition filter predicates for 
Iceberg tables.
+ * <p>
+ * This class provides methods to retrieve the index of a partition column in 
the table metadata
+ * and ensures that the partition transform is supported.
+ * </p>
+ * <p>
+ * Note: This class is not meant to be instantiated.
+ * </p>
+ */
+public class IcebergPartitionFilterPredicateUtil {
+  private IcebergPartitionFilterPredicateUtil() {
+  }
+
+  /**
+   * Retrieves the index of the partition column from the partition spec in 
the table metadata.
+   *
+   * @param partitionColumnName the name of the partition column to find
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param supportedTransforms a list of supported partition transforms
+   * @return the index of the partition column if found, otherwise -1
+   * @throws IllegalArgumentException if the partition transform is not 
supported
+   */
+  public static int getPartitionColumnIndex(
+      String partitionColumnName,
+      TableMetadata tableMetadata,
+      List<String> supportedTransforms
+  ) {
+    List<PartitionField> partitionFields = tableMetadata.spec().fields();
+    for (int idx = 0; idx < partitionFields.size(); idx++) {
+      PartitionField partitionField = partitionFields.get(idx);
+      if (partitionField.name().equals(partitionColumnName)) {
+        String transform = partitionField.transform().toString().toLowerCase();
+        if (!supportedTransforms.contains(transform)) {
+          throw new IllegalArgumentException(
+              String.format("Partition transform %s is not supported. 
Supported transforms are %s", transform,

Review Comment:
   suggest also to log the `PartitionField::name` and maybe also `idx`
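   
   e.g.:
   ```
   throw new IllegalArgumentException(String.format(
       "Partition transform %s of field %s (at index %d) is not supported. Supported transforms are %s",
       transform, partitionField.name(), idx, supportedTransforms));
   ```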



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergReplacePartitionsStep.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.util.retry.RetryerFactory;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
+
+/**
+ * Commit step for replacing partitions in an Iceberg table.
+ * <p>
+ * This class implements the {@link CommitStep} interface and provides 
functionality to replace
+ * partitions in the destination Iceberg table using serialized data files.
+ * </p>
+ */
+@Slf4j
+public class IcebergReplacePartitionsStep implements CommitStep {
+  private final String destTableIdStr;
+  private final Properties properties;
+  private final byte[] serializedDataFiles;
+  public static final String REPLACE_PARTITIONS_RETRYER_CONFIG_PREFIX = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".catalog.replace.partitions.retries";
+  private static final Config RETRYER_FALLBACK_CONFIG = 
ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
+      RETRY_TIMES, 3,
+      RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));
+
+  /**
+   * Constructs an {@code IcebergReplacePartitionsStep} with the specified 
parameters.
+   *
+   * @param destTableIdStr the identifier of the destination table as a string
+   * @param serializedDataFiles the serialized data files to be used for 
replacing partitions
+   * @param properties the properties containing configuration
+   */
+  public IcebergReplacePartitionsStep(String destTableIdStr, byte[] 
serializedDataFiles, Properties properties) {
+    this.destTableIdStr = destTableIdStr;
+    this.serializedDataFiles = serializedDataFiles;
+    this.properties = properties;
+  }
+
+  @Override
+  public boolean isCompleted() {
+    return false;
+  }
+
+  /**
+   * Executes the partition replacement in the destination Iceberg table.
+   * Also retries on failure, using the same retry mechanism as {@link IcebergRegisterStep#execute()}.
+   *
+   * @throws IOException if an I/O error occurs during execution
+   */
+  @Override
+  public void execute() throws IOException {

Review Comment:
   the `IcebergRegisterStep` verifies that the current dest-side metadata remains the same as observed just prior to first loading the source catalog table metadata.  would that be worthwhile here too?
   
   (if so, let's see whether any opportunity for reuse, by defining a common 
base class for both of these iceberg commit steps)
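   
   roughly along these lines (a sketch only; all names are placeholders):
   ```
   public abstract class AbstractIcebergCommitStep implements CommitStep {
     // shared: retryer construction from config, plus a pre-commit check that the
     // dest-side TableMetadata still matches what was observed up front
     protected void verifyDestMetadataUnchanged(TableMetadata observed, TableMetadata current) throws IOException {
       if (!observed.metadataFileLocation().equals(current.metadataFileLocation())) {
         throw new IOException("dest table metadata changed since initially observed");
       }
     }
     protected abstract void attemptCommit() throws IOException;
   }
   ```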



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> 
actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> 
cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, 
icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    List<PartitionData> partitionDataList = Collections.nCopies(5, 
partitionData);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+    // Using AlwaysTrue & AlwaysFalse predicates to avoid mocking the predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(),
 5);
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(),
 0);
+
+    catalog.dropTable(testTableId);
+  }
+
+  @Test
+  public void testReplacePartitions() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, 
icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    PartitionData partitionData2 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, 1L);
+    List<PartitionData> partitionDataList = Arrays.asList(partitionData, 
partitionData2);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/id=2/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData3 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData3.set(0, 2L);
+    PartitionData partitionData4 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData4.set(0, 2L);
+    List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, 
partitionData4);
+
+    List<DataFile> dataFiles = getDataFiles(paths2, partitionDataList2);
+    // here, since partition data with value 2 doesn't exist yet, we expect it 
to get added to the table
+    icebergTable.replacePartitions(dataFiles);
+    List<String> expectedPaths = new ArrayList<>(paths);
+    expectedPaths.addAll(paths2);
+    verifyAnyOrder(expectedPaths, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");

Review Comment:
   technically we might also call 
`icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths()` prior to 
`.replacePartitions`
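   
   e.g. (sketch):
   ```
   // assert the pre-replace state too, so the post-replace assertion proves an actual delta
   verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(),
       "data filepaths prior to replacePartitions should match");
   ```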



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> 
actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> 
cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, 
icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    List<PartitionData> partitionDataList = Collections.nCopies(5, 
partitionData);

Review Comment:
   is 5 == `paths.size()`?
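   if so, perhaps make the coupling explicit with `Collections.nCopies(paths.size(), partitionData)`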
   



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> 
actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> 
cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, 
icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    List<PartitionData> partitionDataList = Collections.nCopies(5, 
partitionData);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+    // Using AlwaysTrue & AlwaysFalse predicates to avoid mocking the predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(),
 5);
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(),
 0);
+
+    catalog.dropTable(testTableId);
+  }
+
+  @Test
+  public void testReplacePartitions() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, 
icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    PartitionData partitionData2 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, 1L);
+    List<PartitionData> partitionDataList = Arrays.asList(partitionData, 
partitionData2);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/id=2/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData3 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData3.set(0, 2L);
+    PartitionData partitionData4 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData4.set(0, 2L);
+    List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, 
partitionData4);
+
+    List<DataFile> dataFiles = getDataFiles(paths2, partitionDataList2);
+    // here, since partition data with value 2 doesn't exist yet, we expect it 
to get added to the table
+    icebergTable.replacePartitions(dataFiles);
+    List<String> expectedPaths = new ArrayList<>(paths);
+    expectedPaths.addAll(paths2);
+    verifyAnyOrder(expectedPaths, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
+
+    List<String> paths3 = Arrays.asList(
+      "/path/tableName/data/id=1/file5.orc",
+      "/path/tableName/data/id=1/file6.orc"
+    );
+    // Reusing the same partition data to create data files with different paths
+    List<DataFile> dataFiles2 = getDataFiles(paths3, partitionDataList);

Review Comment:
   nit: it's confusing to have `paths3` associated w/ `dataFiles2` - please 
keep them aligned (even if you skip a number)
   
   that said, the name would be even better if it reflected the commonality of 
the partition IDs.  e.g.:
   ```
   List<DataFile> dataFiles3_pt1 = getDataFiles(paths3_pt1, partitionDataList1);
   ... // below...
   List<String> nowExpectedPaths = new ArrayList<>(paths2_pt2);
   nowExpectedPaths.addAll(paths3_pt1);
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> 
actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> 
cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, 
icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"

Review Comment:
   the `/id=1/` naming convention isn't necessary, is it?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergReplacePartitionsStep.java:
##########


Review Comment:
   wow, so much code that's nearly... but not quite exactly... a copy of 
`IcebergRegisterStep`



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to 
filter partitions
+ * and generate copy entities for partition based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable 
destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws 
IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, 
shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, 
IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);

Review Comment:
   let's put this and the `ICEBERG_PARTITION_NAME_KEY` constant into the 
companion finder



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -170,7 +170,7 @@ protected static boolean 
getConfigShouldCopyMetadataPath(Properties properties)
   }
 
   /** @return property value or `null` */
-  protected static String getLocationQualifiedProperty(Properties properties, 
CatalogLocation location, String relativePropName) {
+  public static String getLocationQualifiedProperty(Properties properties, 
CatalogLocation location, String relativePropName) {

Review Comment:
   based on my suggestions to rework the filter predicate ctor params, I'm 
curious whether this can be accessed entirely within 
`IcebergPartitionDatasetFinder` and therefore could remain `protected`



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to 
filter partitions
+ * and generate copy entities for partition based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable 
destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws 
IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, 
shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, 
IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = 
getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = 
IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating {@link CopyEntity}
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }

Review Comment:
   the base class uses `Map<Path, FileStatus>`.  did that seem inefficient or was there another reason to deviate?
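   
   (for comparison, a minimal sketch of the two shapes; the map variable name is illustrative, and the pair class is the one this PR adds:)
   ```
   // base-class convention: one map entry per file
   Map<Path, FileStatus> destPathToSrcStatus = Maps.newHashMap();
   destPathToSrcStatus.put(destPath, srcFileStatus);
   
   // this PR: the same two values carried as an explicit pair
   FilePathsWithStatus pair = new FilePathsWithStatus(destPath, srcFileStatus);
   ```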



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating a {@link CopyEntity}.
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also updates the destination data files with the destination table's write data location and adds a UUID to
+   * the file path to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
+      Path destPath = filePathsWithStatus.getDestPath();
+      FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus();
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();

Review Comment:
   you skip first doing this, like in `IcebergDataset`:
   ```
         // preserving ancestor permissions till root path's child between src and dest
         List<OwnerAndPermission> ancestorOwnerAndPermissionList =
             CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(actualSourceFs,
                 srcPath.getParent(), greatestAncestorPath, copyConfig);
   ```
   is that intentional?  do you feel it's not necessary or actually contra-indicated?
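   
   (for illustration, a sketch of how that could slot into this PR's loop; deriving `greatestAncestorPath` here and the `ancestorsOwnerAndPermission` builder call are assumptions borrowed from the base-class usage:)
   ```
      // resolve ancestor owner/permissions between the src file's parent and the
      // chosen ancestor, then attach them on the builder, mirroring IcebergDataset
      List<OwnerAndPermission> ancestorOwnerAndPermissionList =
          CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(actualSourceFs,
              srcFileStatus.getPath().getParent(), greatestAncestorPath, copyConfig);
      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
          .fileSet(fileSet)
          .datasetOutputPath(targetFs.getUri().getPath())
          .ancestorsOwnerAndPermission(ancestorOwnerAndPermissionList)
          .build();
   ```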



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating a {@link CopyEntity}.
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also updates the destination data files with the destination table's write data location and adds a UUID to
+   * the file path to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {

Review Comment:
   this impl is really, really similar to the one it's based on in its base class.  deriving from a class and then overriding methods w/ only small changes is pretty nearly cut-and-paste code.  sometimes it's inevitable, but let's avoid when we can.  in this case, could we NOT override this method, but only `GetFilePathsToFileStatusResult getFilePathsToFileStatus(...)` so it handles this new code instead:
   ```
       IcebergTable srcIcebergTable = getSrcIcebergTable();
       List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
       List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
       Configuration defaultHadoopConfiguration = new Configuration();
   
       for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
   ...
   ```
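   
   (a rough shape of that suggestion; the hook's exact signature and how `GetFilePathsToFileStatusResult` is constructed are not shown in this thread, so treat this as a sketch of only the part that would move into the override:)
   ```
       // the override feeds the base class a dest-path -> src-status mapping,
       // leaving the copy-entity assembly loop unduplicated in the subclass
       List<DataFile> srcDataFiles = getSrcIcebergTable().getPartitionSpecificDataFiles(this.partitionFilterPredicate);
       List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
       Map<Path, FileStatus> destPathToSrcStatus = Maps.newHashMap();
       for (FilePathsWithStatus fps : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
         destPathToSrcStatus.put(fps.getDestPath(), fps.getSrcFileStatus());
       }
   ```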



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating a {@link CopyEntity}.
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also updates the destination data files with the destination table's write data location and adds a UUID to
+   * the file path to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
+      Path destPath = filePathsWithStatus.getDestPath();
+      FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus();
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Adding this check to avoid adding a post-publish step when there are no files to copy.
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createPostPublishStep(destDataFiles));
+    }
+
+    log.info("~{}~ generated {} copy--entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws IcebergTable.TableNotFoundException {
+    List<DataFile> destDataFiles = new ArrayList<>();
+    if (srcDataFiles.isEmpty()) {
+      return destDataFiles;
+    }
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
+    PartitionSpec partitionSpec = destTableMetadata.spec();
+    String srcWriteDataLocation = srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    String destWriteDataLocation = destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) {
+      log.warn(
+          String.format("Either source or destination table does not have 
write data location : source table write data location : {%s} , destination 
table write data location : {%s}",
+              srcWriteDataLocation,
+              destWriteDataLocation
+          )
+      );
+    }
+    // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns null if the property is not set and
+    // doesn't respect the passed default value, so to avoid an NPE in .replace() we are setting it to empty string.
+    String prefixToBeReplaced = (srcWriteDataLocation != null) ? srcWriteDataLocation : "";
+    String prefixToReplaceWith = (destWriteDataLocation != null) ? destWriteDataLocation : "";
+    srcDataFiles.forEach(dataFile -> {
+      String curDestFilePath = dataFile.path().toString();
+      String newDestFilePath = curDestFilePath.replace(prefixToBeReplaced, prefixToReplaceWith);
+      String updatedDestFilePath = addUUIDToPath(newDestFilePath);

Review Comment:
   let's simplify by abstracting everything into a method from `Path -> Path` or `Path -> String`, perhaps named `relocateDestPath`
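   
   (a minimal sketch, assuming the helper sits next to `addUUIDToPath` and works `String -> String` to match `dataFile.path().toString()`:)
   ```
     // suggested helper: prefix swap plus UUID suffix in one place
     private String relocateDestPath(String curPathStr, String prefixToBeReplaced, String prefixToReplaceWith) {
       return addUUIDToPath(curPathStr.replace(prefixToBeReplaced, prefixToReplaceWith));
     }
   ```
   the `forEach` body then reduces to a single `relocateDestPath(...)` call per data file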



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating a {@link CopyEntity}.
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also updates the destination data files with the destination table's write data location and adds a UUID to
+   * the file path to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
+      Path destPath = filePathsWithStatus.getDestPath();
+      FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus();
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Adding this check to avoid adding a post-publish step when there are no files to copy.
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createPostPublishStep(destDataFiles));
+    }
+
+    log.info("~{}~ generated {} copy--entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws IcebergTable.TableNotFoundException {
+    List<DataFile> destDataFiles = new ArrayList<>();
+    if (srcDataFiles.isEmpty()) {
+      return destDataFiles;
+    }
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
+    PartitionSpec partitionSpec = destTableMetadata.spec();
+    String srcWriteDataLocation = srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    String destWriteDataLocation = destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) {
+      log.warn(
+          String.format("Either source or destination table does not have 
write data location : source table write data location : {%s} , destination 
table write data location : {%s}",
+              srcWriteDataLocation,
+              destWriteDataLocation
+          )
+      );

Review Comment:
   let's avoid starting a bunch of work only to discover we can't continue because certain props weren't set.  can we validate these in the ctor at the beginning (or even possibly in the caller) and fail-fast when missing?
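   
   (a sketch of what that fail-fast ctor validation might look like, reusing only identifiers already present in this diff; placement and messages are suggestions:)
   ```
       // fail-fast: surface missing write-data locations before any copy work begins
       TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
       TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
       Preconditions.checkArgument(
           StringUtils.isNotEmpty(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")),
           "Source table write data location is not set");
       Preconditions.checkArgument(
           StringUtils.isNotEmpty(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")),
           "Destination table write data location is not set");
   ```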



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

