[ 
https://issues.apache.org/jira/browse/GOBBLIN-1838?focusedWorklogId=865329&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865329
 ]

ASF GitHub Bot logged work on GOBBLIN-1838:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Jun/23 17:31
            Start Date: 13/Jun/23 17:31
    Worklog Time Spent: 10m 
      Work Description: wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1228473902


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -891,94 +898,40 @@ public void flush(String dbName, String tableName) throws 
IOException {
     }
   }
 
-  @Override
-  public void reset(String dbName, String tableName) throws IOException {
-    this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+  private CompletenessWatermarkUpdater getWatermarkUpdater(String topicName, 
TableMetadata tableMetadata,
+      Map<String, String> propsToUpdate) {
+    return new CompletenessWatermarkUpdater(topicName, 
this.auditCheckGranularity, this.timeZone,
+        tableMetadata, propsToUpdate, this.state, 
this.auditCountVerifier.get());
   }
 
-  /**
-   * Update TableMetadata with the new completion watermark upon a successful 
audit check
-   * @param tableMetadata metadata of table
-   * @param topic topic name
-   * @param timestamps Sorted set in reverse order of timestamps to check 
audit counts for
-   * @param props table properties map
-   */
-  private void checkAndUpdateCompletenessWatermark(TableMetadata 
tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
-      Map<String, String> props) {
-    String tableName = tableMetadata.table.get().name();
-    if (topic == null) {
-      log.error(String.format("Not performing audit check. %s is null. Please 
set as table property of %s",
-          TOPIC_NAME_KEY, tableName));
-    }
-    long newCompletenessWatermark =
-        computeCompletenessWatermark(tableName, topic, timestamps, 
tableMetadata.completionWatermark);
-    if (newCompletenessWatermark > tableMetadata.completionWatermark) {
-      log.info(String.format("Updating %s for %s to %s", 
COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
-          newCompletenessWatermark));
-      props.put(COMPLETION_WATERMARK_KEY, 
String.valueOf(newCompletenessWatermark));
-      props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
-      tableMetadata.completionWatermark = newCompletenessWatermark;
-    }
+  private void updateWatermarkWithFilesRegistered(String topicName, 
TableMetadata tableMetadata,
+      Map<String, String> propsToUpdate, boolean 
includeTotalCountCompletionWatermark) {
+    getWatermarkUpdater(topicName, tableMetadata, propsToUpdate)
+        .run(tableMetadata.datePartitions, 
includeTotalCountCompletionWatermark);
   }
 
-  /**
-   * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit 
counts match
-   * for that window (aka its is set to the beginning of next window)
-   * For each timestamp in sorted collection of timestamps in descending order
-   * if timestamp is greater than previousWatermark
-   * and hour(now) > hour(prevWatermark)
-   *    check audit counts for completeness between
-   *    a source and reference tier for [timestamp -1 , timstamp unit of 
granularity]
-   *    If the audit count matches update the watermark to the timestamp and 
break
-   *    else continue
-   * else
-   *  break
-   * Using a {@link TimeIterator} that operates over a range of time in 1 unit
-   * given the start, end and granularity
-   * @param catalogDbTableName
-   * @param topicName
-   * @param timestamps a sorted set of timestamps in decreasing order
-   * @param previousWatermark previous completion watermark for the table
-   * @return updated completion watermark
-   */
-  private long computeCompletenessWatermark(String catalogDbTableName, String 
topicName, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
-    log.info(String.format("Compute completion watermark for %s and timestamps 
%s with previous watermark %s", topicName, timestamps, previousWatermark));
-    long completionWatermark = previousWatermark;
-    ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
-    try {
-      if(timestamps == null || timestamps.size() <= 0) {
-        log.error("Cannot create time iterator. Empty for null timestamps");
-        return previousWatermark;
-      }
-      TimeIterator.Granularity granularity = 
TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
-      ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
-          .atZone(ZoneId.of(this.timeZone));
-      ZonedDateTime startDT = timestamps.first();
-      ZonedDateTime endDT = timestamps.last();
-      TimeIterator iterator = new TimeIterator(startDT, endDT, granularity, 
true);
-      while (iterator.hasNext()) {
-        ZonedDateTime timestampDT = iterator.next();
-        if (timestampDT.isAfter(prevWatermarkDT)
-            && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) 
> 0) {
-          long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          ZonedDateTime auditCountCheckLowerBoundDT = 
TimeIterator.dec(timestampDT, granularity, 1);
-          if (auditCountVerifier.get().isComplete(topicName,
-              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), 
timestampMillis)) {
-            completionWatermark = timestampMillis;
-            // Also persist the watermark into State object to share this with 
other MetadataWriters
-            // we enforce ourselves to always use lower-cased table name here
-            String catalogDbTableNameLowerCased = 
catalogDbTableName.toLowerCase(Locale.ROOT);
-            
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, 
catalogDbTableNameLowerCased), completionWatermark);
-            break;
-          }
-        } else {
-          break;
-        }
-      }
-    } catch (IOException e) {
-      log.warn("Exception during audit count check: ", e);
+  private void updateWatermarkWithNoFilesRegistered(String topicName, 
TableMetadata tableMetadata,

Review Comment:
   This will be refined in the next iteration.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 865329)
    Time Spent: 1h 20m  (was: 1h 10m)

> Introduce total count based completion watermark
> ------------------------------------------------
>
>                 Key: GOBBLIN-1838
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1838
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Andy Jiang
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently, the completion watermark is determined by a completeness 
> percentage computed as srcCount / refCount for each reference tier 
> (refTier), taking the maximum across tiers.
> This change introduces a new "total-count-based completion watermark", 
> determined by a different completeness percentage: srcCount divided by the 
> sum of refCount across all reference tiers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to