[ https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=938563&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-938563 ]
ASF GitHub Bot logged work on GOBBLIN-2159: ------------------------------------------- Author: ASF GitHub Bot Created on: 17/Oct/24 04:51 Start Date: 17/Oct/24 04:51 Worklog Time Spent: 10m Work Description: Blazer-007 commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1804101303 ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Predicate; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.util.SerializationUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.ImmutableList; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; +import org.apache.gobblin.data.management.copy.CopyableDataset; +import org.apache.gobblin.util.measurement.GrowthMilestoneTracker; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil; + +/** + * Iceberg Partition dataset implementing {@link CopyableDataset} + * <p> + * This class extends {@link IcebergDataset} and provides functionality to filter partitions + * and generate copy entities for partition based data movement. + * </p> + */ +@Slf4j +public class IcebergPartitionDataset extends IcebergDataset { + private static final List<String> supportedTransforms = ImmutableList.of("identity", "truncate"); + private final Predicate<StructLike> partitionFilterPredicate; + private final Map<Path, Path> srcPathToDestPath; + private final String partitionColumnName; + private final String partitionColValue; + + public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, + FileSystem sourceFs, boolean shouldIncludeMetadataPath, String partitionColumnName, String partitionColValue) + throws IcebergTable.TableNotFoundException { + super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath); + this.partitionColumnName = partitionColumnName; + this.partitionColValue = partitionColValue; + this.partitionFilterPredicate = createPartitionFilterPredicate(); + this.srcPathToDestPath = new HashMap<>(); + } + + private Predicate<StructLike> createPartitionFilterPredicate() throws IcebergTable.TableNotFoundException { + //TODO: Refactor it later using factory or other way to support different types of filter predicate + // Also take into consideration creation of Expression Filter to be used in overwrite api + TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata(); + int partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex( + this.partitionColumnName, + srcTableMetadata, + supportedTransforms + ); + Preconditions.checkArgument(partitionColumnIndex >= 0, String.format( + "Partition column %s not found in table %s", + this.partitionColumnName, this.getFileSetId())); + return new IcebergMatchesAnyPropNamePartitionFilterPredicate(partitionColumnIndex, this.partitionColValue); + } Review Comment: replied in [comment here](https://github.com/apache/gobblin/pull/4058#discussion_r1804099396) Issue Time Tracking ------------------- Worklog Id: (was: 938563) Time Spent: 5h 10m (was: 5h) > Support Partition Based Copy in Iceberg Distcp > ---------------------------------------------- > > Key: GOBBLIN-2159 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2159 > Project: Apache Gobblin > Issue Type: Task > Reporter: Vivek Rai > Priority: Major > Time Spent: 5h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)