[ https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939754 ]
ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Oct/24 17:07
            Start Date: 23/Oct/24 17:07
    Worklog Time Spent: 10m
      Work Description: phet commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1813140460


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java:
##########
@@ -44,10 +45,12 @@ protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> compan
   @Override
   public IcebergTable openTable(String dbName, String tableName) throws IcebergTable.TableNotFoundException {
     TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
-    return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(),
-        createTableOperations(tableId),
-        this.getCatalogUri(),
-        loadTableInstance(tableId));
+    try {
+      return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(),
+          createTableOperations(tableId), this.getCatalogUri(), loadTableInstance(tableId));
+    } catch (NoSuchTableException ex) {
+      throw new IcebergTable.TableNotFoundException(tableId);
+    }

Review Comment:
recommend a comment like:
```
/// defend against `org.apache.iceberg.catalog.Catalog::loadTable` throwing inside some `@Override` of `loadTableInstance`
```
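For context on why that defensive catch matters: any concrete catalog whose `loadTableInstance` delegates to `org.apache.iceberg.catalog.Catalog::loadTable` can throw `NoSuchTableException` (a runtime exception) from inside `openTable`'s constructor call. A minimal sketch of such an override (the subclass and its names are hypothetical, not from this PR; it is left abstract so the remaining `BaseIcebergCatalog` methods may stay unimplemented here):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical concrete catalog, for illustration only.
public abstract class ExampleIcebergCatalog extends BaseIcebergCatalog {
  private final Catalog underlying;

  protected ExampleIcebergCatalog(String catalogName, Catalog underlying) {
    super(catalogName, underlying.getClass());
    this.underlying = underlying;
  }

  @Override
  protected Table loadTableInstance(TableIdentifier tableId) {
    // Catalog::loadTable throws org.apache.iceberg.exceptions.NoSuchTableException
    // (a RuntimeException) when the table is absent; without the new try/catch in
    // openTable, that exception would escape instead of TableNotFoundException.
    return this.underlying.loadTable(tableId);
  }
}
```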
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -237,31 +238,35 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
    * @throws RuntimeException if error occurred while reading the manifest file
    */
   public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate)
-      throws TableNotFoundException {
+      throws IOException {
     TableMetadata tableMetadata = accessTableMetadata();
     Snapshot currentSnapshot = tableMetadata.currentSnapshot();
     long currentSnapshotId = currentSnapshot.snapshotId();
     List<DataFile> knownDataFiles = new ArrayList<>();
-    log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId,
-        knownDataFiles.size());
+    GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker();
     //TODO: Add support for deleteManifests as well later
     // Currently supporting dataManifests only
     List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
     for (ManifestFile manifestFile : dataManifestFiles) {
+      if (growthMilestoneTracker.isAnotherMilestone(knownDataFiles.size())) {
+        log.info("~{}~ for snapshot '{}' - before manifest-file '{}' '{}' total known iceberg datafiles", tableId,
+            currentSnapshotId,
+            manifestFile.path(),
+            knownDataFiles.size()
+        );
+      }

Review Comment:
I agree this makes more sense here, given that the synchronous reading of every manifest file happens within this method, rather than in the style of the `Iterator<IcebergSnapshotInfo>` returned by `IcebergTable::getIncrementalSnapshotInfosIterator`.

that said, I doubt we should still log tracked growth, as this very same list is later transformed in `IcebergPartitionDataset::calcDestDataFileBySrcPath`. all the network calls are in this method, rather than over there, so the in-process transformation into CopyEntities should be quite fast. maybe just log once at the end of `calcDestDataFileBySrcPath`
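That single end-of-method log might look like the following. This is only a sketch of the reviewer's suggestion: the signature matches the call site in `generateCopyEntities` later in this PR, but the body and log wording are illustrative rather than the PR's actual code.

```java
private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> srcDataFiles) {
  Map<Path, DataFile> destDataFileBySrcPath = Maps.newHashMapWithExpectedSize(srcDataFiles.size());
  // ... purely in-process: derive each destination DataFile from its source DataFile
  // (no network calls here; those all happened in getPartitionSpecificDataFiles) ...

  // one summary line at the end replaces per-milestone growth tracking:
  log.info("~{}~ calculated '{}' destination data files from '{}' source data files",
      getFileSetId(), destDataFileBySrcPath.size(), srcDataFiles.size());
  return destDataFileBySrcPath;
}
```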
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -120,8 +121,9 @@ public void testGetCurrentSnapshotInfo() throws IOException {
   @Test(expectedExceptions = {IcebergTable.TableNotFoundException.class})
   public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
     TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS");
-    // Passing null for Table as catalog.loadTable(bogusTableId) will throw NoSuchTableException so
-    // IcebergTable instance couldn't be created and the goal of this test will not be achieved
+    // Passing null for Table as catalog.loadTable(bogusTableId) will throw
+    // org.apache.iceberg.exceptions.NoSuchTableException,

Review Comment:
this names the wrong exception (it should be the one in `expectedExceptions`)... but anyway, `loadTable` shouldn't ever even enter into the call stack. isn't `IcebergTable::accessTableMetadata` doing the throwing? hence, the `null` is not involved; rather, the exception arises from `catalog.newTableOps(bogusTableId)`
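For reference, the throwing path the reviewer describes would be roughly the following (a paraphrase from memory of the `IcebergTable` internals, not the verbatim source):

```java
// IcebergTable wraps the TableOperations returned by catalog.newTableOps(bogusTableId);
// for a nonexistent table, those ops carry no current metadata:
protected TableMetadata accessTableMetadata() throws TableNotFoundException {
  TableMetadata current = this.tableOps.current();  // null for a bogus table
  // so the TableNotFoundException the test expects originates here, and neither
  // catalog.loadTable nor the `null` Table passed to the constructor is ever consulted
  return Optional.ofNullable(current).orElseThrow(() -> new TableNotFoundException(this.tableId));
}
```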
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java:
##########
@@ -224,23 +230,23 @@ private static void setupDestFileSystem() throws IOException {
     Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException());
   }

-  private static List<DataFile> createDataFileMocks() throws IOException {
-    List<DataFile> dataFiles = new ArrayList<>();
+  private static Map<String, DataFile> createDataFileMocksBySrcPath(List<String> srcFilePaths) throws IOException {

Review Comment:
I really like how returning this `Map` allows you to be so succinct at every point of use:
```
Map<String, DataFile> mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths);
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any()))
    .thenReturn(new ArrayList<>(mockDataFilesBySrcPath.values()));
...
// (above just a `.values()` and simply a `.keySet()` below)
verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), false);
```
nice work!


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+  // Currently hardcoded these transforms here but eventually it will depend on filter predicate implementation and can
+  // be moved to a common place or inside each filter predicate.
+  private static final List<String> supportedTransforms = ImmutableList.of("identity", "truncate");
+  private final Predicate<StructLike> partitionFilterPredicate;
+  private final String partitionColumnName;
+  private final String partitionColValue;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath, String partitionColumnName, String partitionColValue)
+      throws IOException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+    this.partitionColumnName = partitionColumnName;
+    this.partitionColValue = partitionColValue;
+    this.partitionFilterPredicate = createPartitionFilterPredicate();
+  }
+
+  /**
+   * Generates copy entities for partition based data movement.
+   * It finds files specific to the partition and create destination data files based on the source data files.
+   * Also updates the destination data files with destination table write data location and add UUID to the file path
+   * to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    // TODO: Refactor the IcebergDataset::generateCopyEntities to avoid code duplication
+    // Differences are getting data files, copying ancestor permission and adding post publish steps
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    Map<Path, DataFile> destDataFileBySrcPath = calcDestDataFileBySrcPath(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (Map.Entry<Path, FileStatus> entry : calcSrcFileStatusByDestFilePath(destDataFileBySrcPath).entrySet()) {
+      Path destPath = entry.getKey();
+      FileStatus srcFileStatus = entry.getValue();
+      // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Adding this check to avoid adding post publish step when there are no files to copy.
+    List<DataFile> destDataFiles = new ArrayList<>(destDataFileBySrcPath.values());
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createOverwritePostPublishStep(destDataFiles));
+    }
+
+    log.info("~{}~ generated {} copy--entities", fileSet, copyEntities.size());

Review Comment:
the two dashes between `copy--entities` seem like a typo


Issue Time Tracking
-------------------

    Worklog Id:     (was: 939754)
    Time Spent: 11h 50m  (was: 11h 40m)

> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
>                 Key: GOBBLIN-2159
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 11h 50m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)