This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8b250d4ec1 [GOBBLIN-2147] Added lookback time fetch in partitioned
filesource (#4044)
8b250d4ec1 is described below
commit 8b250d4ec1d1df554c62da9e15b4d0e7e29cbd27
Author: Vivek Rai <[email protected]>
AuthorDate: Fri Mar 7 10:49:35 2025 +0530
[GOBBLIN-2147] Added lookback time fetch in partitioned filesource (#4044)
Add support for a configurable lookback time in the partitioned file source.
If no lookback time is configured, default to the min_watermark.
---
.../gobblin/configuration/ConfigurationKeys.java | 5 ++
.../source/DatePartitionedNestedRetriever.java | 17 ++++-
.../source/PartitionAwareFileRetrieverUtils.java | 31 +++++++++
.../gobblin/source/PartitionedFileSourceBase.java | 73 ++++++++++++++++------
.../PartitionAwareFileRetrieverUtilsTest.java | 63 +++++++++++++++++++
5 files changed, 167 insertions(+), 22 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b6569af2d2..d9243f6e0d 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -350,6 +350,11 @@ public class ConfigurationKeys {
*/
public static final String WATERMARK_INTERVAL_VALUE_KEY =
"watermark.interval.value";
+ /**
+ * Generic lookback time property, e.g. {@code "5d"} (5 days) or {@code "10h"} (10 hours).
+ * PartitionedFileSourceBase falls back to this key when its source-specific lookback key is not set;
+ * when a non-empty lookback is configured, the low watermark is the current time minus this duration.
+ */
+ public static final String DEFAULT_COPY_LOOKBACK_TIME_KEY =
"copy.lookbackTime";
+
/**
* Extract related configuration properties.
*/
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
index 0ed42da8b7..fd652926f1 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper;
import org.apache.gobblin.util.DatePartitionType;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
import static
org.apache.gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN;
@@ -54,9 +55,9 @@ import static
org.apache.gobblin.source.PartitionedFileSourceBase.DATE_PARTITION
*
* For example, if {@link ConfigurationKeys#SOURCE_FILEBASED_DATA_DIRECTORY}
is set to /my/data/, then the class assumes
* folders following the pattern /my/data/daily/[year]/[month]/[day] are
present. It will iterate through all the data
- * under these folders starting from the date specified by {@link
#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} until
- * either {@link #DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB} files have been
processed, or until there is no more data
- * to process. For example, if {@link
#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} is set to 2015/01/01, then the job
+ * under these folders starting from the date specified by {@link
PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} until
+ * either {@link
PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB} files have
been processed, or until there is no more data
+ * to process. For example, if {@link
PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} is set
to 2015/01/01, then the job
* will read from the folder /my/data/daily/2015/01/01/,
/my/data/daily/2015/01/02/, /my/data/2015/01/03/ etc.
*
*/
@@ -114,6 +115,10 @@ public class DatePartitionedNestedRetriever implements
PartitionAwareFileRetriev
throw new IOException("Error initializing FileSystem", e);
}
+ GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker();
+ Long iteration = 0L;
+ LOG.info("~{}~ Starting collecting files to process from {} to {}",
sourceDir, lowWaterMarkDate, currentDay);
+
for (DateTime date = lowWaterMarkDate; !date.isAfter(currentDay) &&
filesToProcess.size() < maxFilesToReturn;
date = date.withFieldAdded(incrementalUnit, 1)) {
@@ -129,8 +134,14 @@ public class DatePartitionedNestedRetriever implements
PartitionAwareFileRetriev
new FileInfo(fileStatus.getPath().toString(),
fileStatus.getLen(), date.getMillis(), partitionPath));
}
}
+
+ if (growthTracker.isAnotherMilestone(iteration++)) {
+ LOG.info("~{}~ collected {} files to process; most-recent source path:
~{}~", sourceDir, filesToProcess.size(), sourcePath);
+ }
}
+ LOG.info("~{}~ Finished collecting {} files to process", sourceDir,
filesToProcess.size());
+
return filesToProcess;
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtils.java
b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtils.java
index 4bab517dec..9e809e4d0c 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtils.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtils.java
@@ -16,7 +16,13 @@
*/
package org.apache.gobblin.source;
+import java.io.IOException;
+
+import org.joda.time.DateTimeFieldType;
import org.joda.time.Duration;
+import org.joda.time.chrono.ISOChronology;
+
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.DatePartitionType;
@@ -31,6 +37,7 @@ import static
org.apache.gobblin.source.PartitionedFileSourceBase.DEFAULT_PARTIT
* Utility functions for parsing configuration parameters commonly used by
{@link PartitionAwareFileRetriever}
* objects.
*/
+@Slf4j
public class PartitionAwareFileRetrieverUtils {
/**
* Retrieve the lead time duration from the LEAD_TIME and LEAD_TIME
granularity config settings.
@@ -52,4 +59,28 @@ public class PartitionAwareFileRetrieverUtils {
return new Duration(leadTime * leadTimeGranularity.getUnitMilliseconds());
}
+
+  /**
+   * Calculates the lookback time duration based on the provided lookback time string.
+   *
+   * <p>The string must be a non-negative integer immediately followed by a single time-unit
+   * character, e.g. {@code "5d"} for 5 days or {@code "10h"} for 10 hours. The unit is resolved via
+   * {@link DatePartitionType#getLowestIntervalUnit(String)}; see {@link DatePartitionType#lookupByPattern}
+   * for the supported characters. Leading and trailing whitespace is ignored.
+   *
+   * @param lookBackTime the lookback time string
+   * @return a {@link Duration} equal to the numeric amount times the unit's length in milliseconds
+   * @throws IOException if the lookback time is null or blank, has no recognised time unit, has a
+   *         non-numeric or negative amount, or would overflow a {@code long} number of milliseconds
+   */
+  public static Duration getLookbackTimeDuration(String lookBackTime) throws IOException {
+    if (lookBackTime == null || lookBackTime.trim().isEmpty()) {
+      throw new IOException("Lookback time must not be empty: " + lookBackTime);
+    }
+    String trimmed = lookBackTime.trim();
+    DateTimeFieldType lookBackTimeGranularity = DatePartitionType.getLowestIntervalUnit(trimmed);
+    if (lookBackTimeGranularity == null) {
+      throw new IOException("There is no valid time granularity in lookback time: " + lookBackTime);
+    }
+    long lookBackTimeGranularityInMillis =
+        lookBackTimeGranularity.getDurationType().getField(ISOChronology.getInstance()).getUnitMillis();
+
+    long lookBack;
+    try {
+      // Everything before the trailing unit character must be the numeric amount.
+      lookBack = Long.parseLong(trimmed.substring(0, trimmed.length() - 1));
+    } catch (NumberFormatException ex) {
+      throw new IOException("Invalid lookback time: " + lookBackTime, ex);
+    }
+    if (lookBack < 0) {
+      // A negative lookback would place the low watermark in the future.
+      throw new IOException("Lookback time must not be negative: " + lookBackTime);
+    }
+    try {
+      // multiplyExact turns a silent long overflow into an explicit error.
+      return new Duration(Math.multiplyExact(lookBack, lookBackTimeGranularityInMillis));
+    } catch (ArithmeticException ex) {
+      throw new IOException("Lookback time is too large: " + lookBackTime, ex);
+    }
+  }
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index ece7db0c0e..f9f476c2ce 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -17,14 +17,25 @@
package org.apache.gobblin.source;
-import com.google.common.base.Throwables;
-import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Throwables;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
@@ -43,13 +54,6 @@ import
org.apache.gobblin.source.workunit.MultiWorkUnitWeightedQueue;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.DatePartitionType;
import org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.Duration;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -119,6 +123,9 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
private static final String DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB =
DATE_PARTITIONED_SOURCE_PREFIX + ".max.workunits.per.job";
+ /**
+ * Lookback time for this source, e.g. {@code "5d"} or {@code "10h"}. Takes precedence over
+ * {@link ConfigurationKeys#DEFAULT_COPY_LOOKBACK_TIME_KEY}; when a non-empty lookback is resolved,
+ * the low watermark is the current time minus the lookback instead of being derived from previous runs.
+ */
+ private static final String DATE_PARTITIONED_SOURCE_LOOKBACK_TIME =
+ DATE_PARTITIONED_SOURCE_PREFIX + ".lookback.time";
+
// Default configuration parameter values
/**
@@ -137,12 +144,11 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
*/
private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE
= 0;
- private static final Logger LOG =
LoggerFactory.getLogger(PartitionedFileSourceBase.class);
-
// Instance variables
private SourceState sourceState;
private FileSystem fs;
private long lowWaterMark;
+ private String lookBackTime;
private int maxFilesPerJob;
private int maxWorkUnitsPerJob;
private int fileCount;
@@ -171,6 +177,8 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
this.sourceState = state;
+ this.lookBackTime = getLookBackTimeProp(state);
+
this.lowWaterMark =
getLowWaterMark(state.getPreviousWorkUnitStates(),
state.getProp(DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE,
String.valueOf(DEFAULT_DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE)));
@@ -207,9 +215,9 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
// Initialize all instance variables for this object
init(state);
- LOG.info("Will pull data from " + formatter.print(this.lowWaterMark) + "
until " + this.maxFilesPerJob
+ log.info("Will pull data from " + formatter.print(this.lowWaterMark) + "
until " + this.maxFilesPerJob
+ " files have been processed, or until there is no more data to
consume");
- LOG.info("Creating workunits");
+ log.info("Creating workunits");
// Weighted MultiWorkUnitWeightedQueue, the job will add new WorkUnits to
the queue along with a weight for each
// WorkUnit. The queue will take care of balancing the WorkUnits amongst a
set number of MultiWorkUnits
@@ -220,7 +228,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
// If the file count has not exceeded maxFilesPerJob then start adding new
WorkUnits to for this job
if (this.fileCount >= this.maxFilesPerJob) {
- LOG.info(
+ log.info(
"The number of work units from previous job has already reached the
upper limit, no more workunits will be made");
return multiWorkUnitWeightedQueue.getQueueAsList();
}
@@ -268,7 +276,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
Throwables.propagate(e);
}
- LOG.info(
+ log.info(
"Will process file from previous workunit: " +
wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
this.fileCount++;
@@ -286,7 +294,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
extract = new Extract(this.tableType, namespace, topicName);
- LOG.info("Created extract: " + extract.getExtractId() + " for path " +
topicName);
+ log.info("Created extract: " + extract.getExtractId() + " for path " +
topicName);
extractMap.put(file.getWatermarkMsSinceEpoch(), extract);
}
@@ -310,7 +318,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
for (PartitionAwareFileRetriever.FileInfo file : filesToPull) {
Extract extract = getExtractForFile(file, topicName, namespace,
extractMap);
- LOG.info("Will process file " + file.getFilePath());
+ log.info("Will process file " + file.getFilePath());
WorkUnit singleWorkUnit = WorkUnit.create(extract);
singleWorkUnit.setProp(ConfigurationKeys.SOURCE_ENTITY, topicName);
@@ -329,7 +337,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
this.fileCount++;
}
- LOG.info("Total number of files extracted for the current run: " +
filesToPull.size());
+ log.info("Total number of files extracted for the current run: " +
filesToPull.size());
} catch (IOException e) {
Throwables.propagate(e);
}
@@ -347,11 +355,20 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
}
/**
- * Gets the LWM for this job runs. The new LWM is the HWM of the previous
run + 1 unit (day,hour,minute..etc).
+ * Gets the LWM for this job run.
+ * If the lookback property {@link PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_LOOKBACK_TIME} or
+ * {@link ConfigurationKeys#DEFAULT_COPY_LOOKBACK_TIME_KEY} is provided,
+ * then the LWM is set to the current time minus the lookback time.
+ * Otherwise, the new LWM is the HWM of the previous run + 1 unit (day, hour, minute, etc.).
* If there was no previous execution then it is set to the given
lowWaterMark + 1 unit.
*/
private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String
lowWaterMark) {
+ // If the lookback time is set get LWM from the lookback time
+ if (StringUtils.isNotEmpty(this.lookBackTime)) {
+ return getLowWaterMarkFromLookbackTime(this.lookBackTime);
+ }
+
long lowWaterMarkValue = retriever.getWatermarkFromString(lowWaterMark);
// Find the max HWM from the previous states, this is the new current LWM
@@ -367,7 +384,25 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
return lowWaterMarkValue + getRetriever().getWatermarkIncrementMs();
}
+  /**
+   * Returns the low watermark derived from the lookback time, i.e. the current time minus the parsed
+   * lookback duration, in milliseconds since the epoch.
+   *
+   * <p>If {@code lookBackTime} cannot be parsed, the error is logged together with its stack trace
+   * and {@code 0} is returned as the low watermark.
+   * NOTE(review): a 0 (epoch) low watermark makes the retriever start from 1970; consider failing fast.
+   *
+   * @param lookBackTime lookback string such as {@code "5d"}, parsed by
+   *        {@link PartitionAwareFileRetrieverUtils#getLookbackTimeDuration(String)}
+   * @return {@code now - lookback} in epoch millis, or {@code 0} if the lookback time is invalid
+   */
+  private long getLowWaterMarkFromLookbackTime(String lookBackTime) {
+    try {
+      Duration lookBackDuration = PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(lookBackTime);
+      return new DateTime().minus(lookBackDuration).getMillis();
+    } catch (IOException e) {
+      // The exception is the trailing argument with no matching placeholder, so SLF4J logs its
+      // stack trace rather than substituting e.toString() into the message.
+      log.error("Failed to parse lookback time: {}, returning 0 as low watermark value", lookBackTime, e);
+      return 0;
+    }
+  }
+
+ /** Returns the {@link PartitionAwareFileRetriever} used by this source, e.g. for watermark parsing and increments. */
protected PartitionAwareFileRetriever getRetriever() {
return retriever;
}
+
+  /**
+   * Resolves the configured lookback time string.
+   *
+   * @param state the source state to read from
+   * @return the value of {@link #DATE_PARTITIONED_SOURCE_LOOKBACK_TIME} when that key is present,
+   *         otherwise the value of {@link ConfigurationKeys#DEFAULT_COPY_LOOKBACK_TIME_KEY}, or an
+   *         empty string when neither key is set
+   */
+  private static String getLookBackTimeProp(SourceState state) {
+    final boolean hasSourceSpecificKey = state.contains(DATE_PARTITIONED_SOURCE_LOOKBACK_TIME);
+    return hasSourceSpecificKey
+        ? state.getProp(DATE_PARTITIONED_SOURCE_LOOKBACK_TIME)
+        : state.getProp(ConfigurationKeys.DEFAULT_COPY_LOOKBACK_TIME_KEY, "");
+  }
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtilsTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtilsTest.java
new file mode 100644
index 0000000000..c11c3a3167
--- /dev/null
+++
b/gobblin-core/src/test/java/org/apache/gobblin/source/PartitionAwareFileRetrieverUtilsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source;
+
+import java.io.IOException;
+
+import org.joda.time.Duration;
+import org.joda.time.chrono.ISOChronology;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/** Tests for {@link PartitionAwareFileRetrieverUtils}. */
+public class PartitionAwareFileRetrieverUtilsTest {
+
+  @Test(dataProvider = "validLookbackTimes")
+  public void testValidLookbackTime(String lookBackTime, long expectedMillis) throws IOException {
+    Duration expectedDuration = new Duration(expectedMillis);
+    Duration actualDuration = PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(lookBackTime);
+    // TestNG's Assert.assertEquals takes (actual, expected); keep that order so failure messages are accurate.
+    Assert.assertEquals(actualDuration, expectedDuration, "Unexpected duration for lookback time " + lookBackTime);
+  }
+
+  @Test(dataProvider = "invalidLookbackTimes", expectedExceptions = IOException.class)
+  public void testInvalidLookbackTime(String lookBackTime) throws IOException {
+    PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(lookBackTime);
+  }
+
+  @DataProvider(name = "validLookbackTimes")
+  public Object[][] validLookbackTimes() {
+    return new Object[][] {
+        {"5d", 5 * ISOChronology.getInstance().days().getUnitMillis()},
+        {"10h", 10 * ISOChronology.getInstance().hours().getUnitMillis()},
+        {"30m", 30 * ISOChronology.getInstance().minutes().getUnitMillis()},
+        {"0d", 0L}  // A zero lookback is valid and yields an empty duration
+    };
+  }
+
+  @DataProvider(name = "invalidLookbackTimes")
+  public String[][] invalidLookbackTimes() {
+    return new String[][] {
+        {"5x"},   // Unknown time unit
+        {"30z"},  // Unknown time unit
+        {"xd"},   // Non-numeric amount
+        {"yh"},   // Non-numeric amount
+        {"zm"},   // Non-numeric amount
+        {""},     // Empty string: no unit at all
+        {"d"},    // Unit without an amount
+        {"5"},    // Amount without a unit
+        {"5d5"}   // Trailing characters after the unit
+    };
+  }
+
+}