[ https://issues.apache.org/jira/browse/GOBBLIN-2147?focusedWorklogId=960348&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-960348 ]
ASF GitHub Bot logged work on GOBBLIN-2147:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Mar/25 17:37
            Start Date: 05/Mar/25 17:37
    Worklog Time Spent: 10m
      Work Description: abhishekmjain commented on code in PR #4044:
URL: https://github.com/apache/gobblin/pull/4044#discussion_r1981876448


##########
gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java:
##########
@@ -367,7 +384,25 @@ private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String lowW
     return lowWaterMarkValue + getRetriever().getWatermarkIncrementMs();
   }

+  /** Returns the low watermark value based on lookback which is equal to current time minus lookback time. */
+  private long getLowWaterMarkFromLookbackTime(String lookBackTime) {
+    try {
+      Duration lookBackDuration = PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(lookBackTime);
+      return new DateTime().minus(lookBackDuration).getMillis();
+    } catch (IOException e) {
+      Throwables.propagate(e);

Review Comment:
   Should we propagate the exception if we want to have a fallback to 0?
   Also, it looks like Throwables.propagate is deprecated. Let's just log the exception and move ahead here.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 960348)
    Time Spent: 1.5h  (was: 1h 20m)

> Add lookback time property in PartitionedFileSource
> ---------------------------------------------------
>
>                 Key: GOBBLIN-2147
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2147
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> All FileBasedSource implementations should have a config for lookback time.
>
> Currently, FileBasedSources look for data since the time set by
> `conversion.min.watermark`, and the time granularity is decided by the lowest
> time denomination. That denomination in many cases, including this one, is
> 1 second (determined by
> |gobblin.flow.input.dataset.descriptor.partition.pattern|yyyy-MM-dd_HH_mm_ss|).
>
> It is an extremely abusive way to find workunits.
> Let's enable these jobs to use lookback time configs like several other
> dataset finders do.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
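
The review comment above suggests logging the parse failure and falling back to 0 rather than propagating it. A minimal sketch of that behavior follows. It is not the code from PR #4044: the class name, the "<number><unit>" lookback format (d, h, m), and the use of java.time in place of Joda-Time are all assumptions made so the example is self-contained.

```java
import java.time.Clock;
import java.time.DateTimeException;
import java.time.Duration;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of a lookback-based low watermark; not the Gobblin implementation. */
final class LookbackWatermarkSketch {
  private static final Logger LOG = Logger.getLogger(LookbackWatermarkSketch.class.getName());

  /** Fallback low watermark used when the lookback value cannot be used. */
  static final long FALLBACK_LOW_WATERMARK = 0L;

  private LookbackWatermarkSketch() {}

  /** Parses an assumed "<number><unit>" lookback string such as "3d", "12h" or "30m". */
  static Duration parseLookback(String lookback) {
    if (lookback == null) {
      throw new IllegalArgumentException("Lookback is null");
    }
    String trimmed = lookback.trim();
    if (trimmed.length() < 2) {
      throw new IllegalArgumentException("Invalid lookback: " + lookback);
    }
    // NumberFormatException is a subclass of IllegalArgumentException.
    long amount = Long.parseLong(trimmed.substring(0, trimmed.length() - 1));
    if (amount < 0) {
      throw new IllegalArgumentException("Negative lookback: " + lookback);
    }
    char unit = trimmed.charAt(trimmed.length() - 1);
    switch (unit) {
      case 'd': return Duration.ofDays(amount);
      case 'h': return Duration.ofHours(amount);
      case 'm': return Duration.ofMinutes(amount);
      default: throw new IllegalArgumentException("Unsupported lookback unit in: " + lookback);
    }
  }

  /** Returns (now - lookback) in epoch millis, or the fallback if the lookback is unusable. */
  static long lowWatermarkFromLookback(String lookback, Clock clock) {
    try {
      return clock.instant().minus(parseLookback(lookback)).toEpochMilli();
    } catch (IllegalArgumentException | ArithmeticException | DateTimeException e) {
      // Log and continue with the fallback instead of rethrowing.
      LOG.log(Level.WARNING,
          "Could not apply lookback '" + lookback + "'; falling back to " + FALLBACK_LOW_WATERMARK, e);
      return FALLBACK_LOW_WATERMARK;
    }
  }
}
```

Passing a Clock keeps the computation testable. A caller would typically use Clock.systemUTC(), for example LookbackWatermarkSketch.lowWatermarkFromLookback("1d", Clock.systemUTC()).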