[ https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939317&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939317 ]
ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Oct/24 21:51
            Start Date: 21/Oct/24 21:51
    Worklog Time Spent: 10m
      Work Description: phet commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1809244154


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+  // Currently hardcoded these transforms here but eventually it will depend on filter predicate implementation and can
+  // be moved to a common place or inside each filter predicate.
+  private static final List<String> supportedTransforms = ImmutableList.of("identity", "truncate");
+  private final Predicate<StructLike> partitionFilterPredicate;
+  private final String partitionColumnName;
+  private final String partitionColValue;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath, String partitionColumnName, String partitionColValue)
+      throws IOException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+    this.partitionColumnName = partitionColumnName;
+    this.partitionColValue = partitionColValue;
+    this.partitionFilterPredicate = createPartitionFilterPredicate();
+  }
+
+  /**
+   * Generates copy entities for partition based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also updates the destination data files with the destination table's write data location and adds a UUID to
+   * each file path to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    // TODO: Refactor IcebergDataset::generateCopyEntities to avoid code duplication
+    // Differences are getting data files, copying ancestor permission and adding post publish steps
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (Map.Entry<Path, FileStatus> entry : getDestFilePathWithSrcFileStatus(srcDataFiles, destDataFiles, this.sourceFs).entrySet()) {
+      Path destPath = entry.getKey();
+      FileStatus srcFileStatus = entry.getValue();
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);

Review Comment:
   this seemed potentially wasteful to perform in the inner loop, as the FS shouldn't change.  I checked what (the original) `IcebergDataset` was doing and found this comment:
   ```
       // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
       FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
   ```
   so if you can move this outside the loop, let's do it.  if not now, either add the same TODO here or else explain why it's not possible/desirable
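   To make the suggestion concrete, here is a minimal sketch (not part of the PR) of hoisting the `FileSystem` resolution out of the loop. It assumes every `srcFileStatus` in the dataset resolves to the same source `FileSystem`, and it reuses the helper names shown in the diff above (`getDestFilePathWithSrcFileStatus`, `getSourceFileSystemFromFileStatus`, `defaultHadoopConfiguration`):
   ```java
   // Sketch only: resolve the source FileSystem once, then reuse it for every entry.
   // Assumes the source FS does not change between files of the same dataset.
   Map<Path, FileStatus> destPathToSrcStatus =
       getDestFilePathWithSrcFileStatus(srcDataFiles, destDataFiles, this.sourceFs);
   FileSystem actualSourceFs = null;
   for (Map.Entry<Path, FileStatus> entry : destPathToSrcStatus.entrySet()) {
     Path destPath = entry.getKey();
     FileStatus srcFileStatus = entry.getValue();
     if (actualSourceFs == null) {
       // create once, reuse thereafter, to not recreate wastefully (mirrors the TODO in IcebergDataset)
       actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
     }
     // ... build the CopyableFile for destPath against actualSourceFs, as in the PR ...
   }
   ```
   If the source FS cannot safely be assumed constant, the fallback named in the comment is to keep the per-entry call and carry over the same TODO.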
Issue Time Tracking
-------------------

    Worklog Id:     (was: 939317)
    Time Spent: 6h 40m  (was: 6.5h)

> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
>                 Key: GOBBLIN-2159
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 6h 40m
>  Remaining Estimate: 0h
>


--
This message was sent by Atlassian Jira
(v8.20.10#820010)