phet commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1773860832
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java: ########## @@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst this.tableOps.commit(dstMetadata, srcMetadata); } } + + /** + * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate. + * + * @param icebergPartitionFilterPredicate the predicate to filter partitions + * @return a list of data files that match the partition filter predicate + * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files + */ + public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException { + TableMetadata tableMetadata = accessTableMetadata(); + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io()); Review Comment: please add a comment to document why `dataManifests`, rather than `Snapshot::allManifests` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java: ########## @@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst this.tableOps.commit(dstMetadata, srcMetadata); } } + + /** + * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate. + * + * @param icebergPartitionFilterPredicate the predicate to filter partitions + * @return a list of data files that match the partition filter predicate + * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files + */ + public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException { + TableMetadata tableMetadata = accessTableMetadata(); + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io()); + List<DataFile> dataFileList = new ArrayList<>(); + for (ManifestFile manifestFile : dataManifestFiles) { + ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io()); + CloseableIterator<DataFile> dataFiles = manifestReader.iterator(); Review Comment: `try` (with resources) to `.close()` the `CloseableIterator`s? ...also, does the `ManifestReader` need closing?
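e.g., a sketch combining both suggestions, assuming (per Iceberg's `CloseableIterable` contract) that `ManifestReader` is itself `Closeable`, and that the reason for preferring `dataManifests` is to skip delete manifests, which carry no `DataFile`s to copy:

```java
// dataManifests(), rather than Snapshot::allManifests, because only DATA manifests
// are wanted here; allManifests would also surface delete manifests (format v2)
List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
List<DataFile> dataFileList = new ArrayList<>();
for (ManifestFile manifestFile : dataManifestFiles) {
  // try-with-resources closes both the reader and the iterator it hands out
  try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
       CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
    dataFiles.forEachRemaining(dataFile -> {
      if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
        dataFileList.add(dataFile.copy());
      }
    });
  }
}
return dataFileList;
```

########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.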
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; + +import com.google.common.base.Preconditions; + +import lombok.extern.slf4j.Slf4j; + +/** + * Finder class for locating and creating partitioned Iceberg datasets. + * <p> + * This class extends {@link IcebergDatasetFinder} and provides functionality to create + * {@link IcebergPartitionDataset} instances based on the specified source and destination Iceberg catalogs. + * </p> + */ +@Slf4j +public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder { + public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) { + super(sourceFs, properties); + } + + /** + * Finds the {@link IcebergPartitionDataset}s in the file system using the Iceberg Catalog. Both Iceberg database name and table + * name are mandatory based on current implementation. + * <p> + * Overriding this method to put a check whether source and destination db & table names are passed in the properties as separate values + * </p> + * @return List of {@link IcebergPartitionDataset}s in the file system. + * @throws IOException if there is an error while finding the datasets. + */ + @Override + public List<IcebergDataset> findDatasets() throws IOException { + String srcDbName = getLocationQualifiedProperty(this.properties, CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY); + String destDbName = getLocationQualifiedProperty(this.properties, CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY); + String srcTableName = getLocationQualifiedProperty(this.properties, CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY); + String destTableName = getLocationQualifiedProperty(this.properties, CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY); + + if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(destDbName) || StringUtils.isBlank(srcTableName) + || StringUtils.isBlank(destTableName)) { + String errorMsg = String.format("Missing (at least some) IcebergDataset properties - source: ('%s' and '%s') and destination: ('%s' and '%s') ", + srcDbName, srcTableName, destDbName, destTableName); + log.error(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + + IcebergCatalog srcIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE); + IcebergCatalog destIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION); + + return Collections.singletonList(createIcebergDataset( + srcIcebergCatalog, srcDbName, srcTableName, + destIcebergCatalog, destDbName, destTableName, + this.properties, this.sourceFs + )); + } + + /** + * Creates an {@link IcebergPartitionDataset} instance for the specified source and destination Iceberg tables. 
+ */ + @Override + protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog destinationIcebergCatalog, String destDbName, String destTableName, Properties properties, FileSystem fs) throws IOException { + IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName, srcTableName); + Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), + String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName, srcTableName)); + IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName); + Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), + String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName)); +// TODO: Add Validator for source and destination tables later +// TableMetadata srcTableMetadata = srcIcebergTable.accessTableMetadata(); +// TableMetadata destTableMetadata = destIcebergTable.accessTableMetadata(); +// IcebergTableMetadataValidator.validateSourceAndDestinationTablesMetadata(srcTableMetadata, destTableMetadata); + return new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties)); Review Comment: the only difference in this overridden method is to name `IcebergPartitionDataset` instead of `IcebergDataset` - the rest is repeated unchanged from the base class. a simple refactoring would both make this crystal clear AND reduce duplicative code. e.g. just add a method in `IcebergDatasetFinder`: ``` protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs, boolean shouldIncludeMetadataPath) { return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath); } ``` that then could be the ONLY `@Override` within `IcebergPartitionDatasetFinder`. ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Predicate; + +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder; + +/** + * Predicate implementation for filtering Iceberg partitions based on datetime values. + * <p> + * This class filters partitions by checking if the partition datetime falls within a specified range. + * </p> + * <ul> + * <li>The datetime partition column is expected to be a string column.</li> + * <li>The datetime partition column values are expected to be in the format specified by the pattern in the properties.</li> + * <li>The start and end dates are also specified in the properties.</li> + * </ul> + */ +public class IcebergDateTimePartitionFilterPredicate implements Predicate<StructLike> { + + private static final List<String> supportedTransforms = ImmutableList.of("identity"); + private static final String DATETIME_PARTITION_KEY = "partition.datetime"; + private static final String DATETIME_PARTITION_PATTERN_KEY = DATETIME_PARTITION_KEY + ".pattern"; + private static final String DATETIME_PARTITION_STARTDATE_KEY = DATETIME_PARTITION_KEY + ".startdate"; + private static final String DATETIME_PARTITION_ENDDATE_KEY = DATETIME_PARTITION_KEY + ".enddate"; + private final int partitionColumnIndex; + private final DateTimeFormatter dateTimeFormatter; + private final DateTime startDate; + private final DateTime endDate; + + /** + * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the specified parameters. 
+ * + * @param partitionColumnName the name of the partition column + * @param tableMetadata the metadata of the Iceberg table + * @param properties the properties containing partition configuration + * @throws IllegalArgumentException if the partition column is not found or required properties are missing + */ + public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, TableMetadata tableMetadata, + Properties properties) { + + this.partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName, + tableMetadata, supportedTransforms);; + Preconditions.checkArgument(this.partitionColumnIndex != -1, + String.format("Partition column %s not found", partitionColumnName)); + + String partitionPattern = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_PATTERN_KEY); + + String startDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_STARTDATE_KEY); + + String endDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_ENDDATE_KEY); + + Preconditions.checkArgument(StringUtils.isNotBlank(partitionPattern), "DateTime Partition pattern cannot be empty"); + Preconditions.checkArgument(StringUtils.isNotBlank(startDateVal), "DateTime Partition start date cannot be empty"); + Preconditions.checkArgument(StringUtils.isNotBlank(endDateVal), "DateTime Partition end date cannot be empty"); + + this.dateTimeFormatter = DateTimeFormat.forPattern(partitionPattern).withZone(DateTimeZone.UTC); + this.startDate = this.dateTimeFormatter.parseDateTime(startDateVal); + this.endDate = this.dateTimeFormatter.parseDateTime(endDateVal); + } + + /** + * Check if the partition datetime falls within the specified range. 
+ * + * @param partition the datetime partition to check + * @return {@code true} if the datetime partition value is within the range, otherwise {@code false} + */ + @Override + public boolean test(StructLike partition) { + // Just a cautious check to avoid NPE, ideally partition shouldn't be null if table is partitioned + if (Objects.isNull(partition)) { + return false; + } + + String partitionVal = partition.get(this.partitionColumnIndex, String.class); + if (StringUtils.isBlank(partitionVal)) { + return false; + } + + DateTime partitionDateTime = this.dateTimeFormatter.parseDateTime(partitionVal); + + if (partitionDateTime.isEqual(this.startDate) || partitionDateTime.isEqual(this.endDate)) { + return true; + } + return partitionDateTime.isAfter(this.startDate) && partitionDateTime.isBefore(this.endDate); + } Review Comment: ``` return pdt.isEqual(start) || pdt.isEqual(end) || (pdt.isAfter(start) && pdt.isBefore(end)); ``` ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) { return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); } + + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file3.orc", + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file4.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, 1L); + List<PartitionData> partitionDataList = Collections.nCopies(5, partitionData); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable icebergTable = new IcebergTable(testTableId, + catalog.newTableOps(testTableId), + catalogUri, + catalog.loadTable(testTableId)); + // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class + Predicate<StructLike> alwaysTruePredicate = partition -> true; + Predicate<StructLike> alwaysFalsePredicate = partition -> false; + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5); + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); + + catalog.dropTable(testTableId); + } + + @Test + public void testReplacePartitions() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, 1L); + PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData2.set(0, 1L); + List<PartitionData> partitionDataList = Arrays.asList(partitionData, partitionData2); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable 
icebergTable = new IcebergTable(testTableId, + catalog.newTableOps(testTableId), + catalogUri, + catalog.loadTable(testTableId)); + + List<String> paths2 = Arrays.asList( + "/path/tableName/data/id=2/file3.orc", + "/path/tableName/data/id=2/file4.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData3 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData3.set(0, 2L); + PartitionData partitionData4 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData4.set(0, 2L); + List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, partitionData4); + + List<DataFile> dataFiles = getDataFiles(paths2, partitionDataList2); + // here, since partition data with value 2 doesn't exist yet, we expect it to get added to the table + icebergTable.replacePartitions(dataFiles); + List<String> expectedPaths = new ArrayList<>(paths); + expectedPaths.addAll(paths2); + verifyAnyOrder(expectedPaths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List<String> paths3 = Arrays.asList( + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file6.orc" + ); + // Reusing same partition dats to create data file with different paths + List<DataFile> dataFiles2 = getDataFiles(paths3, partitionDataList); + // here, since partition data with value 1 already exists, we expect it to get updated in the table with newer path + icebergTable.replacePartitions(dataFiles2); + List<String> updExpectedPaths = new ArrayList<>(paths2); + updExpectedPaths.addAll(paths3); + verifyAnyOrder(updExpectedPaths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + catalog.dropTable(testTableId); + } + + private static void addPartitionDataFiles(Table table, List<String> paths, List<PartitionData> partitionDataList) { + Assert.assertEquals(paths.size(), partitionDataList.size()); + for (int i = 0; i < paths.size(); i++) { + DataFile dataFile = createDataFileWithPartition(paths.get(i), partitionDataList.get(i)); + table.newAppend().appendFile(dataFile).commit(); Review Comment: looks like you could implement in terms of the other: ``` getDataFiles(paths, partitionDataList).stream().forEach(dataFile -> table.newAppend().appendFile(dataFile).commit() ); ``` ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) { return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); } + + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file3.orc", + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file4.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, 1L); + List<PartitionData> partitionDataList = Collections.nCopies(5, partitionData); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable 
icebergTable = new IcebergTable(testTableId, + catalog.newTableOps(testTableId), + catalogUri, + catalog.loadTable(testTableId)); + // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class + Predicate<StructLike> alwaysTruePredicate = partition -> true; + Predicate<StructLike> alwaysFalsePredicate = partition -> false; + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5); + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); + + catalog.dropTable(testTableId); + } + + @Test + public void testReplacePartitions() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, 1L); + PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData2.set(0, 1L); + List<PartitionData> partitionDataList = Arrays.asList(partitionData, partitionData2); Review Comment: the `Collections.nCopies` above was clearer ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Predicate; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.util.SerializationUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.base.Preconditions; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; +import org.apache.gobblin.data.management.copy.CopyableDataset; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory; + +/** + * Iceberg Partition dataset implementing {@link CopyableDataset} + * <p> + * This class extends {@link IcebergDataset} and provides functionality to filter partitions + * and generate copy entities for partition based data movement. + * </p> + */ +@Slf4j +public class IcebergPartitionDataset extends IcebergDataset { + + private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; + private final Predicate<StructLike> partitionFilterPredicate; + + public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, + FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException { + super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath); + + String partitionColumnName = + IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, + ICEBERG_PARTITION_NAME_KEY); + Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName), + "Partition column name cannot be empty"); + + TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata(); + this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName, + srcTableMetadata, properties); + } + + /** + * Represents the destination file paths and the corresponding file status in source file system. + * These both properties are used in creating {@link CopyEntity} + */ + @Data + protected static final class FilePathsWithStatus { + private final Path destPath; + private final FileStatus srcFileStatus; + } + + /** + * Generates copy entities for partition based data movement. + * It finds files specific to the partition and create destination data files based on the source data files. + * Also updates the destination data files with destination table write data location and add UUID to the file path + * to avoid conflicts. 
+ * + * @param targetFs the target file system + * @param copyConfig the copy configuration + * @return a collection of copy entities + * @throws IOException if an I/O error occurs + */ + @Override + Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException { + String fileSet = this.getFileSetId(); + List<CopyEntity> copyEntities = Lists.newArrayList(); + IcebergTable srcIcebergTable = getSrcIcebergTable(); + List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate); + List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles); + Configuration defaultHadoopConfiguration = new Configuration(); + + for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) { + Path destPath = filePathsWithStatus.getDestPath(); + FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus(); + FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration); + + CopyableFile fileEntity = CopyableFile.fromOriginAndDestination( + actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig) + .fileSet(fileSet) + .datasetOutputPath(targetFs.getUri().getPath()) + .build(); + + fileEntity.setSourceData(getSourceDataset(this.sourceFs)); + fileEntity.setDestinationData(getDestinationDataset(targetFs)); + copyEntities.add(fileEntity); + } + + // Adding this check to avoid adding post publish step when there are no files to copy. + if (CollectionUtils.isNotEmpty(destDataFiles)) { + copyEntities.add(createPostPublishStep(destDataFiles)); + } Review Comment: I agree this is one difference with `IcebergDataset::generateCopyEntities`, which always wants to add its post-publish step. (but it shouldn't be hard to refactor to isolate this difference)
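e.g., a sketch of isolating that difference behind a protected hook in `IcebergDataset` (the method name `shouldIncludePostPublishStep` is illustrative, not from this PR):

```java
// in IcebergDataset (base class): preserve today's behavior of always adding the step
protected boolean shouldIncludePostPublishStep(Collection<CopyEntity> fileEntities) {
  return true;
}

// in IcebergPartitionDataset: add the post-publish step only when files will be copied
@Override
protected boolean shouldIncludePostPublishStep(Collection<CopyEntity> fileEntities) {
  return !fileEntities.isEmpty();
}
```

########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.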
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Predicate; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.util.SerializationUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.base.Preconditions; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; +import org.apache.gobblin.data.management.copy.CopyableDataset; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory; + +/** + * Iceberg Partition dataset implementing {@link CopyableDataset} + * <p> + * This class extends {@link IcebergDataset} and provides functionality to filter partitions + * and generate copy entities for partition based data movement. + * </p> + */ +@Slf4j +public class IcebergPartitionDataset extends IcebergDataset { + + private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; + private final Predicate<StructLike> partitionFilterPredicate; + + public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, + FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException { + super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath); + + String partitionColumnName = + IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, + ICEBERG_PARTITION_NAME_KEY); + Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName), + "Partition column name cannot be empty"); + + TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata(); + this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName, + srcTableMetadata, properties); + } + + /** + * Represents the destination file paths and the corresponding file status in source file system. + * These both properties are used in creating {@link CopyEntity} + */ + @Data + protected static final class FilePathsWithStatus { + private final Path destPath; + private final FileStatus srcFileStatus; + } + + /** + * Generates copy entities for partition based data movement. + * It finds files specific to the partition and create destination data files based on the source data files. + * Also updates the destination data files with destination table write data location and add UUID to the file path + * to avoid conflicts. 
+ * + * @param targetFs the target file system + * @param copyConfig the copy configuration + * @return a collection of copy entities + * @throws IOException if an I/O error occurs + */ + @Override + Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException { + String fileSet = this.getFileSetId(); + List<CopyEntity> copyEntities = Lists.newArrayList(); + IcebergTable srcIcebergTable = getSrcIcebergTable(); + List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate); + List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles); + Configuration defaultHadoopConfiguration = new Configuration(); + + for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) { + Path destPath = filePathsWithStatus.getDestPath(); + FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus(); + FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration); + + CopyableFile fileEntity = CopyableFile.fromOriginAndDestination( + actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig) + .fileSet(fileSet) + .datasetOutputPath(targetFs.getUri().getPath()) + .build(); + + fileEntity.setSourceData(getSourceDataset(this.sourceFs)); + fileEntity.setDestinationData(getDestinationDataset(targetFs)); + copyEntities.add(fileEntity); + } + + // Adding this check to avoid adding post publish step when there are no files to copy. + if (CollectionUtils.isNotEmpty(destDataFiles)) { + copyEntities.add(createPostPublishStep(destDataFiles)); + } + + log.info("~{}~ generated {} copy--entities", fileSet, copyEntities.size()); + return copyEntities; + } + + private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws IcebergTable.TableNotFoundException { + List<DataFile> destDataFiles = new ArrayList<>(); + if (srcDataFiles.isEmpty()) { + return destDataFiles; + } + TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata(); + TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata(); + PartitionSpec partitionSpec = destTableMetadata.spec(); + String srcWriteDataLocation = srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, ""); + String destWriteDataLocation = destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, ""); + if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) { + log.warn( + String.format("Either source or destination table does not have write data location : source table write data location : {%s} , destination table write data location : {%s}", + srcWriteDataLocation, + destWriteDataLocation + ) + ); + } + // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns null if the property is not set and + // doesn't respect passed default value, so to avoid NPE in .replace() we are setting it to empty string. + String prefixToBeReplaced = (srcWriteDataLocation != null) ? srcWriteDataLocation : ""; + String prefixToReplaceWith = (destWriteDataLocation != null) ? 
destWriteDataLocation : ""; + srcDataFiles.forEach(dataFile -> { + String curDestFilePath = dataFile.path().toString(); + String newDestFilePath = curDestFilePath.replace(prefixToBeReplaced, prefixToReplaceWith); + String updatedDestFilePath = addUUIDToPath(newDestFilePath); + destDataFiles.add(DataFiles.builder(partitionSpec) + .copy(dataFile) + .withPath(updatedDestFilePath) + .build()); + }); + return destDataFiles; + } + + private String addUUIDToPath(String filePathStr) { + Path filePath = new Path(filePathStr); + String fileDir = filePath.getParent().toString(); + String fileName = filePath.getName(); + String newFileName = UUID.randomUUID() + "-" + fileName; + return String.join("/", fileDir, newFileName); Review Comment: usually best to avoid hard-coding path separators, like `/`: ``` Path filePath = new Path(filePathStr); String fileName = filePath.getName(); String newFileName = UUID.randomUUID() + "-" + fileName; return new Path(filePath.getParent(), newFileName).toString(); ``` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Predicate; + +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder; + +/** + * Predicate implementation for filtering Iceberg partitions based on datetime values. + * <p> + * This class filters partitions by checking if the partition datetime falls within a specified range.
+ * </p> + * <ul> + * <li>The datetime partition column is expected to be a string column.</li> + * <li>The datetime partition column values are expected to be in the format specified by the pattern in the properties.</li> + * <li>The start and end dates are also specified in the properties.</li> + * </ul> + */ +public class IcebergDateTimePartitionFilterPredicate implements Predicate<StructLike> { + + private static final List<String> supportedTransforms = ImmutableList.of("identity"); + private static final String DATETIME_PARTITION_KEY = "partition.datetime"; + private static final String DATETIME_PARTITION_PATTERN_KEY = DATETIME_PARTITION_KEY + ".pattern"; + private static final String DATETIME_PARTITION_STARTDATE_KEY = DATETIME_PARTITION_KEY + ".startdate"; + private static final String DATETIME_PARTITION_ENDDATE_KEY = DATETIME_PARTITION_KEY + ".enddate"; + private final int partitionColumnIndex; + private final DateTimeFormatter dateTimeFormatter; + private final DateTime startDate; + private final DateTime endDate; + + /** + * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the specified parameters. + * + * @param partitionColumnName the name of the partition column + * @param tableMetadata the metadata of the Iceberg table + * @param properties the properties containing partition configuration + * @throws IllegalArgumentException if the partition column is not found or required properties are missing + */ + public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, TableMetadata tableMetadata, + Properties properties) { + + this.partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName, + tableMetadata, supportedTransforms);; + Preconditions.checkArgument(this.partitionColumnIndex != -1, + String.format("Partition column %s not found", partitionColumnName)); + + String partitionPattern = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_PATTERN_KEY); + + String startDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_STARTDATE_KEY); + + String endDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_ENDDATE_KEY); + + Preconditions.checkArgument(StringUtils.isNotBlank(partitionPattern), "DateTime Partition pattern cannot be empty"); + Preconditions.checkArgument(StringUtils.isNotBlank(startDateVal), "DateTime Partition start date cannot be empty"); + Preconditions.checkArgument(StringUtils.isNotBlank(endDateVal), "DateTime Partition end date cannot be empty"); + + this.dateTimeFormatter = DateTimeFormat.forPattern(partitionPattern).withZone(DateTimeZone.UTC); + this.startDate = this.dateTimeFormatter.parseDateTime(startDateVal); + this.endDate = this.dateTimeFormatter.parseDateTime(endDateVal); + } + + /** + * Check if the partition datetime falls within the specified range. 
+ * + * @param partition the datetime partition to check + * @return {@code true} if the datetime partition value is within the range, otherwise {@code false} + */ + @Override + public boolean test(StructLike partition) { + // Just a cautious check to avoid NPE, ideally partition shouldn't be null if table is partitioned + if (Objects.isNull(partition)) { + return false; + } + + String partitionVal = partition.get(this.partitionColumnIndex, String.class); Review Comment: is the value guaranteed always to be a `String`? ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; + +import com.google.common.base.Preconditions; + +import lombok.extern.slf4j.Slf4j; + +/** + * Finder class for locating and creating partitioned Iceberg datasets. + * <p> + * This class extends {@link IcebergDatasetFinder} and provides functionality to create + * {@link IcebergPartitionDataset} instances based on the specified source and destination Iceberg catalogs. + * </p> + */ +@Slf4j +public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder { + public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) { + super(sourceFs, properties); + } + + /** + * Finds the {@link IcebergPartitionDataset}s in the file system using the Iceberg Catalog. Both Iceberg database name and table + * name are mandatory based on current implementation. + * <p> + * Overriding this method to put a check whether source and destination db & table names are passed in the properties as separate values + * </p> + * @return List of {@link IcebergPartitionDataset}s in the file system. + * @throws IOException if there is an error while finding the datasets. 
+ */ + @Override + public List<IcebergDataset> findDatasets() throws IOException { + String srcDbName = getLocationQualifiedProperty(this.properties, CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY); + String destDbName = getLocationQualifiedProperty(this.properties, CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY); + String srcTableName = getLocationQualifiedProperty(this.properties, CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY); + String destTableName = getLocationQualifiedProperty(this.properties, CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY); Review Comment: I see you deleted some handling of legacy prop names, but beyond that, this appears to be basically the same impl as from the base class. so, why not just keep the base class version as-is, and only `@Override` `createIcebergDataset`? update: please see next comment... ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java: ########## @@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst this.tableOps.commit(dstMetadata, srcMetadata); } } + + /** + * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate. + * + * @param icebergPartitionFilterPredicate the predicate to filter partitions + * @return a list of data files that match the partition filter predicate + * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files + */ + public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException { + TableMetadata tableMetadata = accessTableMetadata(); + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io()); + List<DataFile> dataFileList = new ArrayList<>(); + for (ManifestFile manifestFile : dataManifestFiles) { + ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io()); + CloseableIterator<DataFile> dataFiles = manifestReader.iterator(); + dataFiles.forEachRemaining(dataFile -> { + if (icebergPartitionFilterPredicate.test(dataFile.partition())) { + dataFileList.add(dataFile.copy()); + } + }); + } + return dataFileList; + } + + /** + * Replaces partitions in the table with the specified list of data files. + * + * @param dataFiles the list of data files to replace partitions with + */ + protected void replacePartitions(List<DataFile> dataFiles) { + if (dataFiles.isEmpty()) { + return; + } + ReplacePartitions replacePartitions = this.table.newReplacePartitions(); + dataFiles.forEach(replacePartitions::addFile); + replacePartitions.commit(); Review Comment: this API is not as I expected. we don't indicate anywhere which partitions to replace. does it basically replace the entirety of EVERY partition that any file we add belongs to? behavior like this could be worth documenting as a comment
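e.g., a comment along these lines (the wording assumes Iceberg's documented dynamic-overwrite semantics for `ReplacePartitions`; worth confirming against the version in use):

```java
// Note: ReplacePartitions has dynamic-overwrite semantics -- on commit, EVERY
// partition that any added DataFile belongs to has its existing files dropped
// and replaced by the added files; partitions not touched by the added files
// are left intact. There is no explicit argument naming the partitions to
// replace: that set is implied by the partitions of the files added.
ReplacePartitions replacePartitions = this.table.newReplacePartitions();
dataFiles.forEach(replacePartitions::addFile);
replacePartitions.commit();
```

########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicate.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.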
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Predicate; + +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; + +import com.google.common.base.Splitter; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder; + +/** + * Predicate implementation for filtering Iceberg partitions based on specified partition values. + * <p> + * This class filters partitions by checking if the partition value matches any of the specified values. + * </p> + */ +public class IcebergPartitionFilterPredicate implements Predicate<StructLike> { Review Comment: naming seems more like a base class or even an interface, than a stand-alone class impl... how about `IcebergMatchesAnyPropNamePartitionFilterPredicate`. ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateFactory.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.Properties; +import java.util.function.Predicate; +import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; + +/** + * Factory class for creating partition filter predicates for Iceberg tables. + */ +public class IcebergPartitionFilterPredicateFactory { + private static final String ICEBERG_PARTITION_TYPE_KEY = "partition.type"; + private static final String DATETIME_PARTITION_TYPE = "datetime"; + + /** + * Creates a filter predicate for the given partition column name, table metadata, and properties. 
+ * + * @param partitionColumnName the name of the partition column + * @param tableMetadata the metadata of the Iceberg table + * @param properties the properties containing partition type information + * @return a {@link Predicate} for filtering partitions + */ + public static Predicate<StructLike> getFilterPredicate(String partitionColumnName, TableMetadata tableMetadata, Review Comment: others may want different logic, so they might wish to implement their own factory. to enable that, make this an interface and thereby make the "factory method" non-`static`. also, `create` is the name conventionally used for a factory method.
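e.g., a minimal sketch of that shape (illustrative, not in the PR):

```java
public interface IcebergPartitionFilterPredicateFactory {
  /** non-static, so alternative factories with different selection logic can be plugged in */
  Predicate<StructLike> create(String partitionColumnName, TableMetadata tableMetadata, Properties properties);
}
```

########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Predicate; + +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder; + +/** + * Predicate implementation for filtering Iceberg partitions based on datetime values. + * <p> + * This class filters partitions by checking if the partition datetime falls within a specified range.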
+ * </p> + * <ul> + * <li>The datetime partition column is expected to be a string column.</li> + * <li>The datetime partition column values are expected to be in the format specified by the pattern in the properties.</li> + * <li>The start and end dates are also specified in the properties.</li> + * </ul> + */ +public class IcebergDateTimePartitionFilterPredicate implements Predicate<StructLike> { + + private static final List<String> supportedTransforms = ImmutableList.of("identity"); + private static final String DATETIME_PARTITION_KEY = "partition.datetime"; + private static final String DATETIME_PARTITION_PATTERN_KEY = DATETIME_PARTITION_KEY + ".pattern"; + private static final String DATETIME_PARTITION_STARTDATE_KEY = DATETIME_PARTITION_KEY + ".startdate"; + private static final String DATETIME_PARTITION_ENDDATE_KEY = DATETIME_PARTITION_KEY + ".enddate"; + private final int partitionColumnIndex; + private final DateTimeFormatter dateTimeFormatter; + private final DateTime startDate; + private final DateTime endDate; + + /** + * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the specified parameters. + * + * @param partitionColumnName the name of the partition column + * @param tableMetadata the metadata of the Iceberg table + * @param properties the properties containing partition configuration + * @throws IllegalArgumentException if the partition column is not found or required properties are missing + */ + public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, TableMetadata tableMetadata, + Properties properties) { + + this.partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName, + tableMetadata, supportedTransforms);; + Preconditions.checkArgument(this.partitionColumnIndex != -1, + String.format("Partition column %s not found", partitionColumnName)); + + String partitionPattern = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_PATTERN_KEY); + + String startDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_STARTDATE_KEY); + + String endDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_ENDDATE_KEY); + + Preconditions.checkArgument(StringUtils.isNotBlank(partitionPattern), "DateTime Partition pattern cannot be empty"); + Preconditions.checkArgument(StringUtils.isNotBlank(startDateVal), "DateTime Partition start date cannot be empty"); + Preconditions.checkArgument(StringUtils.isNotBlank(endDateVal), "DateTime Partition end date cannot be empty"); + + this.dateTimeFormatter = DateTimeFormat.forPattern(partitionPattern).withZone(DateTimeZone.UTC); Review Comment: is UTC part of the iceberg spec or just what we expect to be using? ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicate.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Predicate; + +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; + +import com.google.common.base.Splitter; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder; + +/** + * Predicate implementation for filtering Iceberg partitions based on specified partition values. + * <p> + * This class filters partitions by checking if the partition value matches any of the specified values. + * </p> + */ +public class IcebergPartitionFilterPredicate implements Predicate<StructLike> { + private static final List<String> supportedTransforms = ImmutableList.of("identity", "truncate"); + private static final String ICEBERG_PARTITION_VALUES_KEY = "partition.values"; + private final int partitionColumnIndex; + private final List<String> partitionValues; + private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings(); + + /** + * Constructs an {@code IcebergPartitionFilterPredicate} with the specified parameters. + * + * @param partitionColumnName the name of the partition column + * @param tableMetadata the metadata of the Iceberg table + * @param properties the properties containing partition configuration + * @throws IllegalArgumentException if the partition column is not found or required properties are missing + */ + public IcebergPartitionFilterPredicate(String partitionColumnName, TableMetadata tableMetadata, + Properties properties) { + this.partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName, + tableMetadata, supportedTransforms); + Preconditions.checkArgument(this.partitionColumnIndex != -1, + String.format("Partition column %s not found", partitionColumnName)); + + String partitionColumnValues = + IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, + ICEBERG_PARTITION_VALUES_KEY);; + Preconditions.checkArgument(StringUtils.isNotBlank(partitionColumnValues), + "Partition column values cannot be empty"); + + this.partitionValues = LIST_SPLITTER.splitToList(partitionColumnValues); Review Comment: as discussed with the other class, a better encapsulated ctor might be: ``` public IcebergMatchesAnyPropNamePartitionFilterPredicate( int partitionColumnIndex, List<String> targetValues) ``` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Predicate; + +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder; + +/** + * Predicate implementation for filtering Iceberg partitions based on datetime values. + * <p> + * This class filters partitions by checking if the partition datetime falls within a specified range. + * </p> + * <ul> + * <li>The datetime partition column is expected to be a string column.</li> + * <li>The datetime partition column values are expected to be in the format specified by the pattern in the properties.</li> + * <li>The start and end dates are also specified in the properties.</li> + * </ul> + */ +public class IcebergDateTimePartitionFilterPredicate implements Predicate<StructLike> { + + private static final List<String> supportedTransforms = ImmutableList.of("identity"); + private static final String DATETIME_PARTITION_KEY = "partition.datetime"; + private static final String DATETIME_PARTITION_PATTERN_KEY = DATETIME_PARTITION_KEY + ".pattern"; + private static final String DATETIME_PARTITION_STARTDATE_KEY = DATETIME_PARTITION_KEY + ".startdate"; + private static final String DATETIME_PARTITION_ENDDATE_KEY = DATETIME_PARTITION_KEY + ".enddate"; + private final int partitionColumnIndex; + private final DateTimeFormatter dateTimeFormatter; + private final DateTime startDate; + private final DateTime endDate; + + /** + * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the specified parameters. 
+ * + * @param partitionColumnName the name of the partition column + * @param tableMetadata the metadata of the Iceberg table + * @param properties the properties containing partition configuration + * @throws IllegalArgumentException if the partition column is not found or required properties are missing + */ + public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, TableMetadata tableMetadata, + Properties properties) { + + this.partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName, + tableMetadata, supportedTransforms);; + Preconditions.checkArgument(this.partitionColumnIndex != -1, + String.format("Partition column %s not found", partitionColumnName)); + + String partitionPattern = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_PATTERN_KEY); + + String startDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_STARTDATE_KEY); Review Comment: all of this property introspection reaches beyond the class' own impl to depend on conventions other classes may have set up. doing so really increases the complexity of the class, which complicates testability. much better would be to leave the property access to a layer that would call this class. e.g. define a constructor like: ``` public IcebergDateTimePartitionFilterPredicate( int partitionColumnIndex, DateTimeFormatter dateTimeFormatter, DateTime startDate, DateTime endDate) ``` ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
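[Editor's note] To make the two suggestions above concrete — the non-`static` factory interface with a `create` method, and the slimmed-down predicate constructor that takes only fully-parsed values — here is a minimal sketch. The interface name, implementation class, and all wiring below are hypothetical; only `getLocationQualifiedProperty`, `getPartitionColumnIndex`, and the property keys are taken from the PR as-is:
```
// Sketch only (separate files in practice). The factory owns all Properties access
// and hands the predicate nothing but parsed, ready-to-use values.
public interface IcebergPartitionFilterPredicateFactory {
  Predicate<StructLike> create(String partitionColumnName, TableMetadata tableMetadata,
      Properties properties);
}

public class IcebergDateTimePartitionFilterPredicateFactory
    implements IcebergPartitionFilterPredicateFactory {
  @Override
  public Predicate<StructLike> create(String partitionColumnName, TableMetadata tableMetadata,
      Properties properties) {
    int partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(
        partitionColumnName, tableMetadata, ImmutableList.of("identity"));
    Preconditions.checkArgument(partitionColumnIndex != -1,
        String.format("Partition column %s not found", partitionColumnName));

    String pattern = IcebergDatasetFinder.getLocationQualifiedProperty(properties,
        IcebergDatasetFinder.CatalogLocation.SOURCE, "partition.datetime.pattern");
    String startDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties,
        IcebergDatasetFinder.CatalogLocation.SOURCE, "partition.datetime.startdate");
    String endDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties,
        IcebergDatasetFinder.CatalogLocation.SOURCE, "partition.datetime.enddate");

    DateTimeFormatter formatter = DateTimeFormat.forPattern(pattern).withZone(DateTimeZone.UTC);
    return new IcebergDateTimePartitionFilterPredicate(partitionColumnIndex, formatter,
        formatter.parseDateTime(startDateVal), formatter.parseDateTime(endDateVal));
  }
}
```
With this shape, the predicate class itself never touches `Properties`, which is what streamlines its tests.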
+ */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.List; + +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.transforms.Transform; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; + +/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil} */ +public class IcebergPartitionFilterPredicateUtilTest { + private TableMetadata mockTableMetadata; + private final List<String> supportedTransforms = ImmutableList.of("supported1", "supported2"); + + private void setupMockData(String name, String transform) { Review Comment: how about providing an `int n` param of how many columns to create, and this method would create `col1` w/ `transform1`, `col2` with `transform2`, `col3`, etc. (all the way up to `n`)? that could be used for your final test case ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateTest.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
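[Editor's note] A rough sketch of the `int n` variant suggested above, using Mockito stubbing along the lines the existing tests already use (stubbing `toString()` is permitted by current Mockito; the extra imports such as `ArrayList` are assumed, and the exact setup in the real test may differ):
```
// Hypothetical: mock n partition fields, colK paired with transformK, and expose
// them via a mocked PartitionSpec on mockTableMetadata.
private void setupMockData(int n) {
  List<PartitionField> partitionFields = new ArrayList<>();
  for (int i = 1; i <= n; i++) {
    PartitionField field = Mockito.mock(PartitionField.class);
    Mockito.when(field.name()).thenReturn("col" + i);
    Transform transform = Mockito.mock(Transform.class);
    Mockito.when(transform.toString()).thenReturn("transform" + i);
    Mockito.doReturn(transform).when(field).transform();
    partitionFields.add(field);
  }
  PartitionSpec spec = Mockito.mock(PartitionSpec.class);
  Mockito.doReturn(partitionFields).when(spec).fields();
  Mockito.when(mockTableMetadata.spec()).thenReturn(spec);
}
```
A final test case could then call `setupMockData(3)` and ask for `col3` to exercise a match beyond the first field.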
+ */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.Properties; + +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicate} */ +public class IcebergPartitionFilterPredicateTest { + + private TableMetadata mockTableMetadata; + private Properties mockProperties; + private MockedStatic<IcebergPartitionFilterPredicateUtil> icebergPartitionFilterPredicateUtilMockedStatic; + private static final String TEST_ICEBERG_PARTITION_VALUES_KEY = "iceberg.dataset.source.partition.values"; + private static final String TEST_ICEBERG_PARTITION_VALUES_EXCEPTION_MESSAGE = "Partition column values cannot be empty"; + private static final String TEST_ICEBERG_PARTITION_COLUMN = "col1"; + private static final String TEST_ICEBERG_PARTITION_VALUES = "value1,value2"; + private static final String TEST_ICEBERG_PARTITION_VALUES_2 = "value1,value3,value2,value4"; + private static final String TEST_ICEBERG_PARTITION_VALUES_3 = "1,2,3,4"; + + @BeforeMethod + public void setup() { + mockTableMetadata = Mockito.mock(TableMetadata.class); + mockProperties = new Properties(); + icebergPartitionFilterPredicateUtilMockedStatic = Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class); + icebergPartitionFilterPredicateUtilMockedStatic.when( + () -> IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(), Mockito.any(TableMetadata.class), Mockito.anyList())) + .thenReturn(0); + } + + @AfterMethod + public void cleanup() { + icebergPartitionFilterPredicateUtilMockedStatic.close(); + } + + @Test + public void testPartitionColumnNotFound() { + icebergPartitionFilterPredicateUtilMockedStatic.when( + () -> IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(), Mockito.any(TableMetadata.class), Mockito.anyList())) + .thenReturn(-1); + IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> { + new IcebergPartitionFilterPredicate("nonexistentColumn", mockTableMetadata, mockProperties); + }); + Assert.assertEquals(exception.getMessage(), "Partition column nonexistentColumn not found"); + } + + @Test + public void testPartitionColumnValuesEmpty() { + mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, ""); + verifyIllegalArgumentExceptionWithMessage(); + } + + @Test + public void testPartitionColumnValuesNULL() { + // Not setting values in mockProperties to test NULL value + verifyIllegalArgumentExceptionWithMessage(); + } + + @Test + public void testPartitionColumnValuesWhitespaces() { + mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, " "); + verifyIllegalArgumentExceptionWithMessage(); + } + + @Test + public void testPartitionValueNULL() { + mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, TEST_ICEBERG_PARTITION_VALUES); + IcebergPartitionFilterPredicate predicate = new IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN, + mockTableMetadata, mockProperties); + // Just mocking, so that the partition value is NULL + Assert.assertFalse(predicate.test(Mockito.mock(StructLike.class))); + } + + @Test + public void testWhenPartitionIsNull() { + mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, 
TEST_ICEBERG_PARTITION_VALUES); + IcebergPartitionFilterPredicate predicate = new IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN, + mockTableMetadata, mockProperties); + Assert.assertFalse(predicate.test(null)); + } + + @Test + public void testPartitionValueMatch() { + mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, TEST_ICEBERG_PARTITION_VALUES); + + IcebergPartitionFilterPredicate predicate = new IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN, + mockTableMetadata, mockProperties); + + StructLike mockPartition = Mockito.mock(StructLike.class); + Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn("value1"); + + Assert.assertTrue(predicate.test(mockPartition)); + } + + @Test + public void testPartitionValueMatch2() { + mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, TEST_ICEBERG_PARTITION_VALUES_2); + + IcebergPartitionFilterPredicate predicate = new IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN, + mockTableMetadata, mockProperties); + + StructLike mockPartition = Mockito.mock(StructLike.class); + Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn("value2"); + + Assert.assertTrue(predicate.test(mockPartition)); + } + + @Test + public void testPartitionValueNoMatch() { + mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, TEST_ICEBERG_PARTITION_VALUES); + + IcebergPartitionFilterPredicate predicate = new IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN, + mockTableMetadata, mockProperties); + + StructLike mockPartition = Mockito.mock(StructLike.class); + Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn("value3"); + + Assert.assertFalse(predicate.test(mockPartition)); + } + + @Test + public void testPartitionValuesAsInt() { + mockProperties.setProperty(TEST_ICEBERG_PARTITION_VALUES_KEY, TEST_ICEBERG_PARTITION_VALUES_3); + + IcebergPartitionFilterPredicate predicate = new IcebergPartitionFilterPredicate(TEST_ICEBERG_PARTITION_COLUMN, + mockTableMetadata, mockProperties); + + StructLike mockPartition = Mockito.mock(StructLike.class); + Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn(3); + Assert.assertTrue(predicate.test(mockPartition)); + + Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn(4); + Assert.assertTrue(predicate.test(mockPartition)); + + Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn(10); + Assert.assertFalse(predicate.test(mockPartition)); + + Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn(Integer.MAX_VALUE); + Assert.assertFalse(predicate.test(mockPartition)); + + Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn(Integer.MIN_VALUE); + Assert.assertFalse(predicate.test(mockPartition)); + } + + @Test + public void testPartitionValuesAsIntMaxMin() { Review Comment: what do you feel it tests to exercise max and min int, in addition to the test case just above? ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateTest.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.Properties; + +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicate} */ +public class IcebergPartitionFilterPredicateTest { + + private TableMetadata mockTableMetadata; + private Properties mockProperties; + private MockedStatic<IcebergPartitionFilterPredicateUtil> icebergPartitionFilterPredicateUtilMockedStatic; + private static final String TEST_ICEBERG_PARTITION_VALUES_KEY = "iceberg.dataset.source.partition.values"; + private static final String TEST_ICEBERG_PARTITION_VALUES_EXCEPTION_MESSAGE = "Partition column values cannot be empty"; + private static final String TEST_ICEBERG_PARTITION_COLUMN = "col1"; + private static final String TEST_ICEBERG_PARTITION_VALUES = "value1,value2"; + private static final String TEST_ICEBERG_PARTITION_VALUES_2 = "value1,value3,value2,value4"; + private static final String TEST_ICEBERG_PARTITION_VALUES_3 = "1,2,3,4"; Review Comment: suggest naming to remind us what these hold: ``` TEST_ICEBERG_PARTITION_VALUES_1_AND_2 TEST_ICEBERG_PARTITION_VALUES_1_TO_4 TEST_ICEBERG_PARTITION_VALUES_INTS_1_TO_4 ``` ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicateTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.Properties; + +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergDateTimePartitionFilterPredicate} */ +public class IcebergDateTimePartitionFilterPredicateTest { + private static final String TEST_ICEBERG_PARTITION_DATETTIME = "iceberg.dataset.source.partition.datetime"; + private static final String TEST_ICEBERG_PARTITION_DATETTIME_PATTERN = TEST_ICEBERG_PARTITION_DATETTIME + ".pattern"; + private static final String TEST_ICEBERG_PARTITION_DATETTIME_STARTDATE = TEST_ICEBERG_PARTITION_DATETTIME + ".startdate"; + private static final String TEST_ICEBERG_PARTITION_DATETTIME_ENDDATE = TEST_ICEBERG_PARTITION_DATETTIME + ".enddate"; + private static final String PARTITION_COLUMN_NAME = "partitionColumn"; + private static final String PARTITION_PATTERN = "yyyy-MM-dd"; + private static final String START_DATE = "2024-01-01"; + private static final String END_DATE = "2024-12-31"; + private TableMetadata mockTableMetadata; + private Properties mockProperties; + private StructLike mockPartition; + private IcebergDateTimePartitionFilterPredicate mockDateTimePartitionFilterPredicate; + private MockedStatic<IcebergPartitionFilterPredicateUtil> icebergPartitionFilterPredicateUtilMockedStatic; + + @BeforeMethod + public void setup() { + mockTableMetadata = Mockito.mock(TableMetadata.class); + icebergPartitionFilterPredicateUtilMockedStatic = Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class); + icebergPartitionFilterPredicateUtilMockedStatic.when( + () -> IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(), Mockito.any(TableMetadata.class), Mockito.anyList())) + .thenReturn(0); + + mockProperties = new Properties(); + mockProperties.setProperty(TEST_ICEBERG_PARTITION_DATETTIME_PATTERN, PARTITION_PATTERN); + mockProperties.setProperty(TEST_ICEBERG_PARTITION_DATETTIME_STARTDATE, START_DATE); + mockProperties.setProperty(TEST_ICEBERG_PARTITION_DATETTIME_ENDDATE, END_DATE); + + mockDateTimePartitionFilterPredicate = new IcebergDateTimePartitionFilterPredicate( + PARTITION_COLUMN_NAME, + mockTableMetadata, + mockProperties + ); + + mockPartition = Mockito.mock(StructLike.class); + } + + @AfterMethod + public void cleanup() { + icebergPartitionFilterPredicateUtilMockedStatic.close(); + } + + @Test + public void testWhenPartitionIsNull() { + Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(null)); + } + + @Test + public void testPartitionColumnNotFound() { + icebergPartitionFilterPredicateUtilMockedStatic.when( + () -> IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(), Mockito.any(TableMetadata.class), Mockito.anyList())) + .thenReturn(-1); + verifyIllegalArgumentExceptionWithMessage("Partition column partitionColumn not found"); + } + + @Test + public void testPartitionBeforeRange() { + Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(String.class))).thenReturn("2023-12-31"); + Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(mockPartition)); + } + + @Test + public void testPartitionWithinRange() { + Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(String.class))).thenReturn("2024-06-15");
+    Assert.assertTrue(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionOnStartDate() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(String.class))).thenReturn(START_DATE);
+    Assert.assertTrue(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionOnEndDate() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(String.class))).thenReturn(END_DATE);
+    Assert.assertTrue(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionAfterRange() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(String.class))).thenReturn("2025-01-01");
+    Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValueIsBlank() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(String.class))).thenReturn("");
+    Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValueIsNull() {
+    Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(String.class))).thenReturn(null);
+    Assert.assertFalse(mockDateTimePartitionFilterPredicate.test(mockPartition));
+  }
+
+  @Test
+  public void testMissingPartitionPattern() {
+    mockProperties.remove(TEST_ICEBERG_PARTITION_DATETTIME_PATTERN);
+    verifyIllegalArgumentExceptionWithMessage("DateTime Partition pattern cannot be empty");
+  }
+
+  @Test
+  public void testInvalidPartitionPattern() {
+    mockProperties.setProperty(TEST_ICEBERG_PARTITION_DATETTIME_PATTERN, "invalid-pattern");
+    verifyIllegalArgumentExceptionWithMessage("Illegal pattern");
+  }
+
+  @Test
+  public void testMissingStartDate() {
+    mockProperties.remove(TEST_ICEBERG_PARTITION_DATETTIME_STARTDATE);
+    verifyIllegalArgumentExceptionWithMessage("DateTime Partition start date cannot be empty");
+  }

Review Comment:
   a lot of these are a distraction from the crux of what needs testing - the predicate's DateTime semantics. as mentioned in the class itself, I suggest leaving prop wrangling to the caller, to streamline the predicate impl and its tests.

########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java: ##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.TableMetadata;
+
+/**
+ * Utility class for creating and managing partition filter predicates for Iceberg tables.
+ * <p> + * This class provides methods to retrieve the index of a partition column in the table metadata + * and ensures that the partition transform is supported. + * </p> + * <p> + * Note: This class is not meant to be instantiated. + * </p> + */ +public class IcebergPartitionFilterPredicateUtil { + private IcebergPartitionFilterPredicateUtil() { + } + + /** + * Retrieves the index of the partition column from the partition spec in the table metadata. + * + * @param partitionColumnName the name of the partition column to find + * @param tableMetadata the metadata of the Iceberg table + * @param supportedTransforms a list of supported partition transforms + * @return the index of the partition column if found, otherwise -1 + * @throws IllegalArgumentException if the partition transform is not supported + */ + public static int getPartitionColumnIndex( + String partitionColumnName, + TableMetadata tableMetadata, + List<String> supportedTransforms + ) { + List<PartitionField> partitionFields = tableMetadata.spec().fields(); + for (int idx = 0; idx < partitionFields.size(); idx++) { + PartitionField partitionField = partitionFields.get(idx); + if (partitionField.name().equals(partitionColumnName)) { + String transform = partitionField.transform().toString().toLowerCase(); + if (!supportedTransforms.contains(transform)) { + throw new IllegalArgumentException( + String.format("Partition transform %s is not supported. Supported transforms are %s", transform, Review Comment: suggest also to log the `PartitionField::name` and maybe also `idx` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergReplacePartitionsStep.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
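[Editor's note] Applying that suggestion is a one-line change to the exception; something like the following (a sketch only — the exact wording is up to the author):
```
// Hypothetical reworked message, adding the field's name and its index in the spec:
throw new IllegalArgumentException(String.format(
    "Partition transform %s is not supported for field %s (index %d). Supported transforms are %s",
    transform, partitionField.name(), idx, supportedTransforms));
```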
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.util.retry.RetryerFactory;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
+
+/**
+ * Commit step for replacing partitions in an Iceberg table.
+ * <p>
+ * This class implements the {@link CommitStep} interface and provides functionality to replace
+ * partitions in the destination Iceberg table using serialized data files.
+ * </p>
+ */
+@Slf4j
+public class IcebergReplacePartitionsStep implements CommitStep {
+  private final String destTableIdStr;
+  private final Properties properties;
+  private final byte[] serializedDataFiles;
+  public static final String REPLACE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".catalog.replace.partitions.retries";
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
+      RETRY_TIMES, 3,
+      RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));
+
+  /**
+   * Constructs an {@code IcebergReplacePartitionsStep} with the specified parameters.
+   *
+   * @param destTableIdStr the identifier of the destination table as a string
+   * @param serializedDataFiles the serialized data files to be used for replacing partitions
+   * @param properties the properties containing configuration
+   */
+  public IcebergReplacePartitionsStep(String destTableIdStr, byte[] serializedDataFiles, Properties properties) {
+    this.destTableIdStr = destTableIdStr;
+    this.serializedDataFiles = serializedDataFiles;
+    this.properties = properties;
+  }
+
+  @Override
+  public boolean isCompleted() {
+    return false;
+  }
+
+  /**
+   * Executes the partition replacement in the destination Iceberg table.
+   * Also has a retry mechanism, as done in {@link IcebergRegisterStep#execute()}.
+   *
+   * @throws IOException if an I/O error occurs during execution
+   */
+  @Override
+  public void execute() throws IOException {

Review Comment:
   the `IcebergRegisterStep` checks to verify that the current dest-side metadata remains the same as observed just prior to first loading the source catalog table metadata. would that be worthwhile here too?
(if so, let's see whether any opportunity for reuse, by defining a common base class for both of these iceberg commit steps) ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) { return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); } + + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file3.orc", + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file4.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, 1L); + List<PartitionData> partitionDataList = Collections.nCopies(5, partitionData); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable icebergTable = new IcebergTable(testTableId, + catalog.newTableOps(testTableId), + catalogUri, + catalog.loadTable(testTableId)); + // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class + Predicate<StructLike> alwaysTruePredicate = partition -> true; + Predicate<StructLike> alwaysFalsePredicate = partition -> false; + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5); + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); + + catalog.dropTable(testTableId); + } + + @Test + public void testReplacePartitions() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, 1L); + PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData2.set(0, 1L); + List<PartitionData> partitionDataList = Arrays.asList(partitionData, partitionData2); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable icebergTable = new IcebergTable(testTableId, + catalog.newTableOps(testTableId), + catalogUri, + catalog.loadTable(testTableId)); + + List<String> paths2 = Arrays.asList( + "/path/tableName/data/id=2/file3.orc", + "/path/tableName/data/id=2/file4.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData3 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData3.set(0, 2L); + PartitionData partitionData4 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData4.set(0, 2L); + List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, partitionData4); + + List<DataFile> dataFiles = getDataFiles(paths2, partitionDataList2); + // here, since partition data with value 2 doesn't exist yet, we expect it to get added to the table 
+ icebergTable.replacePartitions(dataFiles); + List<String> expectedPaths = new ArrayList<>(paths); + expectedPaths.addAll(paths2); + verifyAnyOrder(expectedPaths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); Review Comment: technically we might also call `icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths()` prior to `.replacePartitions` ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) { return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); } + + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file3.orc", + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file4.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, 1L); + List<PartitionData> partitionDataList = Collections.nCopies(5, partitionData); Review Comment: is 5 == `paths.size()`? ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) { return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); } + + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file3.orc", + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file4.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, 1L); + List<PartitionData> partitionDataList = Collections.nCopies(5, partitionData); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable icebergTable = new IcebergTable(testTableId, + catalog.newTableOps(testTableId), + catalogUri, + catalog.loadTable(testTableId)); + // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class + Predicate<StructLike> alwaysTruePredicate = partition -> true; + Predicate<StructLike> alwaysFalsePredicate = partition -> false; + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5); + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); + + catalog.dropTable(testTableId); + } + + @Test + public void testReplacePartitions() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable"); + Table testTable = 
catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, 1L);
+    PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, 1L);
+    List<PartitionData> partitionDataList = Arrays.asList(partitionData, partitionData2);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/id=2/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData3 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData3.set(0, 2L);
+    PartitionData partitionData4 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData4.set(0, 2L);
+    List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, partitionData4);
+
+    List<DataFile> dataFiles = getDataFiles(paths2, partitionDataList2);
+    // here, since partition data with value 2 doesn't exist yet, we expect it to get added to the table
+    icebergTable.replacePartitions(dataFiles);
+    List<String> expectedPaths = new ArrayList<>(paths);
+    expectedPaths.addAll(paths2);
+    verifyAnyOrder(expectedPaths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths3 = Arrays.asList(
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file6.orc"
+    );
+    // Reusing the same partition data to create data files with different paths
+    List<DataFile> dataFiles2 = getDataFiles(paths3, partitionDataList);

Review Comment:
   nit: it's confusing to have `paths3` associated w/ `dataFiles2` - please keep them aligned (even if you skip a number)
   
   that said, the name would be even better if it reflected the commonality of the partition IDs.  e.g.:
   ```
   List<DataFile> dataFiles3_pt1 = getDataFiles(paths3_pt1, partitionDataList1);
   ...
   // below...
   List<String> nowExpectedPaths = new ArrayList<>(paths2_pt2);
   nowExpectedPaths.addAll(paths3_pt1);
   ```

########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ##########
@@ -333,4 +338,120 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp
 protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) {
   return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
 }
+
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"

Review Comment:
   the `/id=1/` naming convention isn't necessary, is it?
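[Editor's note] Picking up the earlier suggestion to read `getAllDataFilePaths()` prior to `.replacePartitions`: the test could bracket the call, pinning down both the starting state and the resulting state. A sketch using only names already present in the test above:
```
// Hypothetical pre-assertion: before the replace, the table should hold only the original paths.
verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(),
    "pre-replace data filepaths should match");

icebergTable.replacePartitions(dataFiles);

List<String> expectedPaths = new ArrayList<>(paths);
expectedPaths.addAll(paths2);
verifyAnyOrder(expectedPaths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(),
    "post-replace data filepaths should match");
```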
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergReplacePartitionsStep.java: ########## Review Comment: wow, so much code that's nearly... but not quite exactly... a copy of `IcebergRegisterStep` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Predicate; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.util.SerializationUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.base.Preconditions; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; +import org.apache.gobblin.data.management.copy.CopyableDataset; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory; + +/** + * Iceberg Partition dataset implementing {@link CopyableDataset} + * <p> + * This class extends {@link IcebergDataset} and provides functionality to filter partitions + * and generate copy entities for partition based data movement. 
+ * </p> + */ +@Slf4j +public class IcebergPartitionDataset extends IcebergDataset { + + private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; + private final Predicate<StructLike> partitionFilterPredicate; + + public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, + FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException { + super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath); + + String partitionColumnName = + IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, + ICEBERG_PARTITION_NAME_KEY); Review Comment: let's put this and the `ICEBERG_PARTITION_NAME_KEY` constant into the companion finder ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java: ########## @@ -170,7 +170,7 @@ protected static boolean getConfigShouldCopyMetadataPath(Properties properties) } /** @return property value or `null` */ - protected static String getLocationQualifiedProperty(Properties properties, CatalogLocation location, String relativePropName) { + public static String getLocationQualifiedProperty(Properties properties, CatalogLocation location, String relativePropName) { Review Comment: based on my suggestions to rework the filter predicate ctor params, I'm curious whether this can be accessed entirely within `IcebergPartitionDatasetFinder` and therefore could remain `protected` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Predicate; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.util.SerializationUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.base.Preconditions; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; +import org.apache.gobblin.data.management.copy.CopyableDataset; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory; + +/** + * Iceberg Partition dataset implementing {@link CopyableDataset} + * <p> + * This class extends {@link IcebergDataset} and provides functionality to filter partitions + * and generate copy entities for partition based data movement. + * </p> + */ +@Slf4j +public class IcebergPartitionDataset extends IcebergDataset { + + private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; + private final Predicate<StructLike> partitionFilterPredicate; + + public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, + FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException { + super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath); + + String partitionColumnName = + IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, + ICEBERG_PARTITION_NAME_KEY); + Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName), + "Partition column name cannot be empty"); + + TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata(); + this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName, + srcTableMetadata, properties); + } + + /** + * Represents the destination file paths and the corresponding file status in source file system. + * These both properties are used in creating {@link CopyEntity} + */ + @Data + protected static final class FilePathsWithStatus { + private final Path destPath; + private final FileStatus srcFileStatus; + } Review Comment: the base class uses `Map<Path, FileStatus>`. did that seem inefficient or was there another reason to deviate? ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Predicate; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.util.SerializationUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.base.Preconditions; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; +import org.apache.gobblin.data.management.copy.CopyableDataset; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory; + +/** + * Iceberg Partition dataset implementing {@link CopyableDataset} + * <p> + * This class extends {@link IcebergDataset} and provides functionality to filter partitions + * and generate copy entities for partition based data movement. 
+ * </p> + */ +@Slf4j +public class IcebergPartitionDataset extends IcebergDataset { + + private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; + private final Predicate<StructLike> partitionFilterPredicate; + + public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, + FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException { + super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath); + + String partitionColumnName = + IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, + ICEBERG_PARTITION_NAME_KEY); + Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName), + "Partition column name cannot be empty"); + + TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata(); + this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName, + srcTableMetadata, properties); + } + + /** + * Represents the destination file paths and the corresponding file status in source file system. + * These both properties are used in creating {@link CopyEntity} + */ + @Data + protected static final class FilePathsWithStatus { + private final Path destPath; + private final FileStatus srcFileStatus; + } + + /** + * Generates copy entities for partition based data movement. + * It finds files specific to the partition and create destination data files based on the source data files. + * Also updates the destination data files with destination table write data location and add UUID to the file path + * to avoid conflicts. + * + * @param targetFs the target file system + * @param copyConfig the copy configuration + * @return a collection of copy entities + * @throws IOException if an I/O error occurs + */ + @Override + Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException { + String fileSet = this.getFileSetId(); + List<CopyEntity> copyEntities = Lists.newArrayList(); + IcebergTable srcIcebergTable = getSrcIcebergTable(); + List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate); + List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles); + Configuration defaultHadoopConfiguration = new Configuration(); + + for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) { + Path destPath = filePathsWithStatus.getDestPath(); + FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus(); + FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration); + + CopyableFile fileEntity = CopyableFile.fromOriginAndDestination( + actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig) + .fileSet(fileSet) + .datasetOutputPath(targetFs.getUri().getPath()) + .build(); Review Comment: you skip first doing this, like in `IcebergDataset`: ``` // preserving ancestor permissions till root path's child between src and dest List<OwnerAndPermission> ancestorOwnerAndPermissionList = CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(actualSourceFs, srcPath.getParent(), greatestAncestorPath, copyConfig); ``` is that intentional? do you feel it's not necessary or actually contra-indicated? 
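[Editor's note] If the omission turns out to be unintentional, the fix would presumably mirror `IcebergDataset`. A sketch, assuming the `ancestorsOwnerAndPermission` builder method and a `greatestAncestorPath` computed the way the base class does (both are assumptions here, not confirmed against this PR):
```
// Hypothetical: resolve ancestor permissions up to the greatest ancestor, then
// attach them when building the CopyableFile.
Path srcPath = srcFileStatus.getPath();
List<OwnerAndPermission> ancestorOwnerAndPermissionList =
    CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(
        actualSourceFs, srcPath.getParent(), greatestAncestorPath, copyConfig);

CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
        actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
    .fileSet(fileSet)
    .datasetOutputPath(targetFs.getUri().getPath())
    .ancestorsOwnerAndPermission(ancestorOwnerAndPermissionList)
    .build();
```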
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating a {@link CopyEntity}.
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also updates the destination data files with the destination table's write data location and adds a UUID to
+   * each file path to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {

Review Comment:
   this impl is really, really similar to the one it's based on in its base class. deriving from a class and then overriding methods w/ only small changes is pretty nearly cut-and-paste code. sometimes that's inevitable, but let's avoid it when we can. in this case, could we NOT override this method, but only `GetFilePathsToFileStatusResult getFilePathsToFileStatus(...)`, so it handles this new code instead:
   ```
   IcebergTable srcIcebergTable = getSrcIcebergTable();
   List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
   List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
   Configuration defaultHadoopConfiguration = new Configuration();

   for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
   ...
   ```
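One possible shape for that narrower override, sketched with a guessed signature (`GetFilePathsToFileStatusResult` is named in the comment above, but the parameter list, the result type's constructor, and the needed `java.util.Map` import are all hypothetical):

```java
// hypothetical sketch: override only the file-discovery step and let the base class's
// generateCopyEntities keep driving entity creation; signature details are assumed
@Override
protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem fs, CopyConfiguration copyConfig)
    throws IOException {
  List<DataFile> srcDataFiles = getSrcIcebergTable().getPartitionSpecificDataFiles(this.partitionFilterPredicate);
  List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
  Map<Path, FileStatus> destPathToSrcStatus = Maps.newHashMap();
  for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, fs)) {
    destPathToSrcStatus.put(filePathsWithStatus.getDestPath(), filePathsWithStatus.getSrcFileStatus());
  }
  return new GetFilePathsToFileStatusResult(destPathToSrcStatus, destDataFiles); // ctor shape assumed
}
```

Under that split, the partition-specific pieces left in this class would be the filter predicate, the destination-path rewriting, and the post-publish step.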
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating a {@link CopyEntity}.
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also updates the destination data files with the destination table's write data location and adds a UUID to
+   * each file path to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
+      Path destPath = filePathsWithStatus.getDestPath();
+      FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus();
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+          actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Adding this check to avoid adding the post-publish step when there are no files to copy.
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createPostPublishStep(destDataFiles));
+    }
+
+    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws IcebergTable.TableNotFoundException {
+    List<DataFile> destDataFiles = new ArrayList<>();
+    if (srcDataFiles.isEmpty()) {
+      return destDataFiles;
+    }
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
+    PartitionSpec partitionSpec = destTableMetadata.spec();
+    String srcWriteDataLocation = srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    String destWriteDataLocation = destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) {
+      log.warn(
+          String.format("Either source or destination table does not have a write data location: source table write data location: {%s}, destination table write data location: {%s}",
+              srcWriteDataLocation,
+              destWriteDataLocation
+          )
+      );
+    }
+    // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns null if the property is not set and
+    // doesn't respect the passed default value, so to avoid an NPE in .replace() we set it to an empty string.
+    String prefixToBeReplaced = (srcWriteDataLocation != null) ? srcWriteDataLocation : "";
+    String prefixToReplaceWith = (destWriteDataLocation != null) ? destWriteDataLocation : "";
+    srcDataFiles.forEach(dataFile -> {
+      String curDestFilePath = dataFile.path().toString();
+      String newDestFilePath = curDestFilePath.replace(prefixToBeReplaced, prefixToReplaceWith);
+      String updatedDestFilePath = addUUIDToPath(newDestFilePath);

Review Comment:
   let's simplify by abstracting everything into a method from `Path -> Path` or `Path -> String`, perhaps named `relocateDestPath`
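A minimal sketch of that helper, folding the null-guarding, prefix swap, and UUID suffixing from the hunk above into one method (`relocateDestPath` per the suggestion; `addUUIDToPath` is the String-returning helper already in this PR):

```java
// sketch only: consolidate the destination-path rewriting shown in the forEach above
private Path relocateDestPath(DataFile srcDataFile, String srcWriteDataLocation, String destWriteDataLocation) {
  // per the earlier comment, .property(..., "") may return null, so guard before .replace()
  String prefixToBeReplaced = (srcWriteDataLocation != null) ? srcWriteDataLocation : "";
  String prefixToReplaceWith = (destWriteDataLocation != null) ? destWriteDataLocation : "";
  String newDestFilePath = srcDataFile.path().toString().replace(prefixToBeReplaced, prefixToReplaceWith);
  return new Path(addUUIDToPath(newDestFilePath));
}
```

The lambda body would then shrink to roughly `Path updatedDestFilePath = relocateDestPath(dataFile, srcWriteDataLocation, destWriteDataLocation);`.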
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateFactory;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+
+  private static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  private final Predicate<StructLike> partitionFilterPredicate;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+
+    String partitionColumnName =
+        IcebergDatasetFinder.getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+            ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    this.partitionFilterPredicate = IcebergPartitionFilterPredicateFactory.getFilterPredicate(partitionColumnName,
+        srcTableMetadata, properties);
+  }
+
+  /**
+   * Represents the destination file paths and the corresponding file status in the source file system.
+   * Both properties are used in creating a {@link CopyEntity}.
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also updates the destination data files with the destination table's write data location and adds a UUID to
+   * each file path to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
+      Path destPath = filePathsWithStatus.getDestPath();
+      FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus();
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+          actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Adding this check to avoid adding the post-publish step when there are no files to copy.
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createPostPublishStep(destDataFiles));
+    }
+
+    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws IcebergTable.TableNotFoundException {
+    List<DataFile> destDataFiles = new ArrayList<>();
+    if (srcDataFiles.isEmpty()) {
+      return destDataFiles;
+    }
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
+    PartitionSpec partitionSpec = destTableMetadata.spec();
+    String srcWriteDataLocation = srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    String destWriteDataLocation = destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) {
+      log.warn(
+          String.format("Either source or destination table does not have a write data location: source table write data location: {%s}, destination table write data location: {%s}",
+              srcWriteDataLocation,
+              destWriteDataLocation
+          )
+      );

Review Comment:
   let's avoid starting a bunch of work only to discover we can't continue because certain props weren't set. can we validate these in the ctor at the beginning (or even possibly in the caller) and fail-fast when missing?
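A minimal sketch of that fail-fast check, placed at the end of the constructor (where `srcTableMetadata` is already in scope), assuming it is acceptable to reject any table pair missing `TableProperties.WRITE_DATA_LOCATION`:

```java
// sketch only: fail dataset construction up front when a write-data location is missing;
// StringUtils.isNotEmpty also covers the null return noted in the code comment above
TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
Preconditions.checkArgument(
    StringUtils.isNotEmpty(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")),
    "Source table is missing table property: " + TableProperties.WRITE_DATA_LOCATION);
Preconditions.checkArgument(
    StringUtils.isNotEmpty(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")),
    "Destination table is missing table property: " + TableProperties.WRITE_DATA_LOCATION);
```

With that in place, the null guards and the warn inside `getDestDataFiles` could likely be dropped.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org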