Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1774540100


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergDateTimePartitionFilterPredicate.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;
+
+/**
+ * Predicate implementation for filtering Iceberg partitions based on datetime 
values.
+ * <p>
+ *   This class filters partitions by checking if the partition datetime falls 
within a specified range.
+ * </p>
+ * <ul>
+ *   <li>The datetime partition column is expected to be a string column.</li>
+ *   <li>The datetime partition column values are expected to be in the format 
specified by the pattern in the properties.</li>
+ *   <li>The start and end dates are also specified in the properties.</li>
+ * </ul>
+ */
+public class IcebergDateTimePartitionFilterPredicate implements 
Predicate<StructLike> {
+
+  private static final List<String> supportedTransforms = 
ImmutableList.of("identity");
+  private static final String DATETIME_PARTITION_KEY = "partition.datetime";
+  private static final String DATETIME_PARTITION_PATTERN_KEY = 
DATETIME_PARTITION_KEY + ".pattern";
+  private static final String DATETIME_PARTITION_STARTDATE_KEY = 
DATETIME_PARTITION_KEY + ".startdate";
+  private static final String DATETIME_PARTITION_ENDDATE_KEY = 
DATETIME_PARTITION_KEY + ".enddate";
+  private final int partitionColumnIndex;
+  private final DateTimeFormatter dateTimeFormatter;
+  private final DateTime startDate;
+  private final DateTime endDate;
+
+  /**
+   * Constructs an {@code IcebergDateTimePartitionFilterPredicate} with the 
specified parameters.
+   *
+   * @param partitionColumnName the name of the partition column
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param properties the properties containing partition configuration
+   * @throws IllegalArgumentException if the partition column is not found or 
required properties are missing
+   */
+  public IcebergDateTimePartitionFilterPredicate(String partitionColumnName, 
TableMetadata tableMetadata,
+      Properties properties) {
+
+    this.partitionColumnIndex = 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(partitionColumnName,
+        tableMetadata, supportedTransforms);;
+    Preconditions.checkArgument(this.partitionColumnIndex != -1,
+        String.format("Partition column %s not found", partitionColumnName));
+
+    String partitionPattern = 
IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_PATTERN_KEY);
+
+    String startDateVal = 
IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_STARTDATE_KEY);
+
+    String endDateVal = 
IcebergDatasetFinder.getLocationQualifiedProperty(properties,
+        IcebergDatasetFinder.CatalogLocation.SOURCE,
+        DATETIME_PARTITION_ENDDATE_KEY);
+
+    Preconditions.checkArgument(StringUtils.isNotBlank(partitionPattern), 
"DateTime Partition pattern cannot be empty");
+    Preconditions.checkArgument(StringUtils.isNotBlank(startDateVal), 
"DateTime Partition start date cannot be empty");
+    Preconditions.checkArgument(StringUtils.isNotBlank(endDateVal), "DateTime 
Partition end date cannot be empty");
+
+    this.dateTimeFormatter = 
DateTimeFormat.forPattern(partitionPattern).withZone(DateTimeZone.UTC);
+    this.startDate = this.dateTimeFormatter.parseDateTime(startDateVal);
+    this.endDate = this.dateTimeFormatter.parseDateTime(endDateVal);
+  }
+
+  /**
+   * Check if the partition datetime falls within the specified range.
+   *
+   * @param partition the datetime partition to check
+   * @return {@code true} if the datetime partition value is within the range, 
otherwise {@code false}
+   */
+  @Override
+  public boolean test(StructLike partition) {
+    // Just a cautious check to avoid NPE, ideally partition shouldn't be null 
if table is partitioned
+    if (Objects.isNull(partition)) {
+      return false;
+    }
+
+    String partitionVal = partition.get(this.partitionColumnIndex, 
String.class);

Review Comment:
   Yes, as stated in the class Javadoc:
   <li>The datetime partition column is expected to be a string column.</li>
   
   I wrote this filter predicate to support older datasets that still use 
`datepartition` as the partition column, stored as a string column; in our 
case, that is most of them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to