This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ba408f23c [GOBBLIN-1838] Introduce total count based completion 
watermark (#3701)
ba408f23c is described below

commit ba408f23c89357bc36ee70415041c2cff2b36df0
Author: Tao Qin <[email protected]>
AuthorDate: Mon Jul 24 12:25:20 2023 -0700

    [GOBBLIN-1838] Introduce total count based completion watermark (#3701)
    
    * [GOBBLIN-1838] Introduce total count based completion watermark
    
    * Refactor and address comments
    
    * fix style
    
    * combine completeness computation
    
    * Fix the logic for quiet topics
    
    * Add UT for CompletenessWatermarkUpdater
    
    * space only change
    
    * refine
    
    ---------
    
    Co-authored-by: Tao Qin <[email protected]>
---
 .../verifier/KafkaAuditCountVerifier.java          | 124 +++++++--
 .../verifier/KafkaAuditCountVerifierTest.java      |  75 +++++-
 .../writer/CompletenessWatermarkUpdater.java       | 284 +++++++++++++++++++++
 .../iceberg/writer/IcebergMetadataWriter.java      | 178 +++++--------
 .../writer/IcebergMetadataWriterConfigKeys.java    |   8 +-
 .../writer/CompletenessWatermarkUpdaterTest.java   | 205 +++++++++++++++
 .../iceberg/writer/IcebergMetadataWriterTest.java  |  30 ++-
 7 files changed, 761 insertions(+), 143 deletions(-)

diff --git 
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
 
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index d4495f9eb..77d89ff31 100644
--- 
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++ 
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.completeness.verifier;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
@@ -43,14 +44,22 @@ public class KafkaAuditCountVerifier {
   public static final String COMPLETENESS_PREFIX = "completeness.";
   public static final String SOURCE_TIER = COMPLETENESS_PREFIX + "source.tier";
   public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + 
"reference.tiers";
+  public static final String TOTAL_COUNT_REFERENCE_TIERS = COMPLETENESS_PREFIX 
+ "totalCount.reference.tiers";
   public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
   private static final double DEFAULT_THRESHOLD = 0.999;
   public static final String COMPLETE_ON_NO_COUNTS = COMPLETENESS_PREFIX + 
"complete.on.no.counts";
+
+  public enum CompletenessType {
+    ClassicCompleteness,
+    TotalCountCompleteness
+  }
+
   private final boolean returnCompleteOnNoCounts;
 
   private final AuditCountClient auditCountClient;
   private final String srcTier;
   private final Collection<String> refTiers;
+  private final Collection<String> totalCountRefTiers;
   private final double threshold;
 
   /**
@@ -69,6 +78,9 @@ public class KafkaAuditCountVerifier {
         state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
     this.srcTier = state.getProp(SOURCE_TIER);
     this.refTiers = 
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+    this.totalCountRefTiers = state.contains(TOTAL_COUNT_REFERENCE_TIERS)
+        ? 
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(TOTAL_COUNT_REFERENCE_TIERS))
+        : null;
     this.returnCompleteOnNoCounts = 
state.getPropAsBoolean(COMPLETE_ON_NO_COUNTS, false);
   }
 
@@ -90,23 +102,52 @@ public class KafkaAuditCountVerifier {
     }
   }
 
+  public Map<CompletenessType, Boolean> calculateCompleteness(String 
datasetName, long beginInMillis, long endInMillis)
+      throws IOException {
+    return calculateCompleteness(datasetName, beginInMillis, endInMillis, 
this.threshold);
+  }
+
   /**
    * Compare source tier against reference tiers.
-   * Compute completion percentage by srcCount/refCount. Return true iff the 
highest percentages is greater than threshold.
+   * Compute completion percentage; completeness is true iff the calculated 
percentage is greater than the threshold.
    *
    * @param datasetName A dataset short name like 'PageViewEvent'
    * @param beginInMillis Unix timestamp in milliseconds
    * @param endInMillis Unix timestamp in milliseconds
    * @param threshold User defined threshold
+   *
+   * @return a map of completeness result by CompletenessType
    */
-  public boolean isComplete(String datasetName, long beginInMillis, long 
endInMillis, double threshold)
-      throws IOException {
-    return getCompletenessPercentage(datasetName, beginInMillis, endInMillis) 
> threshold;
+  public Map<CompletenessType, Boolean> calculateCompleteness(String 
datasetName, long beginInMillis, long endInMillis,
+      double threshold) throws IOException {
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, 
beginInMillis, endInMillis);
+    log.info(String.format("checkTierCounts: audit counts map for %s for range 
[%s,%s]", datasetName, beginInMillis, endInMillis));
+    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+
+    Map<CompletenessType, Boolean> result = new HashMap<>();
+    result.put(CompletenessType.ClassicCompleteness, 
calculateCompleteness(datasetName, beginInMillis, endInMillis,
+        CompletenessType.ClassicCompleteness, countsByTier) > threshold);
+    result.put(CompletenessType.TotalCountCompleteness, 
calculateCompleteness(datasetName, beginInMillis, endInMillis,
+        CompletenessType.TotalCountCompleteness, countsByTier) > threshold);
+    return result;
   }
 
-  public boolean isComplete(String datasetName, long beginInMillis, long 
endInMillis)
-      throws IOException {
-    return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
+  private double calculateCompleteness(String datasetName, long beginInMillis, 
long endInMillis, CompletenessType type,
+      Map<String, Long> countsByTier) throws IOException {
+    if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
+      log.info(String.format("Found empty counts map for %s, returning 
complete", datasetName));
+      return 1.0;
+    }
+
+    switch (type) {
+      case ClassicCompleteness:
+        return calculateClassicCompleteness(datasetName, beginInMillis, 
endInMillis, countsByTier);
+      case TotalCountCompleteness:
+        return calculateTotalCountCompleteness(datasetName, beginInMillis, 
endInMillis, countsByTier);
+      default:
+        log.error("Skip unsupported completeness type {}", type);
+        return -1;
+    }
   }
 
   /**
@@ -118,39 +159,70 @@ public class KafkaAuditCountVerifier {
    *
    * @return The highest percentage value
    */
-  private double getCompletenessPercentage(String datasetName, long 
beginInMillis, long endInMillis) throws IOException {
-    Map<String, Long> countsByTier = getTierAndCount(datasetName, 
beginInMillis, endInMillis);
-    log.info(String.format("Audit counts map for %s for range [%s,%s]", 
datasetName, beginInMillis, endInMillis));
-    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
-    if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
-      log.info(String.format("Found empty counts map for %s, returning 
complete", datasetName));
-      return 1.0;
-    }
-    double percent = -1;
-    if (!countsByTier.containsKey(this.srcTier)) {
-      throw new IOException(String.format("Source tier %s audit count cannot 
be retrieved for dataset %s between %s and %s", this.srcTier, datasetName, 
beginInMillis, endInMillis));
-    }
+  private double calculateClassicCompleteness(String datasetName, long 
beginInMillis, long endInMillis,
+      Map<String, Long> countsByTier) throws IOException {
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, 
this.srcTier, this.refTiers);
 
+    double percent = -1;
     for (String refTier: this.refTiers) {
-      if (!countsByTier.containsKey(refTier)) {
-        throw new IOException(String.format("Reference tier %s audit count 
cannot be retrieved for dataset %s between %s and %s", refTier, datasetName, 
beginInMillis, endInMillis));
-      }
       long refCount = countsByTier.get(refTier);
-      if(refCount <= 0) {
-        throw new IOException(String.format("Reference tier %s count cannot be 
less than or equal to zero", refTier));
-      }
       long srcCount = countsByTier.get(this.srcTier);
-
       percent = Double.max(percent, (double) srcCount / (double) refCount);
     }
 
     if (percent < 0) {
       throw new IOException("Cannot calculate completion percentage");
     }
+    return percent;
+  }
 
+  /**
+   * Check total count based completeness by comparing source tier against 
reference tiers,
+   * and calculate the completion percentage by srcCount/sum_of(refCount).
+   *
+   * @param datasetName A dataset short name like 'PageViewEvent'
+   * @param beginInMillis Unix timestamp in milliseconds
+   * @param endInMillis Unix timestamp in milliseconds
+   *
+   * @return The percentage value by srcCount/sum_of(refCount)
+   */
+  private double calculateTotalCountCompleteness(String datasetName, long 
beginInMillis, long endInMillis,
+      Map<String, Long> countsByTier) throws IOException {
+    if (this.totalCountRefTiers == null) {
+      return -1;
+    }
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, 
this.srcTier, this.totalCountRefTiers);
+
+    long srcCount = countsByTier.get(this.srcTier);
+    long totalRefCount = this.totalCountRefTiers
+        .stream()
+        .mapToLong(countsByTier::get)
+        .sum();
+    double percent = Double.max(-1, (double) srcCount / (double) 
totalRefCount);
+    if (percent < 0) {
+      throw new IOException("Cannot calculate total count completion 
percentage");
+    }
     return percent;
   }
 
+  private static void validateTierCounts(String datasetName, long 
beginInMillis, long endInMillis, Map<String, Long> countsByTier,
+      String sourceTier, Collection<String> referenceTiers)
+      throws IOException {
+    if (!countsByTier.containsKey(sourceTier)) {
+      throw new IOException(String.format("Source tier %s audit count cannot 
be retrieved for dataset %s between %s and %s", sourceTier, datasetName, 
beginInMillis, endInMillis));
+    }
+
+    for (String refTier: referenceTiers) {
+      if (!countsByTier.containsKey(refTier)) {
+        throw new IOException(String.format("Reference tier %s audit count 
cannot be retrieved for dataset %s between %s and %s", refTier, datasetName, 
beginInMillis, endInMillis));
+      }
+      long refCount = countsByTier.get(refTier);
+      if(refCount <= 0) {
+        throw new IOException(String.format("Reference tier %s count cannot be 
less than or equal to zero", refTier));
+      }
+    }
+  }
+
   /**
    * Fetch all <tier-count> pairs for a given dataset between a time range
    */
diff --git 
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
 
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index 7a005c6c2..3e7140f41 100644
--- 
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++ 
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -33,6 +33,11 @@ public class KafkaAuditCountVerifierTest {
   public static final String SOURCE_TIER = "gobblin";
   public static final String REFERENCE_TIERS = "producer";
 
+  public static final String TOTAL_COUNT_REF_TIER_0 = "producer_0";
+  public static final String TOTAL_COUNT_REF_TIER_1 = "producer_1";
+  public static final String TOTAL_COUNT_REFERENCE_TIERS = 
TOTAL_COUNT_REF_TIER_0 + "," + TOTAL_COUNT_REF_TIER_1;
+
+
   public void testFetch() throws IOException {
     final String topic = "testTopic";
     State props = new State();
@@ -48,22 +53,86 @@ public class KafkaAuditCountVerifierTest {
         REFERENCE_TIERS,   1000L
     ));
     // Default threshold
-    Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
 
     // 99.999 % complete
     client.setTierCounts(ImmutableMap.of(
         SOURCE_TIER, 999L,
         REFERENCE_TIERS,   1000L
     ));
-    Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
 
     // <= 99% complete
     client.setTierCounts(ImmutableMap.of(
         SOURCE_TIER, 990L,
         REFERENCE_TIERS,   1000L
     ));
-    Assert.assertFalse(verifier.isComplete(topic, 0L, 0L));
+    Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+  }
+
+  public void testTotalCountCompleteness() throws IOException {
+    final String topic = "testTopic";
+    State props = new State();
+    props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+    props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+    props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, 
TOTAL_COUNT_REFERENCE_TIERS);
+    props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+    TestAuditClient client = new TestAuditClient(props);
+    KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, 
client);
+
+    // All complete
+    client.setTierCounts(ImmutableMap.of(
+        SOURCE_TIER, 1000L,
+        REFERENCE_TIERS, 1000L,
+        TOTAL_COUNT_REF_TIER_0, 600L,
+        TOTAL_COUNT_REF_TIER_1, 400L
+    ));
+    // Default threshold
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+
+    // 99.999 % complete
+    client.setTierCounts(ImmutableMap.of(
+        SOURCE_TIER, 999L,
+        REFERENCE_TIERS, 1000L,
+        TOTAL_COUNT_REF_TIER_0, 600L,
+        TOTAL_COUNT_REF_TIER_1, 400L
+    ));
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+
+    // <= 99% complete
+    client.setTierCounts(ImmutableMap.of(
+        SOURCE_TIER, 990L,
+        REFERENCE_TIERS, 1000L,
+        TOTAL_COUNT_REF_TIER_0, 600L,
+        TOTAL_COUNT_REF_TIER_1, 400L
+    ));
+    Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
   }
 
+  public void testEmptyAuditCount() throws IOException {
+    final String topic = "testTopic";
+    State props = new State();
+    props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+    props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+    props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, 
TOTAL_COUNT_REFERENCE_TIERS);
+    props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+    props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
+    TestAuditClient client = new TestAuditClient(props);
+    KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, 
client);
 
+    // Client gets empty audit count
+    client.setTierCounts(ImmutableMap.of());
+
+    // Should be complete, since COMPLETE_ON_NO_COUNTS=true
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+  }
 }
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
new file mode 100644
index 000000000..b022ac339
--- /dev/null
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedSet;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.time.TimeIterator;
+
+import static 
org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
+
+/**
+ * A class for completeness watermark updater.
+ * It computes the new watermarks and updates below entities:
+ *  1. the properties in {@link IcebergMetadataWriter.TableMetadata}
+ *  2. {@link org.apache.gobblin.configuration.State}
+ *  3. the completionWatermark in {@link IcebergMetadataWriter.TableMetadata}
+ */
+@Slf4j
+public class CompletenessWatermarkUpdater {
+  private final String topic;
+  private final String auditCheckGranularity;
+
+  protected final String timeZone;
+  protected final IcebergMetadataWriter.TableMetadata tableMetadata;
+  protected final Map<String, String> propsToUpdate;
+  protected final State stateToUpdate;
+  protected KafkaAuditCountVerifier auditCountVerifier;
+
+  public CompletenessWatermarkUpdater(String topic, String 
auditCheckGranularity, String timeZone,
+      IcebergMetadataWriter.TableMetadata tableMetadata, Map<String, String> 
propsToUpdate, State stateToUpdate,
+      KafkaAuditCountVerifier auditCountVerifier) {
+    this.tableMetadata = tableMetadata;
+    this.topic = topic;
+    this.auditCheckGranularity = auditCheckGranularity;
+    this.timeZone = timeZone;
+    this.propsToUpdate = propsToUpdate;
+    this.stateToUpdate = stateToUpdate;
+    this.auditCountVerifier = auditCountVerifier;
+  }
+
+  /**
+   * Update TableMetadata with the new completion watermark upon a successful 
audit check
+   * @param timestamps Sorted set in reverse order of timestamps to check 
audit counts for
+   * @param includeTotalCountCompletionWatermark If 
totalCountCompletionWatermark should be calculated
+   */
+  public void run(SortedSet<ZonedDateTime> timestamps, boolean 
includeTotalCountCompletionWatermark) {
+    String tableName = tableMetadata.table.get().name();
+    if (this.topic == null) {
+      log.error(String.format("Not performing audit check. %s is null. Please 
set as table property of %s",
+          TOPIC_NAME_KEY, tableName));
+    }
+    computeAndUpdateWatermark(tableName, timestamps, 
includeTotalCountCompletionWatermark);
+  }
+
+  private void computeAndUpdateWatermark(String tableName, 
SortedSet<ZonedDateTime> timestamps,
+      boolean includeTotalCountWatermark) {
+    log.info(String.format("Compute completion watermark for %s and timestamps 
%s with previous watermark %s, previous totalCount watermark %s, 
includeTotalCountWatermark=%b",
+        this.topic, timestamps, tableMetadata.completionWatermark, 
tableMetadata.totalCountCompletionWatermark,
+        includeTotalCountWatermark));
+
+    WatermarkUpdaterSet updaterSet = new 
WatermarkUpdaterSet(this.tableMetadata, this.timeZone, this.propsToUpdate,
+        this.stateToUpdate, includeTotalCountWatermark);
+    if(timestamps == null || timestamps.size() <= 0) {
+      log.error("Cannot create time iterator. Empty for null timestamps");
+      return;
+    }
+
+    ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
+    TimeIterator.Granularity granularity = 
TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
+    ZonedDateTime startDT = timestamps.first();
+    ZonedDateTime endDT = timestamps.last();
+    TimeIterator iterator = new TimeIterator(startDT, endDT, granularity, 
true);
+    try {
+      while (iterator.hasNext()) {
+        ZonedDateTime timestampDT = iterator.next();
+        updaterSet.checkForEarlyStop(timestampDT, now, granularity);
+        if (updaterSet.allFinished()) {
+          break;
+        }
+
+        ZonedDateTime auditCountCheckLowerBoundDT = 
TimeIterator.dec(timestampDT, granularity, 1);
+        Map<KafkaAuditCountVerifier.CompletenessType, Boolean> results =
+            this.auditCountVerifier.calculateCompleteness(this.topic,
+                auditCountCheckLowerBoundDT.toInstant().toEpochMilli(),
+                timestampDT.toInstant().toEpochMilli());
+
+        updaterSet.computeAndUpdate(results, timestampDT);
+      }
+    } catch (IOException e) {
+      log.warn("Exception during audit count check: ", e);
+    }
+  }
+
+  /**
+   * A class that contains both ClassicWatermarkUpdater and 
TotalCountWatermarkUpdater
+   */
+  static class WatermarkUpdaterSet {
+    private final List<WatermarkUpdater> updaters;
+
+    WatermarkUpdaterSet(IcebergMetadataWriter.TableMetadata tableMetadata, 
String timeZone,
+        Map<String, String> propsToUpdate, State stateToUpdate, boolean 
includeTotalCountWatermark) {
+      this.updaters = new ArrayList<>();
+      this.updaters.add(new 
ClassicWatermarkUpdater(tableMetadata.completionWatermark, timeZone, 
tableMetadata,
+          propsToUpdate, stateToUpdate));
+      if (includeTotalCountWatermark) {
+        this.updaters.add(new 
TotalCountWatermarkUpdater(tableMetadata.totalCountCompletionWatermark, 
timeZone,
+            tableMetadata, propsToUpdate, stateToUpdate));
+      }
+    }
+
+    void checkForEarlyStop(ZonedDateTime timestampDT, ZonedDateTime now,
+        TimeIterator.Granularity granularity) {
+      this.updaters.stream().forEach(updater
+          -> updater.checkForEarlyStop(timestampDT, now, granularity));
+    }
+
+    boolean allFinished() {
+      return this.updaters.stream().allMatch(updater -> updater.isFinished());
+    }
+
+    void computeAndUpdate(Map<KafkaAuditCountVerifier.CompletenessType, 
Boolean> results,
+        ZonedDateTime timestampDT) {
+      this.updaters.stream()
+          .filter(updater -> !updater.isFinished())
+          .forEach(updater -> updater.computeAndUpdate(results, timestampDT));
+    }
+  }
+
+  /**
+   * A stateful class for watermark updaters.
+   * The updater starts with finished=false state.
+   * Then computeAndUpdate() is called multiple times with the parameters:
+   * 1. The completeness audit results within (datepartition-1, datepartition)
+   * 2. the datepartition timestamp
+   * The method is called multiple times in descending order of the 
datepartition timestamp.
+   * <p>
+   * When the audit result is complete for a timestamp, it updates below 
entities:
+   *    1. the properties in {@link IcebergMetadataWriter.TableMetadata}
+   *    2. {@link org.apache.gobblin.configuration.State}
+   *    3. the completionWatermark in {@link 
IcebergMetadataWriter.TableMetadata}
+   *  And it turns into finished=true state, in which the following 
computeAndUpdate() calls will be skipped.
+   */
+  static abstract class WatermarkUpdater {
+    protected final long previousWatermark;
+    protected final ZonedDateTime prevWatermarkDT;
+    protected final String timeZone;
+    protected boolean finished = false;
+    protected final IcebergMetadataWriter.TableMetadata tableMetadata;
+    protected final Map<String, String> propsToUpdate;
+    protected final State stateToUpdate;
+
+    public WatermarkUpdater(long previousWatermark, String timeZone, 
IcebergMetadataWriter.TableMetadata tableMetadata,
+        Map<String, String> propsToUpdate, State stateToUpdate) {
+      this.previousWatermark = previousWatermark;
+      this.timeZone = timeZone;
+      this.tableMetadata = tableMetadata;
+      this.propsToUpdate = propsToUpdate;
+      this.stateToUpdate = stateToUpdate;
+
+      prevWatermarkDT = 
Instant.ofEpochMilli(previousWatermark).atZone(ZoneId.of(this.timeZone));
+    }
+
+    public void computeAndUpdate(Map<KafkaAuditCountVerifier.CompletenessType, 
Boolean> results,
+        ZonedDateTime timestampDT) {
+      if (finished) {
+        return;
+      }
+      computeAndUpdateInternal(results, timestampDT);
+    }
+
+    protected abstract void 
computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean> 
results,
+        ZonedDateTime timestampDT);
+
+    protected boolean isFinished() {
+      return this.finished;
+    }
+
+    protected void setFinished() {
+      this.finished = true;
+    }
+
+    protected void checkForEarlyStop(ZonedDateTime timestampDT, ZonedDateTime 
now,
+        TimeIterator.Granularity granularity) {
+      if (isFinished()
+          || (timestampDT.isAfter(this.prevWatermarkDT)
+              && TimeIterator.durationBetween(this.prevWatermarkDT, now, 
granularity) > 0)) {
+        return;
+      }
+      setFinished();
+    }
+  }
+
+  @VisibleForTesting
+  void setAuditCountVerifier(KafkaAuditCountVerifier auditCountVerifier) {
+    this.auditCountVerifier = auditCountVerifier;
+  }
+
+  static class ClassicWatermarkUpdater extends WatermarkUpdater {
+    public ClassicWatermarkUpdater(long previousWatermark, String timeZone,
+        IcebergMetadataWriter.TableMetadata tableMetadata, Map<String, String> 
propsToUpdate, State stateToUpdate) {
+      super(previousWatermark, timeZone, tableMetadata, propsToUpdate, 
stateToUpdate);
+    }
+
+    @Override
+    protected void 
computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean> 
results,
+        ZonedDateTime timestampDT) {
+      if 
(!results.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness)) {
+        return;
+      }
+
+      setFinished();
+      long updatedWatermark = timestampDT.toInstant().toEpochMilli();
+      this.stateToUpdate.setProp(
+          String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
+              this.tableMetadata.table.get().name().toLowerCase(Locale.ROOT)),
+          updatedWatermark);
+
+      if (updatedWatermark > this.previousWatermark) {
+        log.info(String.format("Updating %s for %s to %s", 
COMPLETION_WATERMARK_KEY,
+            this.tableMetadata.table.get().name(), updatedWatermark));
+        this.propsToUpdate.put(COMPLETION_WATERMARK_KEY, 
String.valueOf(updatedWatermark));
+        this.propsToUpdate.put(COMPLETION_WATERMARK_TIMEZONE_KEY, 
this.timeZone);
+
+        this.tableMetadata.completionWatermark = updatedWatermark;
+      }
+    }
+  }
+
+  static class TotalCountWatermarkUpdater extends WatermarkUpdater {
+    public TotalCountWatermarkUpdater(long previousWatermark, String timeZone,
+        IcebergMetadataWriter.TableMetadata tableMetadata, Map<String, String> 
propsToUpdate, State stateToUpdate) {
+      super(previousWatermark, timeZone, tableMetadata, propsToUpdate, 
stateToUpdate);
+    }
+
+    @Override
+    protected void 
computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean> 
results,
+        ZonedDateTime timestampDT) {
+      if 
(!results.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness)) 
{
+        return;
+      }
+
+      setFinished();
+      long updatedWatermark = timestampDT.toInstant().toEpochMilli();
+      this.stateToUpdate.setProp(
+          String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
+              this.tableMetadata.table.get().name().toLowerCase(Locale.ROOT)),
+          updatedWatermark);
+
+      if (updatedWatermark > previousWatermark) {
+        log.info(String.format("Updating %s for %s to %s", 
TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
+            this.tableMetadata.table.get().name(), updatedWatermark));
+        this.propsToUpdate.put(TOTAL_COUNT_COMPLETION_WATERMARK_KEY, 
String.valueOf(updatedWatermark));
+        tableMetadata.totalCountCompletionWatermark = updatedWatermark;
+      }
+    }
+  }
+
+}
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index b6c9e15dc..19bc805eb 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -32,7 +32,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -122,7 +121,6 @@ import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.stream.RecordEnvelope;
-import org.apache.gobblin.time.TimeIterator;
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.ClustersNames;
 import org.apache.gobblin.util.HadoopUtils;
@@ -168,7 +166,9 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
   private static final String ICEBERG_FILE_PATH_COLUMN = 
DataFile.FILE_PATH.name();
 
   private final boolean completenessEnabled;
+  private final boolean totalCountCompletenessEnabled;
   private final WhitelistBlacklist completenessWhitelistBlacklist;
+  private final WhitelistBlacklist 
totalCountBasedCompletenessWhitelistBlacklist;
   private final String timeZone;
   private final DateTimeFormatter HOURLY_DATEPARTITION_FORMAT;
   private final String newPartitionColumn;
@@ -234,8 +234,13 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
               FsPermission.getDefault());
     }
     this.completenessEnabled = 
state.getPropAsBoolean(ICEBERG_COMPLETENESS_ENABLED, 
DEFAULT_ICEBERG_COMPLETENESS);
+    this.totalCountCompletenessEnabled = 
state.getPropAsBoolean(ICEBERG_TOTAL_COUNT_COMPLETENESS_ENABLED,
+        DEFAULT_ICEBERG_TOTAL_COUNT_COMPLETENESS);
     this.completenessWhitelistBlacklist = new 
WhitelistBlacklist(state.getProp(ICEBERG_COMPLETENESS_WHITELIST, ""),
         state.getProp(ICEBERG_COMPLETENESS_BLACKLIST, ""));
+    this.totalCountBasedCompletenessWhitelistBlacklist = new 
WhitelistBlacklist(
+        state.getProp(ICEBERG_TOTAL_COUNT_COMPLETENESS_WHITELIST, ""),
+        state.getProp(ICEBERG_TOTAL_COUNT_COMPLETENESS_BLACKLIST, ""));
     this.timeZone = state.getProp(TIME_ZONE_KEY, DEFAULT_TIME_ZONE);
     this.HOURLY_DATEPARTITION_FORMAT = 
DateTimeFormatter.ofPattern(DATEPARTITION_FORMAT)
         .withZone(ZoneId.of(this.timeZone));
@@ -258,7 +263,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
   }
 
   private org.apache.iceberg.Table getIcebergTable(TableIdentifier tid) throws 
NoSuchTableException {
-    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata());
+    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata(this.conf));
     if (!tableMetadata.table.isPresent()) {
       tableMetadata.table = Optional.of(catalog.loadTable(tid));
     }
@@ -304,7 +309,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
   public void write(GobblinMetadataChangeEvent gmce, Map<String, 
Collection<HiveSpec>> newSpecsMap,
       Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) 
throws IOException {
     TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), 
tableSpec.getTable().getTableName());
-    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata());
+    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata(this.conf));
     Table table;
     try {
       table = getIcebergTable(tid);
@@ -327,6 +332,12 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     if(tableMetadata.completenessEnabled) {
       tableMetadata.completionWatermark = 
Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
           String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
+
+      if (tableMetadata.totalCountCompletenessEnabled) {
+        tableMetadata.totalCountCompletionWatermark = Long.parseLong(
+            
table.properties().getOrDefault(TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
+                String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
+      }
     }
 
     computeCandidateSchema(gmce, tid, tableSpec);
@@ -392,7 +403,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
    * the given {@link TableIdentifier} with the input {@link 
GobblinMetadataChangeEvent}
    */
   private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier 
tid) {
-    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata());
+    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata(this.conf));
     tableMetadata.dataOffsetRange = 
Optional.of(tableMetadata.dataOffsetRange.or(() -> 
getLastOffset(tableMetadata)));
     Map<String, List<Range>> offsets = tableMetadata.dataOffsetRange.get();
     for (Map.Entry<String, String> entry : 
gmce.getTopicPartitionOffsetsRange().entrySet()) {
@@ -424,7 +435,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
 
   protected void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid, 
GobblinMetadataChangeEvent gmce) {
     org.apache.hadoop.hive.metastore.api.Table table = 
HiveMetaStoreUtils.getTable(tableSpec.getTable());
-    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata());
+    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata(this.conf));
     tableMetadata.newProperties = 
Optional.of(IcebergUtils.getTableProperties(table));
     String nativeName = tableMetadata.datasetName;
     String topic = nativeName.substring(nativeName.lastIndexOf("/") + 1);
@@ -442,7 +453,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
    */
   private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, 
TableIdentifier tid, HiveSpec spec) {
     Table table = getIcebergTable(tid);
-    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata());
+    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> 
new TableMetadata(this.conf));
     org.apache.hadoop.hive.metastore.api.Table hiveTable = 
HiveMetaStoreUtils.getTable(spec.getTable());
     tableMetadata.lastProperties = 
Optional.of(tableMetadata.lastProperties.or(() -> table.properties()));
     Map<String, String> props = tableMetadata.lastProperties.get();
@@ -824,7 +835,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     writeLock.lock();
     try {
       TableIdentifier tid = TableIdentifier.of(dbName, tableName);
-      TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new 
TableMetadata());
+      TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new 
TableMetadata(this.conf));
       if (tableMetadata.transaction.isPresent()) {
         Transaction transaction = tableMetadata.transaction.get();
         Map<String, String> props = tableMetadata.newProperties.or(
@@ -839,7 +850,8 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
             log.info("Sending audit counts for {} took {} ms", topicName, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
           }
           if (tableMetadata.completenessEnabled) {
-            checkAndUpdateCompletenessWatermark(tableMetadata, topicName, 
tableMetadata.datePartitions, props);
+            updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+                tableMetadata.totalCountCompletenessEnabled);
           }
         }
         if (tableMetadata.deleteFiles.isPresent()) {
@@ -849,16 +861,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         // The logic is to check the window [currentHour-1,currentHour] and 
update the watermark if there are no audit counts
         if(!tableMetadata.appendFiles.isPresent() && 
!tableMetadata.deleteFiles.isPresent()
             && tableMetadata.completenessEnabled) {
-          if (tableMetadata.completionWatermark > 
DEFAULT_COMPLETION_WATERMARK) {
-            log.info(String.format("Checking kafka audit for %s on 
change_property ", topicName));
-            SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
-            ZonedDateTime dtAtBeginningOfHour = 
ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
-            timestamps.add(dtAtBeginningOfHour);
-            checkAndUpdateCompletenessWatermark(tableMetadata, topicName, 
timestamps, props);
-          } else {
-            log.info(String.format("Need valid watermark, current watermark is 
%s, Not checking kafka audit for %s",
-                tableMetadata.completionWatermark, topicName));
-          }
+          updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, 
props);
         }
 
         //Set high waterMark
@@ -909,94 +912,36 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     }
   }
 
-  @Override
-  public void reset(String dbName, String tableName) throws IOException {
-    this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+  private CompletenessWatermarkUpdater getWatermarkUpdater(String topicName, 
TableMetadata tableMetadata,
+      Map<String, String> propsToUpdate) {
+    return new CompletenessWatermarkUpdater(topicName, 
this.auditCheckGranularity, this.timeZone,
+        tableMetadata, propsToUpdate, this.state, 
this.auditCountVerifier.get());
   }
 
-  /**
-   * Update TableMetadata with the new completion watermark upon a successful 
audit check
-   * @param tableMetadata metadata of table
-   * @param topic topic name
-   * @param timestamps Sorted set in reverse order of timestamps to check 
audit counts for
-   * @param props table properties map
-   */
-  private void checkAndUpdateCompletenessWatermark(TableMetadata 
tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
-      Map<String, String> props) {
-    String tableName = tableMetadata.table.get().name();
-    if (topic == null) {
-      log.error(String.format("Not performing audit check. %s is null. Please 
set as table property of %s",
-          TOPIC_NAME_KEY, tableName));
-    }
-    long newCompletenessWatermark =
-        computeCompletenessWatermark(tableName, topic, timestamps, 
tableMetadata.completionWatermark);
-    if (newCompletenessWatermark > tableMetadata.completionWatermark) {
-      log.info(String.format("Updating %s for %s to %s", 
COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
-          newCompletenessWatermark));
-      props.put(COMPLETION_WATERMARK_KEY, 
String.valueOf(newCompletenessWatermark));
-      props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
-      tableMetadata.completionWatermark = newCompletenessWatermark;
-    }
+  private void updateWatermarkWithFilesRegistered(String topicName, 
TableMetadata tableMetadata,
+      Map<String, String> propsToUpdate, boolean 
includeTotalCountCompletionWatermark) {
+    getWatermarkUpdater(topicName, tableMetadata, propsToUpdate)
+        .run(tableMetadata.datePartitions, 
includeTotalCountCompletionWatermark);
   }
 
-  /**
-   * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit 
counts match
-   * for that window (aka its is set to the beginning of next window)
-   * For each timestamp in sorted collection of timestamps in descending order
-   * if timestamp is greater than previousWatermark
-   * and hour(now) > hour(prevWatermark)
-   *    check audit counts for completeness between
-   *    a source and reference tier for [timestamp -1 , timstamp unit of 
granularity]
-   *    If the audit count matches update the watermark to the timestamp and 
break
-   *    else continue
-   * else
-   *  break
-   * Using a {@link TimeIterator} that operates over a range of time in 1 unit
-   * given the start, end and granularity
-   * @param catalogDbTableName
-   * @param topicName
-   * @param timestamps a sorted set of timestamps in decreasing order
-   * @param previousWatermark previous completion watermark for the table
-   * @return updated completion watermark
-   */
-  private long computeCompletenessWatermark(String catalogDbTableName, String 
topicName, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
-    log.info(String.format("Compute completion watermark for %s and timestamps 
%s with previous watermark %s", topicName, timestamps, previousWatermark));
-    long completionWatermark = previousWatermark;
-    ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
-    try {
-      if(timestamps == null || timestamps.size() <= 0) {
-        log.error("Cannot create time iterator. Empty for null timestamps");
-        return previousWatermark;
-      }
-      TimeIterator.Granularity granularity = 
TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
-      ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
-          .atZone(ZoneId.of(this.timeZone));
-      ZonedDateTime startDT = timestamps.first();
-      ZonedDateTime endDT = timestamps.last();
-      TimeIterator iterator = new TimeIterator(startDT, endDT, granularity, 
true);
-      while (iterator.hasNext()) {
-        ZonedDateTime timestampDT = iterator.next();
-        if (timestampDT.isAfter(prevWatermarkDT)
-            && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) 
> 0) {
-          long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          ZonedDateTime auditCountCheckLowerBoundDT = 
TimeIterator.dec(timestampDT, granularity, 1);
-          if (auditCountVerifier.get().isComplete(topicName,
-              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), 
timestampMillis)) {
-            completionWatermark = timestampMillis;
-            // Also persist the watermark into State object to share this with 
other MetadataWriters
-            // we enforce ourselves to always use lower-cased table name here
-            String catalogDbTableNameLowerCased = 
catalogDbTableName.toLowerCase(Locale.ROOT);
-            
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, 
catalogDbTableNameLowerCased), completionWatermark);
-            break;
-          }
-        } else {
-          break;
-        }
-      }
-    } catch (IOException e) {
-      log.warn("Exception during audit count check: ", e);
+  private void updateWatermarkWithNoFilesRegistered(String topicName, 
TableMetadata tableMetadata,
+      Map<String, String> propsToUpdate) {
+    if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
+      log.info(String.format("Checking kafka audit for %s on change_property 
", topicName));
+      SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
+      ZonedDateTime dtAtBeginningOfHour = 
ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
+      timestamps.add(dtAtBeginningOfHour);
+
+      getWatermarkUpdater(topicName, tableMetadata, 
propsToUpdate).run(timestamps, true);
+    } else {
+      log.info(String.format("Need valid watermark, current watermark is %s, 
Not checking kafka audit for %s",
+          tableMetadata.completionWatermark, topicName));
     }
-    return completionWatermark;
+  }
+
+  @Override
+  public void reset(String dbName, String tableName) throws IOException {
+    this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
   }
 
   private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata 
tableMetadata, String dbName,
@@ -1031,6 +976,10 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     }
     if (tableMetadata.completenessEnabled) {
       gobblinTrackingEvent.addMetadata(COMPLETION_WATERMARK_KEY, 
Long.toString(tableMetadata.completionWatermark));
+      if (tableMetadata.totalCountCompletenessEnabled) {
+        gobblinTrackingEvent.addMetadata(TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
+            Long.toString(tableMetadata.totalCountCompletionWatermark));
+      }
     }
     eventSubmitter.submit(gobblinTrackingEvent);
   }
@@ -1109,7 +1058,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         Long currentOffset = 
((LongWatermark)recordEnvelope.getWatermark().getWatermark()).getValue();
 
         if (currentOffset > currentWatermark) {
-          if (!tableMetadataMap.computeIfAbsent(tid, t -> new 
TableMetadata()).lowWatermark.isPresent()) {
+          if (!tableMetadataMap.computeIfAbsent(tid, t -> new 
TableMetadata(this.conf)).lowWatermark.isPresent()) {
             //This means we haven't register this table or met some error 
before, we need to reset the low watermark
             tableMetadataMap.get(tid).lowWatermark = Optional.of(currentOffset 
- 1);
             
tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
@@ -1117,6 +1066,11 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
               tableMetadataMap.get(tid).newPartitionColumnEnabled = true;
               if (this.completenessEnabled && 
this.completenessWhitelistBlacklist.acceptTable(dbName, tableName)) {
                 tableMetadataMap.get(tid).completenessEnabled = true;
+
+                if (this.totalCountCompletenessEnabled
+                    && 
this.totalCountBasedCompletenessWhitelistBlacklist.acceptTable(dbName, 
tableName)) {
+                  tableMetadataMap.get(tid).totalCountCompletenessEnabled = 
true;
+                }
               }
             }
           }
@@ -1156,7 +1110,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
    *
    * Also note the difference with {@link org.apache.iceberg.TableMetadata}.
    */
-  public class TableMetadata {
+  public static class TableMetadata {
     Optional<Table> table = Optional.absent();
 
     /**
@@ -1175,18 +1129,18 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     Optional<String> lastSchemaVersion = Optional.absent();
     Optional<Long> lowWatermark = Optional.absent();
     long completionWatermark = DEFAULT_COMPLETION_WATERMARK;
+    long totalCountCompletionWatermark = DEFAULT_COMPLETION_WATERMARK;
     SortedSet<ZonedDateTime> datePartitions = new 
TreeSet<>(Collections.reverseOrder());
     List<String> serializedAuditCountMaps = new ArrayList<>();
 
     @Setter
     String datasetName;
     boolean completenessEnabled;
+    boolean totalCountCompletenessEnabled;
     boolean newPartitionColumnEnabled;
+    Configuration conf;
 
-    Cache<CharSequence, String> addedFiles = CacheBuilder.newBuilder()
-        .expireAfterAccess(conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME, 
DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
-            TimeUnit.HOURS)
-        .build();
+    Cache<CharSequence, String> addedFiles;
     long lowestGMCEEmittedTime = Long.MAX_VALUE;
 
     /**
@@ -1240,5 +1194,13 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
       this.datePartitions.clear();
       this.serializedAuditCountMaps.clear();
     }
+
+    TableMetadata(Configuration conf) {
+      this.conf = conf;
+      addedFiles = CacheBuilder.newBuilder()
+          .expireAfterAccess(this.conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME, 
DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
+              TimeUnit.HOURS)
+          .build();
+    }
   }
 }
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
index 73c206d5f..6d4b7b629 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
@@ -21,9 +21,14 @@ public class IcebergMetadataWriterConfigKeys {
 
   public static final String ICEBERG_COMPLETENESS_ENABLED = 
"iceberg.completeness.enabled";
   public static final boolean DEFAULT_ICEBERG_COMPLETENESS = false;
+  public static final String ICEBERG_TOTAL_COUNT_COMPLETENESS_ENABLED = 
"iceberg.completeness.totalCount.enabled";
+  public static final boolean DEFAULT_ICEBERG_TOTAL_COUNT_COMPLETENESS = false;
   public static final String ICEBERG_COMPLETENESS_WHITELIST = 
"iceberg.completeness.whitelist";
+  public static final String ICEBERG_TOTAL_COUNT_COMPLETENESS_WHITELIST = 
"iceberg.totalCount.completeness.whitelist";
   public static final String ICEBERG_COMPLETENESS_BLACKLIST = 
"iceberg.completeness.blacklist";
+  public static final String ICEBERG_TOTAL_COUNT_COMPLETENESS_BLACKLIST = 
"iceberg.totalCount.completeness.blacklist";
   public static final String COMPLETION_WATERMARK_KEY = "completionWatermark";
+  public static final String TOTAL_COUNT_COMPLETION_WATERMARK_KEY = 
"totalCountCompletionWatermark";
   public static final String COMPLETION_WATERMARK_TIMEZONE_KEY = 
"completionWatermarkTimezone";
   public static final long DEFAULT_COMPLETION_WATERMARK = -1L;
   public static final String TIME_ZONE_KEY = "iceberg.completeness.timezone";
@@ -41,8 +46,7 @@ public class IcebergMetadataWriterConfigKeys {
   public static final String ICEBERG_NEW_PARTITION_WHITELIST = 
"iceberg.new.partition.whitelist";
   public static final String ICEBERG_NEW_PARTITION_BLACKLIST = 
"iceberg.new.partition.blacklist";
   public static final String STATE_COMPLETION_WATERMARK_KEY_OF_TABLE = 
"completion.watermark.%s";
+  public static final String 
STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE = 
"totalCount.completion.watermark.%s";
   public static final String ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY = 
"iceberg.enable.custom.metadata.retention.policy";
   public static final boolean 
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY = true;
-
-
 }
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdaterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdaterTest.java
new file mode 100644
index 000000000..0b7848733
--- /dev/null
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdaterTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.time.TimeIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static 
org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
+import static org.mockito.Mockito.*;
+
+
+public class CompletenessWatermarkUpdaterTest {
+  static final String TOPIC = "testTopic";
+  static final String TABLE_NAME = "testTopic_tableName";
+  static final String TIME_ZONE = "America/Los_Angeles";
+  static final String AUDIT_CHECK_GRANULARITY = "HOUR";
+
+  static final ZonedDateTime NOW = 
ZonedDateTime.now(ZoneId.of(TIME_ZONE)).truncatedTo(ChronoUnit.HOURS);
+  static final ZonedDateTime ONE_HOUR_AGO = TimeIterator.dec(NOW, 
TimeIterator.Granularity.valueOf(AUDIT_CHECK_GRANULARITY), 1);
+  static final ZonedDateTime TWO_HOUR_AGO = TimeIterator.dec(NOW, 
TimeIterator.Granularity.valueOf(AUDIT_CHECK_GRANULARITY), 2);
+  static final ZonedDateTime THREE_HOUR_AGO = TimeIterator.dec(NOW, 
TimeIterator.Granularity.valueOf(AUDIT_CHECK_GRANULARITY), 3);
+
+  @Test
+  public void testClassicWatermarkOnly() throws IOException {
+    TestParams params = createTestParams();
+
+    // Round 1: the expected completion watermark bootstraps to ONE_HOUR_AGO
+    //          total completion watermark is not enabled
+    KafkaAuditCountVerifier verifier = 
mockKafkaAuditCountVerifier(ImmutableList.of(
+        new AuditCountVerificationResult(TWO_HOUR_AGO, ONE_HOUR_AGO, true /* 
isCompleteClassic */, false /* isCompleteTotalCount */),
+        new AuditCountVerificationResult(THREE_HOUR_AGO, TWO_HOUR_AGO, true, 
true)));
+    CompletenessWatermarkUpdater updater =
+        new CompletenessWatermarkUpdater("testTopic", AUDIT_CHECK_GRANULARITY, 
TIME_ZONE, params.tableMetadata, params.props, params.state, verifier);
+    SortedSet<ZonedDateTime> timestamps = new 
TreeSet<>(Collections.reverseOrder());
+    timestamps.add(ONE_HOUR_AGO);
+    timestamps.add(TWO_HOUR_AGO);
+    boolean includeTotalCountCompletionWatermark = false;
+    updater.run(timestamps, includeTotalCountCompletionWatermark);
+
+    validateCompletionWaterMark(ONE_HOUR_AGO, params);
+    validateEmptyTotalCompletionWatermark(params);
+
+    // Round 2: the expected completion watermark moves from ONE_HOUR_AGO to 
NOW
+    //          total completion watermark is not enabled
+    verifier = mockKafkaAuditCountVerifier(ImmutableList.of(
+        new AuditCountVerificationResult(ONE_HOUR_AGO, NOW, true /* 
isCompleteClassic */, false /* isCompleteTotalCount */),
+        new AuditCountVerificationResult(TWO_HOUR_AGO, ONE_HOUR_AGO, true, 
true),
+        new AuditCountVerificationResult(THREE_HOUR_AGO, TWO_HOUR_AGO, true, 
true)));
+    updater.setAuditCountVerifier(verifier);
+    timestamps.add(NOW);
+    updater.run(timestamps, includeTotalCountCompletionWatermark);
+
+    validateCompletionWaterMark(NOW, params);
+    validateEmptyTotalCompletionWatermark(params);
+  }
+
+  @Test
+  public void testClassicAndTotalCountWatermark() throws IOException {
+    TestParams params = createTestParams();
+
+    // Round 1: the expected completion watermark bootstraps to ONE_HOUR_AGO
+    //          the expected total completion watermark bootstraps to 
TWO_HOUR_AGO
+    KafkaAuditCountVerifier verifier = 
mockKafkaAuditCountVerifier(ImmutableList.of(
+        new AuditCountVerificationResult(TWO_HOUR_AGO, ONE_HOUR_AGO, true /* 
isCompleteClassic */, false /* isCompleteTotalCount */),
+        new AuditCountVerificationResult(THREE_HOUR_AGO, TWO_HOUR_AGO, true, 
true)));
+    CompletenessWatermarkUpdater updater =
+        new CompletenessWatermarkUpdater("testTopic", AUDIT_CHECK_GRANULARITY, 
TIME_ZONE, params.tableMetadata, params.props, params.state, verifier);
+    SortedSet<ZonedDateTime> timestamps = new 
TreeSet<>(Collections.reverseOrder());
+    timestamps.add(ONE_HOUR_AGO);
+    timestamps.add(TWO_HOUR_AGO);
+    boolean includeTotalCountCompletionWatermark = true;
+    updater.run(timestamps, includeTotalCountCompletionWatermark);
+
+    validateCompletionWaterMark(ONE_HOUR_AGO, params);
+    validateTotalCompletionWatermark(TWO_HOUR_AGO, params);
+
+    // Round 2: the expected completion watermark moves from ONE_HOUR_AGO to 
NOW
+    //          the expected total completion watermark moves from 
TWO_HOUR_AGO to ONE_HOUR_AGO
+    verifier = mockKafkaAuditCountVerifier(ImmutableList.of(
+        new AuditCountVerificationResult(ONE_HOUR_AGO, NOW, true /* 
isCompleteClassic */, false /* isCompleteTotalCount */),
+        new AuditCountVerificationResult(TWO_HOUR_AGO, ONE_HOUR_AGO, true, 
true),
+        new AuditCountVerificationResult(THREE_HOUR_AGO, TWO_HOUR_AGO, true, 
true)));
+    updater.setAuditCountVerifier(verifier);
+    timestamps.add(NOW);
+    updater.run(timestamps, includeTotalCountCompletionWatermark);
+
+    validateCompletionWaterMark(NOW, params);
+    validateTotalCompletionWatermark(ONE_HOUR_AGO, params);
+}
+
+  static void validateCompletionWaterMark(ZonedDateTime expectedDT, TestParams 
params) {
+    long expected = expectedDT.toInstant().toEpochMilli();
+
+    // 1. assert updated tableMetadata.completionWatermark
+    Assert.assertEquals(params.tableMetadata.completionWatermark, expected);
+    // 2. assert updated property
+    Assert.assertEquals(params.props.get(COMPLETION_WATERMARK_KEY), 
String.valueOf(expected));
+    Assert.assertEquals(params.props.get(COMPLETION_WATERMARK_TIMEZONE_KEY), 
TIME_ZONE);
+    // 3. assert updated state
+    String watermarkKey = 
String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
+        params.tableMetadata.table.get().name().toLowerCase(Locale.ROOT));
+    Assert.assertEquals(params.state.getProp(watermarkKey), 
String.valueOf(expected));
+  }
+
+  static void validateTotalCompletionWatermark(ZonedDateTime expectedDT, 
TestParams params) {
+    long expected = expectedDT.toInstant().toEpochMilli();
+
+    // 1. expect updated tableMetadata.totalCountCompletionWatermark
+    Assert.assertEquals(params.tableMetadata.totalCountCompletionWatermark, 
expected);
+    // 2. expect updated property
+    
Assert.assertEquals(params.props.get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY), 
String.valueOf(expected));
+    // 3. expect updated state
+    String totalCountWatermarkKey = 
String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
+        params.tableMetadata.table.get().name().toLowerCase(Locale.ROOT));
+    Assert.assertEquals(params.state.getProp(totalCountWatermarkKey), 
String.valueOf(expected));
+  }
+
+  static void validateEmptyTotalCompletionWatermark(TestParams params) {
+    Assert.assertEquals(params.tableMetadata.totalCountCompletionWatermark, 
DEFAULT_COMPLETION_WATERMARK);
+    Assert.assertNull(params.props.get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
+    String totalCountWatermarkKey = 
String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
+        params.tableMetadata.table.get().name().toLowerCase(Locale.ROOT));
+    Assert.assertNull(params.state.getProp(totalCountWatermarkKey));
+  }
+
+ static class TestParams {
+    IcebergMetadataWriter.TableMetadata tableMetadata;
+    Map<String, String> props;
+    State state;
+ }
+
+ static TestParams createTestParams() throws IOException {
+   TestParams params = new TestParams();
+   params.tableMetadata = new IcebergMetadataWriter.TableMetadata(new 
Configuration());
+
+   Table table = mock(Table.class);
+   when(table.name()).thenReturn(TABLE_NAME);
+   params.tableMetadata.table = Optional.of(table);
+
+   params.props = new HashMap<>();
+   params.state = new State();
+
+   return params;
+ }
+
+ static class AuditCountVerificationResult {
+   AuditCountVerificationResult(ZonedDateTime start, ZonedDateTime end, 
boolean isCompleteClassic, boolean isCompleteTotalCount) {
+     this.start = start;
+     this.end = end;
+     this.isCompleteClassic = isCompleteClassic;
+     this.isCompleteTotalCount = isCompleteTotalCount;
+   }
+   ZonedDateTime start;
+   ZonedDateTime end;
+   boolean isCompleteClassic;
+   boolean isCompleteTotalCount;
+ }
+
+ static KafkaAuditCountVerifier 
mockKafkaAuditCountVerifier(List<AuditCountVerificationResult> resultsToMock)
+     throws IOException {
+   KafkaAuditCountVerifier verifier = 
mock(IcebergMetadataWriterTest.TestAuditCountVerifier.class);
+   for (AuditCountVerificationResult result : resultsToMock) {
+     Mockito.when(verifier.calculateCompleteness(TOPIC, 
result.start.toInstant().toEpochMilli(), result.end.toInstant().toEpochMilli()))
+         .thenReturn(ImmutableMap.of(
+             KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness, 
result.isCompleteClassic,
+             KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness, 
result.isCompleteTotalCount));
+   }
+   return verifier;
+ }
+}
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index e5819d6ba..6feb8adae 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -194,6 +194,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     State state = getState();
     state.setProp(ICEBERG_NEW_PARTITION_ENABLED, true);
     state.setProp(ICEBERG_COMPLETENESS_ENABLED, true);
+    state.setProp(ICEBERG_TOTAL_COUNT_COMPLETENESS_ENABLED, true);
     state.setProp(NEW_PARTITION_KEY, "late");
     state.setProp(NEW_PARTITION_TYPE_KEY, "int");
     state.setProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY, 
TestAuditClientFactory.class.getName());
@@ -223,6 +224,8 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     
Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
     Assert.assertEquals(table.location(),
         new File(tmpDir, 
"testDB/testTopic/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
+    
Assert.assertFalse(table.properties().containsKey(COMPLETION_WATERMARK_KEY));
+    
Assert.assertFalse(table.properties().containsKey(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
 
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "1000-2000").build());
     GenericRecord genericGmce_1000_2000 = 
GenericData.get().deepCopy(gmce.getSchema(), gmce);
@@ -238,6 +241,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     // Assert low watermark and high watermark set properly
     
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "9");
     
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "20");
+    
Assert.assertFalse(table.properties().containsKey(COMPLETION_WATERMARK_KEY));
+    
Assert.assertFalse(table.properties().containsKey(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
 
     /*test flush twice*/
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "2000-3000").build());
@@ -257,6 +262,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
     
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "20");
     
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "30");
+    
Assert.assertFalse(table.properties().containsKey(COMPLETION_WATERMARK_KEY));
+    
Assert.assertFalse(table.properties().containsKey(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
 
     /* Test it will skip event with lower watermark*/
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "3000-4000").build());
@@ -268,6 +275,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-3000");
     
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
+    
Assert.assertFalse(table.properties().containsKey(COMPLETION_WATERMARK_KEY));
+    
Assert.assertFalse(table.properties().containsKey(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
   }
 
   //Make sure hive test execute later and close the metastore
@@ -420,7 +429,9 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
     // Test when completeness watermark = -1 bootstrap case
     KafkaAuditCountVerifier verifier = 
Mockito.mock(TestAuditCountVerifier.class);
-    Mockito.when(verifier.isComplete("testTopicCompleteness", timestampMillis 
- TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
+    Mockito.when(verifier.calculateCompleteness("testTopicCompleteness", 
timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis))
+        
.thenReturn(ImmutableMap.of(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness,
 true,
+                                    
KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness, true));
     IcebergMetadataWriter imw = (IcebergMetadataWriter) 
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next();
     imw.setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
@@ -429,8 +440,10 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
     
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), 
"America/Los_Angeles");
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(timestampMillis));
+    
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
 String.valueOf(timestampMillis));
     // 1631811600000L correspond to 2020-09-16-10 in PT
     
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
 table.name().toLowerCase(Locale.ROOT))), 1631811600000L);
+    
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
 table.name().toLowerCase(Locale.ROOT))), 1631811600000L);
 
     Iterator<org.apache.iceberg.DataFile> dfl = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
hourlyFile.getAbsolutePath())).collect().iterator();
     Assert.assertTrue(dfl.hasNext());
@@ -453,6 +466,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(timestampMillis));
+    
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
 String.valueOf(timestampMillis));
 
     dfl = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
hourlyFile1.getAbsolutePath())).collect().iterator();
     Assert.assertTrue(dfl.hasNext());
@@ -475,12 +489,17 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(60L))));
 
-    Mockito.when(verifier.isComplete("testTopicCompleteness", timestampMillis1 
- TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
+    Mockito.when(verifier.calculateCompleteness("testTopicCompleteness", 
timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1))
+        
.thenReturn(ImmutableMap.of(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness,
 true,
+                                    
KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness, true));
+
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(timestampMillis1));
+    
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
 String.valueOf(timestampMillis1));
     // watermark 1631815200000L correspond to 2021-09-16-11 in PT
     
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
 table.name().toLowerCase(Locale.ROOT))), 1631815200000L);
+    
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
 table.name().toLowerCase(Locale.ROOT))), 1631815200000L);
 
     dfl = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
hourlyFile2.getAbsolutePath())).collect().iterator();
     Assert.assertTrue(dfl.hasNext());
@@ -511,7 +530,10 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
     KafkaAuditCountVerifier verifier = 
Mockito.mock(TestAuditCountVerifier.class);
     // For quiet topics always check for previous hour window
-    Mockito.when(verifier.isComplete("testTopicCompleteness", 
expectedCWDt.minusHours(1).toInstant().toEpochMilli(), 
expectedWatermark)).thenReturn(true);
+    Mockito.when(verifier.calculateCompleteness("testTopicCompleteness", 
expectedCWDt.minusHours(1).toInstant().toEpochMilli(), expectedWatermark))
+        
.thenReturn(ImmutableMap.of(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness,
 true,
+                    
KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness, true));
+
     ((IcebergMetadataWriter) 
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
 
@@ -521,7 +543,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
     
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), 
"America/Los_Angeles");
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(expectedWatermark));
-
+    
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
 String.valueOf(expectedWatermark));
   }
 
   private String writeRecord(File file) throws IOException {

Reply via email to