[ https://issues.apache.org/jira/browse/GOBBLIN-2147?focusedWorklogId=960348&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-960348 ]

ASF GitHub Bot logged work on GOBBLIN-2147:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Mar/25 17:37
            Start Date: 05/Mar/25 17:37
    Worklog Time Spent: 10m 
      Work Description: abhishekmjain commented on code in PR #4044:
URL: https://github.com/apache/gobblin/pull/4044#discussion_r1981876448


##########
gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java:
##########
@@ -367,7 +384,25 @@ private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String lowW
     return lowWaterMarkValue + getRetriever().getWatermarkIncrementMs();
   }
 
+  /** Returns the low watermark value based on lookback which is equal to current time minus lookback time. */
+  private long getLowWaterMarkFromLookbackTime(String lookBackTime) {
+    try {
+      Duration lookBackDuration = PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(lookBackTime);
+      return new DateTime().minus(lookBackDuration).getMillis();
+    } catch (IOException e) {
+      Throwables.propagate(e);

Review Comment:
   Should we propagate the exception if we want to fall back to 0?
   
   Also, it looks like Throwables.propagate is deprecated. Let's just log the exception and move on here.
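A minimal sketch of the reviewer's suggestion (log the failure and fall back to 0 instead of propagating) might look like the following. It uses `java.time` rather than Joda-Time so it runs standalone. `parseLookback` is a hypothetical stand-in for `PartitionAwareFileRetrieverUtils.getLookbackTimeDuration`, whose accepted format is not shown in the diff; the `<number><d|h|m>` format (e.g. `2h`) is an assumption. The current time is passed in explicitly so the result is deterministic.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LookbackWatermarkSketch {
  private static final Logger LOG = Logger.getLogger(LookbackWatermarkSketch.class.getName());

  // Hypothetical stand-in for PartitionAwareFileRetrieverUtils.getLookbackTimeDuration.
  // Assumed format: a non-negative integer followed by one unit suffix, d, h or m (e.g. "2h").
  static Duration parseLookback(String lookBackTime) throws IOException {
    if (lookBackTime == null || lookBackTime.length() < 2) {
      throw new IOException("Invalid lookback time: " + lookBackTime);
    }
    String digits = lookBackTime.substring(0, lookBackTime.length() - 1);
    char unit = lookBackTime.charAt(lookBackTime.length() - 1);
    long amount;
    try {
      amount = Long.parseLong(digits);
    } catch (NumberFormatException e) {
      throw new IOException("Invalid lookback time: " + lookBackTime, e);
    }
    if (amount < 0) {
      throw new IOException("Negative lookback time: " + lookBackTime);
    }
    switch (unit) {
      case 'd':
        return Duration.ofDays(amount);
      case 'h':
        return Duration.ofHours(amount);
      case 'm':
        return Duration.ofMinutes(amount);
      default:
        throw new IOException("Unknown lookback unit in: " + lookBackTime);
    }
  }

  // Low watermark = now minus lookback. On a parse failure, log a warning and
  // fall back to 0 (the epoch) instead of rethrowing, as the review comment suggests.
  static long getLowWaterMarkFromLookbackTime(String lookBackTime, long nowMillis) {
    try {
      return nowMillis - parseLookback(lookBackTime).toMillis();
    } catch (IOException e) {
      LOG.log(Level.WARNING, "Could not parse lookback time '" + lookBackTime + "', falling back to 0", e);
      return 0L;
    }
  }

  public static void main(String[] args) {
    long now = 1_000_000_000L;
    System.out.println(getLowWaterMarkFromLookbackTime("2h", now));    // 992800000
    System.out.println(getLowWaterMarkFromLookbackTime("bogus", now)); // 0 (after a logged warning)
  }
}
```

Falling back to 0 means a malformed lookback value silently widens the scan back to the epoch, so the warning log is the only signal; whether that is preferable to failing fast is the trade-off the first question in the comment raises.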





Issue Time Tracking
-------------------

    Worklog Id:     (was: 960348)
    Time Spent: 1.5h  (was: 1h 20m)

> Add lookback time property in PartitionedFileSource
> ---------------------------------------------------
>
>                 Key: GOBBLIN-2147
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2147
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> All FileBasedSource implementations should have a config for lookback time.
>  
> Currently, FileBasedSources look for data starting from the time set by
> `conversion.min.watermark`, and the time granularity is decided by the smallest
> time unit in the partition pattern. In many cases, including this one, that unit
> is 1 second (determined by
> `gobblin.flow.input.dataset.descriptor.partition.pattern` = `yyyy-MM-dd_HH_mm_ss`).
>  
> This is an extremely wasteful way to find work units, since the source may have
> to check a partition for every second since the watermark.
> Let's enable these jobs to use lookback time configs, as several other dataset
> finders do.
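To make the cost concrete, here is a rough sketch assuming one partition per second named with the `yyyy-MM-dd_HH_mm_ss` pattern. It counts the candidate partitions between a start time and now, and lists the partition names inside a lookback window. It models the scan only; it is not Gobblin's retriever code, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class PartitionScanSketch {
  // Partition naming pattern from the issue; one partition per second is assumed.
  static final DateTimeFormatter PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss");

  // Number of 1-second partitions in [start, end).
  static long partitionCount(LocalDateTime start, LocalDateTime end) {
    return Duration.between(start, end).getSeconds();
  }

  // Partition names in [end - lookback, end) at 1-second granularity.
  static List<String> partitionsInLookback(LocalDateTime end, Duration lookback) {
    List<String> names = new ArrayList<>();
    for (LocalDateTime t = end.minus(lookback); t.isBefore(end); t = t.plusSeconds(1)) {
      names.add(t.format(PATTERN));
    }
    return names;
  }

  public static void main(String[] args) {
    LocalDateTime now = LocalDateTime.of(2025, 3, 5, 17, 37, 0);
    System.out.println(partitionCount(now.minusDays(30), now));  // 2592000
    System.out.println(partitionCount(now.minusHours(1), now));  // 3600
    System.out.println(partitionsInLookback(now, Duration.ofSeconds(3)));
    // [2025-03-05_17_36_57, 2025-03-05_17_36_58, 2025-03-05_17_36_59]
  }
}
```

With a 30-day gap since the watermark there are 2,592,000 one-second partitions to consider, whereas a one-hour lookback bounds the scan to 3,600.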



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
