sv2000 commented on a change in pull request #3385:
URL: https://github.com/apache/gobblin/pull/3385#discussion_r706297393



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -858,46 +859,60 @@ public void flush(String dbName, String tableName) throws IOException {
   }
 
   /**
-   * For a sorted collection of timestamps greater than an existing watermark, check audit counts for completeness between
-   * a source and reference tier with 1 unit of granularity
-   * If the audit count matches update the watermark to the timestamp
+   * For each timestamp in sorted collection of timestamps in descending order
+   * if timestamp is greater than previousWatermark
+   * and hour(now) > hour(prevWatermark) + 1
+   *    check audit counts for completeness between
+   *    a source and reference tier for [timestamp, timstamp + 1 unit of granularity]
+   *    If the audit count matches update the watermark to the timestamp and break
+   *    else continue
+   * else
+   *  break
    * Using a {@link TimeIterator} that operates over a range of time in 1 unit
    * given the start, end and granularity
    * @param table
-   * @param timestamps a sorted set of timestamps in increasing order
+   * @param timestamps a sorted set of timestamps in decreasing order
    * @param previousWatermark previous completion watermark for the table
    * @return updated completion watermark
    */
-  private long computeCompletenessWatermark(String table, SortedSet<Long> timestamps, long previousWatermark) {
+  private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
     log.info(String.format("Compute completion watermark for %s and timestamps %s with previous watermark %s", table, timestamps, previousWatermark));
     long completionWatermark = previousWatermark;
+    ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
     try {
       if(timestamps == null || timestamps.size() <= 0) {
         log.error("Cannot create time iterator. Empty for null timestamps");
         return previousWatermark;
       }
-
-      ZonedDateTime startDT = Instant.ofEpochMilli(timestamps.first())
-          .atZone(ZoneId.of(this.timeZone));
-      ZonedDateTime endDT = Instant.ofEpochMilli(timestamps.last())
+      TimeIterator.Granularity granularity = TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
+      ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
           .atZone(ZoneId.of(this.timeZone));
-      TimeIterator iterator = new TimeIterator(startDT, endDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity));
+      ZonedDateTime startDT = timestamps.first();
+      ZonedDateTime endDT = timestamps.last();
+      TimeIterator iterator = new TimeIterator(startDT, endDT, granularity, true);
       while (iterator.hasNext()) {
-        long start = iterator.next().toInstant().toEpochMilli();
-        if (start > previousWatermark) {
-          if(auditCountVerifier.get().isComplete(table, start, iterator.getStartTime().toInstant().toEpochMilli())) {
-            completionWatermark = start;
+        ZonedDateTime timestampDT = iterator.next();
+        if (timestampDT.isAfter(prevWatermarkDT)
+            && getHoursFromEpoch(now) > (getHoursFromEpoch(prevWatermarkDT) + 1)) {

Review comment:
       I guess you are assuming an hourly granularity here? Is that expected?
   
   It might be simpler to use the Duration.between() API and convert the result
   to the configured granularity, e.g. via toHours(), toDays() or toMinutes().
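
A minimal sketch of that suggestion, not code from the PR. It assumes a hypothetical `Granularity` enum standing in for `TimeIterator.Granularity` (whose actual constants are not shown in this diff), and the helper name `unitsBetween` is invented for illustration. Only `Duration.between()`, `toMinutes()`, `toHours()` and `toDays()` are real `java.time` calls.

```java
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class GranularityGap {
  // Hypothetical stand-in for TimeIterator.Granularity; the real enum may differ.
  enum Granularity { MINUTE, HOUR, DAY }

  /** Whole units of the given granularity elapsed between two points in time. */
  static long unitsBetween(ZonedDateTime from, ZonedDateTime to, Granularity granularity) {
    Duration elapsed = Duration.between(from, to);
    switch (granularity) {
      case MINUTE:
        return elapsed.toMinutes();
      case HOUR:
        return elapsed.toHours();
      case DAY:
        return elapsed.toDays();
      default:
        throw new IllegalArgumentException("Unsupported granularity: " + granularity);
    }
  }

  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("UTC");
    ZonedDateTime prevWatermarkDT = ZonedDateTime.of(2021, 9, 10, 8, 0, 0, 0, zone);
    ZonedDateTime now = ZonedDateTime.of(2021, 9, 10, 10, 30, 0, 0, zone);

    // The hourly-only check in the diff could then become granularity-aware:
    boolean moreThanOneUnitBehind = unitsBetween(prevWatermarkDT, now, Granularity.HOUR) > 1;
    System.out.println(moreThanOneUnitBehind);
  }
}
```

One caveat: `Duration.between()` measures elapsed time and truncates, whereas comparing hours-from-epoch compares hour buckets, so the two checks can disagree near bucket boundaries (for example 08:59 versus 10:01). Truncating both timestamps to the granularity before taking the difference would preserve the bucket semantics.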




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
