Blazer-007 commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1778490720
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Predicate; + +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder; + +/** + * Predicate implementation for filtering Iceberg partitions based on datetime values. + * <p> + * This class filters partitions by checking if the partition datetime falls within a specified range. 
+ * </p> + * <ul> + * <li>The datetime partition column is expected to be a string column.</li> + * <li>The datetime partition column values are expected to be in the format specified by the pattern in the properties.</li> + * <li>The start and end dates are also specified in the properties.</li> + * </ul> + */ +public class IcebergDateTimePartitionFilterPredicate implements Predicate<StructLike> { + + private static final List<String> supportedTransforms = ImmutableList.of("identity"); + private static final String DATETIME_PARTITION_KEY = "partition.datetime"; + private static final String DATETIME_PARTITION_PATTERN_KEY = DATETIME_PARTITION_KEY + ".pattern"; + private static final String DATETIME_PARTITION_STARTDATE_KEY = DATETIME_PARTITION_KEY + ".startdate"; + private static final String DATETIME_PARTITION_ENDDATE_KEY = DATETIME_PARTITION_KEY + ".enddate"; + private final int partitionColumnIndex; + private final DateTimeFormatter dateTimeFormatter; + private final DateTime startDate; + private final DateTime endDate; + + /** + * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the specified parameters. 
+ * + * @param partitionColumnName the name of the partition column + * @param tableMetadata the metadata of the Iceberg table + * @param properties the properties containing partition configuration + * @throws IllegalArgumentException if the partition column is not found or required properties are missing + */ + public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, TableMetadata tableMetadata, + Properties properties) { + + this.partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName, + tableMetadata, supportedTransforms);; + Preconditions.checkArgument(this.partitionColumnIndex != -1, + String.format("Partition column %s not found", partitionColumnName)); + + String partitionPattern = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_PATTERN_KEY); + + String startDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_STARTDATE_KEY); + + String endDateVal = IcebergDatasetFinder.getLocationQualifiedProperty(properties, + IcebergDatasetFinder.CatalogLocation.SOURCE, + DATETIME_PARTITION_ENDDATE_KEY); + + Preconditions.checkArgument(StringUtils.isNotBlank(partitionPattern), "DateTime Partition pattern cannot be empty"); + Preconditions.checkArgument(StringUtils.isNotBlank(startDateVal), "DateTime Partition start date cannot be empty"); + Preconditions.checkArgument(StringUtils.isNotBlank(endDateVal), "DateTime Partition end date cannot be empty"); + + this.dateTimeFormatter = DateTimeFormat.forPattern(partitionPattern).withZone(DateTimeZone.UTC); + this.startDate = this.dateTimeFormatter.parseDateTime(startDateVal); + this.endDate = this.dateTimeFormatter.parseDateTime(endDateVal); + } + + /** + * Check if the partition datetime falls within the specified range. 
+ + * @param partition the datetime partition to check + * @return {@code true} if the datetime partition value is within the range, otherwise {@code false} + */ + @Override + public boolean test(StructLike partition) { + // Just a cautious check to avoid NPE, ideally partition shouldn't be null if table is partitioned + if (Objects.isNull(partition)) { + return false; + } + + String partitionVal = partition.get(this.partitionColumnIndex, String.class); Review Comment: I have removed this filter predicate for now and will add it later: with the overwrite API, it is not easy to create an expression filter from a string, especially for ranges. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org