This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b250d4ec1 [GOBBLIN-2147] Added lookback time fetch in partitioned filesource (#4044)
8b250d4ec1 is described below

commit 8b250d4ec1d1df554c62da9e15b4d0e7e29cbd27
Author: Vivek Rai <[email protected]>
AuthorDate: Fri Mar 7 10:49:35 2025 +0530

    [GOBBLIN-2147] Added lookback time fetch in partitioned filesource (#4044)
    
    Add lookback time fetch in partitioned filesource
    In case no lookback time is passed, default it to min_watermark
---
 .../gobblin/configuration/ConfigurationKeys.java   |  5 ++
 .../source/DatePartitionedNestedRetriever.java     | 17 ++++-
 .../source/PartitionAwareFileRetrieverUtils.java   | 31 +++++++++
 .../gobblin/source/PartitionedFileSourceBase.java  | 73 ++++++++++++++++------
 .../PartitionAwareFileRetrieverUtilsTest.java      | 63 +++++++++++++++++++
 5 files changed, 167 insertions(+), 22 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b6569af2d2..d9243f6e0d 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -350,6 +350,11 @@ public class ConfigurationKeys {
    */
   public static final String WATERMARK_INTERVAL_VALUE_KEY = 
"watermark.interval.value";
 
+  /**
+   * DEFAULT LOOKBACK TIME KEY property
+   */
+  public static final String DEFAULT_COPY_LOOKBACK_TIME_KEY = 
"copy.lookbackTime";
+
   /**
    * Extract related configuration properties.
    */
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
index 0ed42da8b7..fd652926f1 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
 import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper;
 import org.apache.gobblin.util.DatePartitionType;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
 
 import static 
org.apache.gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN;
 
@@ -54,9 +55,9 @@ import static 
org.apache.gobblin.source.PartitionedFileSourceBase.DATE_PARTITION
  *
  * For example, if {@link ConfigurationKeys#SOURCE_FILEBASED_DATA_DIRECTORY} 
is set to /my/data/, then the class assumes
  * folders following the pattern /my/data/daily/[year]/[month]/[day] are 
present. It will iterate through all the data
- * under these folders starting from the date specified by {@link 
#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} until
- * either {@link #DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB} files have been 
processed, or until there is no more data
- * to process. For example, if {@link 
#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} is set to 2015/01/01, then the job
+ * under these folders starting from the date specified by {@link 
PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} until
+ * either {@link 
PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB} files have 
been processed, or until there is no more data
+ * to process. For example, if {@link 
PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} is set 
to 2015/01/01, then the job
  * will read from the folder /my/data/daily/2015/01/01/, 
/my/data/daily/2015/01/02/, /my/data/2015/01/03/ etc.
  *
  */
@@ -114,6 +115,10 @@ public class DatePartitionedNestedRetriever implements 
PartitionAwareFileRetriev
       throw new IOException("Error initializing FileSystem", e);
     }
 
+    GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker();
+    Long iteration = 0L;
+    LOG.info("~{}~ Starting collecting files to process from {} to {}", 
sourceDir, lowWaterMarkDate, currentDay);
+
     for (DateTime date = lowWaterMarkDate; !date.isAfter(currentDay) && 
filesToProcess.size() < maxFilesToReturn;
         date = date.withFieldAdded(incrementalUnit, 1)) {
 
@@ -129,8 +134,14 @@ public class DatePartitionedNestedRetriever implements 
PartitionAwareFileRetriev
               new FileInfo(fileStatus.getPath().toString(), 
fileStatus.getLen(), date.getMillis(), partitionPath));
         }
       }
+
+      if (growthTracker.isAnotherMilestone(iteration++)) {
+        LOG.info("~{}~ collected {} files to process; most-recent source path: 
~{}~", sourceDir, filesToProcess.size(), sourcePath);
+      }
     }
 
+    LOG.info("~{}~ Finished collecting {} files to process", sourceDir, 
filesToProcess.size());
+
     return filesToProcess;
   }
 
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtils.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtils.java
index 4bab517dec..9e809e4d0c 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtils.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtils.java
@@ -16,7 +16,13 @@
  */
 package org.apache.gobblin.source;
 
+import java.io.IOException;
+
+import org.joda.time.DateTimeFieldType;
 import org.joda.time.Duration;
+import org.joda.time.chrono.ISOChronology;
+
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.DatePartitionType;
@@ -31,6 +37,7 @@ import static 
org.apache.gobblin.source.PartitionedFileSourceBase.DEFAULT_PARTIT
  * Utility functions for parsing configuration parameters commonly used by 
{@link PartitionAwareFileRetriever}
  * objects.
  */
+@Slf4j
 public class PartitionAwareFileRetrieverUtils {
   /**
    * Retrieve the lead time duration from the LEAD_TIME and LEAD_TIME 
granularity config settings.
@@ -52,4 +59,28 @@ public class PartitionAwareFileRetrieverUtils {
 
     return new Duration(leadTime * leadTimeGranularity.getUnitMilliseconds());
   }
+
+  /**
+   * Calculates the lookback time duration based on the provided lookback time 
string.
+   *
+   * @param lookBackTime the lookback time string, which should include a 
numeric value followed by a time unit character.
+   *                     For example, "5d" for 5 days or "10h" for 10 hours. 
See {@link DatePartitionType#lookupByPattern}
+   * @return an {@link Duration} of lookBackTime if the lookback time is valid
+   * @throws IOException if the lookback time is invalid
+   */
+  public static Duration getLookbackTimeDuration(String lookBackTime) throws 
IOException {
+    DateTimeFieldType lookBackTimeGranularity = 
DatePartitionType.getLowestIntervalUnit(lookBackTime);
+    if (lookBackTimeGranularity != null) {
+      long lookBackTimeGranularityInMillis =
+          
lookBackTimeGranularity.getDurationType().getField(ISOChronology.getInstance()).getUnitMillis();
+      try {
+        long lookBack = Long.parseLong(lookBackTime.substring(0, 
lookBackTime.length() - 1));
+        return new Duration(lookBack * lookBackTimeGranularityInMillis);
+      } catch(NumberFormatException ex) {
+        throw new IOException("Invalid lookback time: " + lookBackTime, ex);
+      }
+    } else {
+      throw new IOException("There is no valid time granularity in lookback 
time: " + lookBackTime);
+    }
+  }
 }
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index ece7db0c0e..f9f476c2ce 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -17,14 +17,25 @@
 
 package org.apache.gobblin.source;
 
-import com.google.common.base.Throwables;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Throwables;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
@@ -43,13 +54,6 @@ import 
org.apache.gobblin.source.workunit.MultiWorkUnitWeightedQueue;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.DatePartitionType;
 import org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.Duration;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -119,6 +123,9 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
   private static final String DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB =
       DATE_PARTITIONED_SOURCE_PREFIX + ".max.workunits.per.job";
 
+  private static final String DATE_PARTITIONED_SOURCE_LOOKBACK_TIME =
+      DATE_PARTITIONED_SOURCE_PREFIX + ".lookback.time";
+
   // Default configuration parameter values
 
   /**
@@ -137,12 +144,11 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
   */
   private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE 
= 0;
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(PartitionedFileSourceBase.class);
-
   // Instance variables
   private SourceState sourceState;
   private FileSystem fs;
   private long lowWaterMark;
+  private String lookBackTime;
   private int maxFilesPerJob;
   private int maxWorkUnitsPerJob;
   private int fileCount;
@@ -171,6 +177,8 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
 
     this.sourceState = state;
 
+    this.lookBackTime = getLookBackTimeProp(state);
+
     this.lowWaterMark =
         getLowWaterMark(state.getPreviousWorkUnitStates(), 
state.getProp(DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE,
             
String.valueOf(DEFAULT_DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE)));
@@ -207,9 +215,9 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
     // Initialize all instance variables for this object
     init(state);
 
-    LOG.info("Will pull data from " + formatter.print(this.lowWaterMark) + " 
until " + this.maxFilesPerJob
+    log.info("Will pull data from " + formatter.print(this.lowWaterMark) + " 
until " + this.maxFilesPerJob
         + " files have been processed, or until there is no more data to 
consume");
-    LOG.info("Creating workunits");
+    log.info("Creating workunits");
 
     // Weighted MultiWorkUnitWeightedQueue, the job will add new WorkUnits to 
the queue along with a weight for each
     // WorkUnit. The queue will take care of balancing the WorkUnits amongst a 
set number of MultiWorkUnits
@@ -220,7 +228,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
 
     // If the file count has not exceeded maxFilesPerJob then start adding new 
WorkUnits to for this job
     if (this.fileCount >= this.maxFilesPerJob) {
-      LOG.info(
+      log.info(
           "The number of work units from previous job has already reached the 
upper limit, no more workunits will be made");
       return multiWorkUnitWeightedQueue.getQueueAsList();
     }
@@ -268,7 +276,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
         Throwables.propagate(e);
       }
 
-      LOG.info(
+      log.info(
           "Will process file from previous workunit: " + 
wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
 
       this.fileCount++;
@@ -286,7 +294,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
 
       extract = new Extract(this.tableType, namespace, topicName);
 
-      LOG.info("Created extract: " + extract.getExtractId() + " for path " + 
topicName);
+      log.info("Created extract: " + extract.getExtractId() + " for path " + 
topicName);
       extractMap.put(file.getWatermarkMsSinceEpoch(), extract);
     }
 
@@ -310,7 +318,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
       for (PartitionAwareFileRetriever.FileInfo file : filesToPull) {
         Extract extract = getExtractForFile(file, topicName, namespace, 
extractMap);
 
-        LOG.info("Will process file " + file.getFilePath());
+        log.info("Will process file " + file.getFilePath());
 
         WorkUnit singleWorkUnit = WorkUnit.create(extract);
         singleWorkUnit.setProp(ConfigurationKeys.SOURCE_ENTITY, topicName);
@@ -329,7 +337,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
         this.fileCount++;
       }
 
-      LOG.info("Total number of files extracted for the current run: " + 
filesToPull.size());
+      log.info("Total number of files extracted for the current run: " + 
filesToPull.size());
     } catch (IOException e) {
       Throwables.propagate(e);
     }
@@ -347,11 +355,20 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
   }
 
   /**
-   * Gets the LWM for this job runs. The new LWM is the HWM of the previous 
run + 1 unit (day,hour,minute..etc).
+   * Gets the LWM for this job runs.
+   * If the lookback property {@link 
PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_LOOKBACK_TIME)} or
+   * {@link ConfigurationKeys#DEFAULT_COPY_LOOKBACK_TIME_KEY} is provided
+   * then the LWM is set to the current time minus the lookback time.
+   * Otherwise, The new LWM is the HWM of the previous run + 1 unit 
(day,hour,minute.etc.).
    * If there was no previous execution then it is set to the given 
lowWaterMark + 1 unit.
    */
   private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String 
lowWaterMark) {
 
+    // If the lookback time is set get LWM from the lookback time
+    if (StringUtils.isNotEmpty(this.lookBackTime)) {
+      return getLowWaterMarkFromLookbackTime(this.lookBackTime);
+    }
+
     long lowWaterMarkValue = retriever.getWatermarkFromString(lowWaterMark);
 
     // Find the max HWM from the previous states, this is the new current LWM
@@ -367,7 +384,25 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
     return lowWaterMarkValue + getRetriever().getWatermarkIncrementMs();
   }
 
+  /** Returns the low watermark value based on lookback which is equal to 
current time minus lookback time. */
+  private long getLowWaterMarkFromLookbackTime(String lookBackTime) {
+    try {
+      Duration lookBackDuration = 
PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(lookBackTime);
+      return new DateTime().minus(lookBackDuration).getMillis();
+    } catch (IOException e) {
+      log.error("Failed to parse lookback time: {} , Returning 0 as low 
watermark value, {}", lookBackTime, e);
+    }
+    return 0;
+  }
+
   protected PartitionAwareFileRetriever getRetriever() {
     return retriever;
   }
+
+  private static String getLookBackTimeProp(SourceState state) {
+    if (state.contains(DATE_PARTITIONED_SOURCE_LOOKBACK_TIME)) {
+      return state.getProp(DATE_PARTITIONED_SOURCE_LOOKBACK_TIME);
+    }
+    return state.getProp(ConfigurationKeys.DEFAULT_COPY_LOOKBACK_TIME_KEY, "");
+  }
 }
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtilsTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtilsTest.java
new file mode 100644
index 0000000000..c11c3a3167
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtilsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source;
+
+import java.io.IOException;
+
+import org.joda.time.Duration;
+import org.joda.time.chrono.ISOChronology;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/** Tests for {@link PartitionAwareFileRetrieverUtils}*/
+public class PartitionAwareFileRetrieverUtilsTest {
+
+  @Test(dataProvider = "validLookbackTimes")
+  public void testValidLookbackTime(String lookBackTime, long expectedMillis) 
throws IOException {
+    Duration expectedDuration = new Duration(expectedMillis);
+    Duration actualDuration = 
PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(lookBackTime);
+    Assert.assertEquals(expectedDuration, actualDuration);
+  }
+
+  @Test(dataProvider = "invalidLookbackTimes", expectedExceptions = 
IOException.class)
+  public void testInvalidLookbackTime(String lookBackTime) throws IOException {
+    PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(lookBackTime);
+  }
+
+  @DataProvider(name = "validLookbackTimes")
+  public Object[][] validLookbackTimes() {
+    return new Object[][] {
+        {"5d", 5 * ISOChronology.getInstance().days().getUnitMillis()},
+        {"10h", 10 * ISOChronology.getInstance().hours().getUnitMillis()},
+        {"30m", 30 * ISOChronology.getInstance().minutes().getUnitMillis()}
+    };
+  }
+
+  @DataProvider(name = "invalidLookbackTimes")
+  public String[][] invalidLookbackTimes() {
+    return new String[][] {
+        {"5x"}, // Invalid format
+        {"30z"}, // Invalid format
+        {"xd"}, // Invalid number
+        {"yh"}, // Invalid number
+        {"zm"}  // Invalid number
+    };
+  }
+
+}

Reply via email to