This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ba408f23c [GOBBLIN-1838] Introduce total count based completion
watermark (#3701)
ba408f23c is described below
commit ba408f23c89357bc36ee70415041c2cff2b36df0
Author: Tao Qin <[email protected]>
AuthorDate: Mon Jul 24 12:25:20 2023 -0700
[GOBBLIN-1838] Introduce total count based completion watermark (#3701)
* [GOBBLIN-1838] Introduce total count based completion watermark
* Refactor and address comments
* fix style
* combine completeness computation
* Fix the logic for quiet topics
* Add UT for CompletenessWatermarkUpdater
* space only change
* refine
---------
Co-authored-by: Tao Qin <[email protected]>
---
.../verifier/KafkaAuditCountVerifier.java | 124 +++++++--
.../verifier/KafkaAuditCountVerifierTest.java | 75 +++++-
.../writer/CompletenessWatermarkUpdater.java | 284 +++++++++++++++++++++
.../iceberg/writer/IcebergMetadataWriter.java | 178 +++++--------
.../writer/IcebergMetadataWriterConfigKeys.java | 8 +-
.../writer/CompletenessWatermarkUpdaterTest.java | 205 +++++++++++++++
.../iceberg/writer/IcebergMetadataWriterTest.java | 30 ++-
7 files changed, 761 insertions(+), 143 deletions(-)
diff --git
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index d4495f9eb..77d89ff31 100644
---
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.completeness.verifier;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import com.google.common.base.Preconditions;
@@ -43,14 +44,22 @@ public class KafkaAuditCountVerifier {
public static final String COMPLETENESS_PREFIX = "completeness.";
public static final String SOURCE_TIER = COMPLETENESS_PREFIX + "source.tier";
public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX +
"reference.tiers";
+ public static final String TOTAL_COUNT_REFERENCE_TIERS = COMPLETENESS_PREFIX
+ "totalCount.reference.tiers";
public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
private static final double DEFAULT_THRESHOLD = 0.999;
public static final String COMPLETE_ON_NO_COUNTS = COMPLETENESS_PREFIX +
"complete.on.no.counts";
+
+ public enum CompletenessType {
+ ClassicCompleteness,
+ TotalCountCompleteness
+ }
+
private final boolean returnCompleteOnNoCounts;
private final AuditCountClient auditCountClient;
private final String srcTier;
private final Collection<String> refTiers;
+ private final Collection<String> totalCountRefTiers;
private final double threshold;
/**
@@ -69,6 +78,9 @@ public class KafkaAuditCountVerifier {
state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
this.srcTier = state.getProp(SOURCE_TIER);
this.refTiers =
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+ this.totalCountRefTiers = state.contains(TOTAL_COUNT_REFERENCE_TIERS)
+ ?
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(TOTAL_COUNT_REFERENCE_TIERS))
+ : null;
this.returnCompleteOnNoCounts =
state.getPropAsBoolean(COMPLETE_ON_NO_COUNTS, false);
}
@@ -90,23 +102,52 @@ public class KafkaAuditCountVerifier {
}
}
+ public Map<CompletenessType, Boolean> calculateCompleteness(String
datasetName, long beginInMillis, long endInMillis)
+ throws IOException {
+ return calculateCompleteness(datasetName, beginInMillis, endInMillis,
this.threshold);
+ }
+
/**
* Compare source tier against reference tiers.
- * Compute completion percentage by srcCount/refCount. Return true iff the
highest percentages is greater than threshold.
+ * Compute completion percentage which is true iff the calculated
percentage is greater than threshold.
*
* @param datasetName A dataset short name like 'PageViewEvent'
* @param beginInMillis Unix timestamp in milliseconds
* @param endInMillis Unix timestamp in milliseconds
* @param threshold User defined threshold
+ *
+ * @return a map of completeness result by CompletenessType
*/
- public boolean isComplete(String datasetName, long beginInMillis, long
endInMillis, double threshold)
- throws IOException {
- return getCompletenessPercentage(datasetName, beginInMillis, endInMillis)
> threshold;
+ public Map<CompletenessType, Boolean> calculateCompleteness(String
datasetName, long beginInMillis, long endInMillis,
+ double threshold) throws IOException {
+ Map<String, Long> countsByTier = getTierAndCount(datasetName,
beginInMillis, endInMillis);
+ log.info(String.format("checkTierCounts: audit counts map for %s for range
[%s,%s]", datasetName, beginInMillis, endInMillis));
+ countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+
+ Map<CompletenessType, Boolean> result = new HashMap<>();
+ result.put(CompletenessType.ClassicCompleteness,
calculateCompleteness(datasetName, beginInMillis, endInMillis,
+ CompletenessType.ClassicCompleteness, countsByTier) > threshold);
+ result.put(CompletenessType.TotalCountCompleteness,
calculateCompleteness(datasetName, beginInMillis, endInMillis,
+ CompletenessType.TotalCountCompleteness, countsByTier) > threshold);
+ return result;
}
- public boolean isComplete(String datasetName, long beginInMillis, long
endInMillis)
- throws IOException {
- return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
+ private double calculateCompleteness(String datasetName, long beginInMillis,
long endInMillis, CompletenessType type,
+ Map<String, Long> countsByTier) throws IOException {
+ if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
+ log.info(String.format("Found empty counts map for %s, returning
complete", datasetName));
+ return 1.0;
+ }
+
+ switch (type) {
+ case ClassicCompleteness:
+ return calculateClassicCompleteness(datasetName, beginInMillis,
endInMillis, countsByTier);
+ case TotalCountCompleteness:
+ return calculateTotalCountCompleteness(datasetName, beginInMillis,
endInMillis, countsByTier);
+ default:
+ log.error("Skip unsupported completeness type {}", type);
+ return -1;
+ }
}
/**
@@ -118,39 +159,70 @@ public class KafkaAuditCountVerifier {
*
* @return The highest percentage value
*/
- private double getCompletenessPercentage(String datasetName, long
beginInMillis, long endInMillis) throws IOException {
- Map<String, Long> countsByTier = getTierAndCount(datasetName,
beginInMillis, endInMillis);
- log.info(String.format("Audit counts map for %s for range [%s,%s]",
datasetName, beginInMillis, endInMillis));
- countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
- if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
- log.info(String.format("Found empty counts map for %s, returning
complete", datasetName));
- return 1.0;
- }
- double percent = -1;
- if (!countsByTier.containsKey(this.srcTier)) {
- throw new IOException(String.format("Source tier %s audit count cannot
be retrieved for dataset %s between %s and %s", this.srcTier, datasetName,
beginInMillis, endInMillis));
- }
+ private double calculateClassicCompleteness(String datasetName, long
beginInMillis, long endInMillis,
+ Map<String, Long> countsByTier) throws IOException {
+ validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier,
this.srcTier, this.refTiers);
+ double percent = -1;
for (String refTier: this.refTiers) {
- if (!countsByTier.containsKey(refTier)) {
- throw new IOException(String.format("Reference tier %s audit count
cannot be retrieved for dataset %s between %s and %s", refTier, datasetName,
beginInMillis, endInMillis));
- }
long refCount = countsByTier.get(refTier);
- if(refCount <= 0) {
- throw new IOException(String.format("Reference tier %s count cannot be
less than or equal to zero", refTier));
- }
long srcCount = countsByTier.get(this.srcTier);
-
percent = Double.max(percent, (double) srcCount / (double) refCount);
}
if (percent < 0) {
throw new IOException("Cannot calculate completion percentage");
}
+ return percent;
+ }
+ /**
+ * Check total count based completeness by comparing source tier against
reference tiers,
+ * and calculate the completion percentage by srcCount/sum_of(refCount).
+ *
+ * @param datasetName A dataset short name like 'PageViewEvent'
+ * @param beginInMillis Unix timestamp in milliseconds
+ * @param endInMillis Unix timestamp in milliseconds
+ *
+ * @return The percentage value by srcCount/sum_of(refCount)
+ */
+ private double calculateTotalCountCompleteness(String datasetName, long
beginInMillis, long endInMillis,
+ Map<String, Long> countsByTier) throws IOException {
+ if (this.totalCountRefTiers == null) {
+ return -1;
+ }
+ validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier,
this.srcTier, this.totalCountRefTiers);
+
+ long srcCount = countsByTier.get(this.srcTier);
+ long totalRefCount = this.totalCountRefTiers
+ .stream()
+ .mapToLong(countsByTier::get)
+ .sum();
+ double percent = Double.max(-1, (double) srcCount / (double)
totalRefCount);
+ if (percent < 0) {
+ throw new IOException("Cannot calculate total count completion
percentage");
+ }
return percent;
}
+ private static void validateTierCounts(String datasetName, long
beginInMillis, long endInMillis, Map<String, Long> countsByTier,
+ String sourceTier, Collection<String> referenceTiers)
+ throws IOException {
+ if (!countsByTier.containsKey(sourceTier)) {
+ throw new IOException(String.format("Source tier %s audit count cannot
be retrieved for dataset %s between %s and %s", sourceTier, datasetName,
beginInMillis, endInMillis));
+ }
+
+ for (String refTier: referenceTiers) {
+ if (!countsByTier.containsKey(refTier)) {
+ throw new IOException(String.format("Reference tier %s audit count
cannot be retrieved for dataset %s between %s and %s", refTier, datasetName,
beginInMillis, endInMillis));
+ }
+ long refCount = countsByTier.get(refTier);
+ if(refCount <= 0) {
+ throw new IOException(String.format("Reference tier %s count cannot be
less than or equal to zero", refTier));
+ }
+ }
+ }
+
/**
* Fetch all <tier-count> pairs for a given dataset between a time range
*/
diff --git
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index 7a005c6c2..3e7140f41 100644
---
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -33,6 +33,11 @@ public class KafkaAuditCountVerifierTest {
public static final String SOURCE_TIER = "gobblin";
public static final String REFERENCE_TIERS = "producer";
+ public static final String TOTAL_COUNT_REF_TIER_0 = "producer_0";
+ public static final String TOTAL_COUNT_REF_TIER_1 = "producer_1";
+ public static final String TOTAL_COUNT_REFERENCE_TIERS =
TOTAL_COUNT_REF_TIER_0 + "," + TOTAL_COUNT_REF_TIER_1;
+
+
public void testFetch() throws IOException {
final String topic = "testTopic";
State props = new State();
@@ -48,22 +53,86 @@ public class KafkaAuditCountVerifierTest {
REFERENCE_TIERS, 1000L
));
// Default threshold
- Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
// 99.999 % complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L
));
- Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
// <= 99% complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 990L,
REFERENCE_TIERS, 1000L
));
- Assert.assertFalse(verifier.isComplete(topic, 0L, 0L));
+ Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+ }
+
+ public void testTotalCountCompleteness() throws IOException {
+ final String topic = "testTopic";
+ State props = new State();
+ props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+ props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+ props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS,
TOTAL_COUNT_REFERENCE_TIERS);
+ props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+ TestAuditClient client = new TestAuditClient(props);
+ KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props,
client);
+
+ // All complete
+ client.setTierCounts(ImmutableMap.of(
+ SOURCE_TIER, 1000L,
+ REFERENCE_TIERS, 1000L,
+ TOTAL_COUNT_REF_TIER_0, 600L,
+ TOTAL_COUNT_REF_TIER_1, 400L
+ ));
+ // Default threshold
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+
+ // 99.999 % complete
+ client.setTierCounts(ImmutableMap.of(
+ SOURCE_TIER, 999L,
+ REFERENCE_TIERS, 1000L,
+ TOTAL_COUNT_REF_TIER_0, 600L,
+ TOTAL_COUNT_REF_TIER_1, 400L
+ ));
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+
+ // <= 99% complete
+ client.setTierCounts(ImmutableMap.of(
+ SOURCE_TIER, 990L,
+ REFERENCE_TIERS, 1000L,
+ TOTAL_COUNT_REF_TIER_0, 600L,
+ TOTAL_COUNT_REF_TIER_1, 400L
+ ));
+ Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
}
+ public void testEmptyAuditCount() throws IOException {
+ final String topic = "testTopic";
+ State props = new State();
+ props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+ props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+ props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS,
TOTAL_COUNT_REFERENCE_TIERS);
+ props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+ props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
+ TestAuditClient client = new TestAuditClient(props);
+ KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props,
client);
+ // Client gets empty audit count
+ client.setTierCounts(ImmutableMap.of());
+
+ // Should be complete, since COMPLETE_ON_NO_COUNTS=true
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+ }
}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
new file mode 100644
index 000000000..b022ac339
--- /dev/null
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedSet;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.time.TimeIterator;
+
+import static
org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
+
+/**
+ * A class for completeness watermark updater.
+ * It computes the new watermarks and updates below entities:
+ * 1. the properties in {@link IcebergMetadataWriter.TableMetadata}
+ * 2. {@link gobblin.configuration.State}
+ * 3. the completionWatermark in {@link IcebergMetadataWriter.TableMetadata}
+ */
+@Slf4j
+public class CompletenessWatermarkUpdater {
+ private final String topic;
+ private final String auditCheckGranularity;
+
+ protected final String timeZone;
+ protected final IcebergMetadataWriter.TableMetadata tableMetadata;
+ protected final Map<String, String> propsToUpdate;
+ protected final State stateToUpdate;
+ protected KafkaAuditCountVerifier auditCountVerifier;
+
+ public CompletenessWatermarkUpdater(String topic, String
auditCheckGranularity, String timeZone,
+ IcebergMetadataWriter.TableMetadata tableMetadata, Map<String, String>
propsToUpdate, State stateToUpdate,
+ KafkaAuditCountVerifier auditCountVerifier) {
+ this.tableMetadata = tableMetadata;
+ this.topic = topic;
+ this.auditCheckGranularity = auditCheckGranularity;
+ this.timeZone = timeZone;
+ this.propsToUpdate = propsToUpdate;
+ this.stateToUpdate = stateToUpdate;
+ this.auditCountVerifier = auditCountVerifier;
+ }
+
+ /**
+ * Update TableMetadata with the new completion watermark upon a successful
audit check
+ * @param timestamps Sorted set in reverse order of timestamps to check
audit counts for
+ * @param includeTotalCountCompletionWatermark If
totalCountCompletionWatermark should be calculated
+ */
+ public void run(SortedSet<ZonedDateTime> timestamps, boolean
includeTotalCountCompletionWatermark) {
+ String tableName = tableMetadata.table.get().name();
+ if (this.topic == null) {
+ log.error(String.format("Not performing audit check. %s is null. Please
set as table property of %s",
+ TOPIC_NAME_KEY, tableName));
+ }
+ computeAndUpdateWatermark(tableName, timestamps,
includeTotalCountCompletionWatermark);
+ }
+
+ private void computeAndUpdateWatermark(String tableName,
SortedSet<ZonedDateTime> timestamps,
+ boolean includeTotalCountWatermark) {
+ log.info(String.format("Compute completion watermark for %s and timestamps
%s with previous watermark %s, previous totalCount watermark %s,
includeTotalCountWatermark=%b",
+ this.topic, timestamps, tableMetadata.completionWatermark,
tableMetadata.totalCountCompletionWatermark,
+ includeTotalCountWatermark));
+
+ WatermarkUpdaterSet updaterSet = new
WatermarkUpdaterSet(this.tableMetadata, this.timeZone, this.propsToUpdate,
+ this.stateToUpdate, includeTotalCountWatermark);
+ if(timestamps == null || timestamps.size() <= 0) {
+ log.error("Cannot create time iterator. Empty or null timestamps");
+ return;
+ }
+
+ ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
+ TimeIterator.Granularity granularity =
TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
+ ZonedDateTime startDT = timestamps.first();
+ ZonedDateTime endDT = timestamps.last();
+ TimeIterator iterator = new TimeIterator(startDT, endDT, granularity,
true);
+ try {
+ while (iterator.hasNext()) {
+ ZonedDateTime timestampDT = iterator.next();
+ updaterSet.checkForEarlyStop(timestampDT, now, granularity);
+ if (updaterSet.allFinished()) {
+ break;
+ }
+
+ ZonedDateTime auditCountCheckLowerBoundDT =
TimeIterator.dec(timestampDT, granularity, 1);
+ Map<KafkaAuditCountVerifier.CompletenessType, Boolean> results =
+ this.auditCountVerifier.calculateCompleteness(this.topic,
+ auditCountCheckLowerBoundDT.toInstant().toEpochMilli(),
+ timestampDT.toInstant().toEpochMilli());
+
+ updaterSet.computeAndUpdate(results, timestampDT);
+ }
+ } catch (IOException e) {
+ log.warn("Exception during audit count check: ", e);
+ }
+ }
+
+ /**
+ * A class that contains both ClassicWatermarkUpdater and
TotalCountWatermarkUpdater
+ */
+ static class WatermarkUpdaterSet {
+ private final List<WatermarkUpdater> updaters;
+
+ WatermarkUpdaterSet(IcebergMetadataWriter.TableMetadata tableMetadata,
String timeZone,
+ Map<String, String> propsToUpdate, State stateToUpdate, boolean
includeTotalCountWatermark) {
+ this.updaters = new ArrayList<>();
+ this.updaters.add(new
ClassicWatermarkUpdater(tableMetadata.completionWatermark, timeZone,
tableMetadata,
+ propsToUpdate, stateToUpdate));
+ if (includeTotalCountWatermark) {
+ this.updaters.add(new
TotalCountWatermarkUpdater(tableMetadata.totalCountCompletionWatermark,
timeZone,
+ tableMetadata, propsToUpdate, stateToUpdate));
+ }
+ }
+
+ void checkForEarlyStop(ZonedDateTime timestampDT, ZonedDateTime now,
+ TimeIterator.Granularity granularity) {
+ this.updaters.stream().forEach(updater
+ -> updater.checkForEarlyStop(timestampDT, now, granularity));
+ }
+
+ boolean allFinished() {
+ return this.updaters.stream().allMatch(updater -> updater.isFinished());
+ }
+
+ void computeAndUpdate(Map<KafkaAuditCountVerifier.CompletenessType,
Boolean> results,
+ ZonedDateTime timestampDT) {
+ this.updaters.stream()
+ .filter(updater -> !updater.isFinished())
+ .forEach(updater -> updater.computeAndUpdate(results, timestampDT));
+ }
+ }
+
+ /**
+ * A stateful class for watermark updaters.
+ * The updater starts with finished=false state.
+ * Then computeAndUpdate() is called multiple times with the parameters:
+ * 1. The completeness audit results within (datepartition-1, datepartition)
+ * 2. the datepartition timestamp
+ * The method is called multiple times in descending order of the
datepartition timestamp.
+ * <p>
+ * When the audit result is complete for a timestamp, it updates below
entities:
+ * 1. the properties in {@link IcebergMetadataWriter.TableMetadata}
+ * 2. {@link gobblin.configuration.State}
+ * 3. the completionWatermark in {@link
IcebergMetadataWriter.TableMetadata}
+ * And it turns into finished=true state, in which the following
computeAndUpdate() calls will be skipped.
+ */
+ static abstract class WatermarkUpdater {
+ protected final long previousWatermark;
+ protected final ZonedDateTime prevWatermarkDT;
+ protected final String timeZone;
+ protected boolean finished = false;
+ protected final IcebergMetadataWriter.TableMetadata tableMetadata;
+ protected final Map<String, String> propsToUpdate;
+ protected final State stateToUpdate;
+
+ public WatermarkUpdater(long previousWatermark, String timeZone,
IcebergMetadataWriter.TableMetadata tableMetadata,
+ Map<String, String> propsToUpdate, State stateToUpdate) {
+ this.previousWatermark = previousWatermark;
+ this.timeZone = timeZone;
+ this.tableMetadata = tableMetadata;
+ this.propsToUpdate = propsToUpdate;
+ this.stateToUpdate = stateToUpdate;
+
+ prevWatermarkDT =
Instant.ofEpochMilli(previousWatermark).atZone(ZoneId.of(this.timeZone));
+ }
+
+ public void computeAndUpdate(Map<KafkaAuditCountVerifier.CompletenessType,
Boolean> results,
+ ZonedDateTime timestampDT) {
+ if (finished) {
+ return;
+ }
+ computeAndUpdateInternal(results, timestampDT);
+ }
+
+ protected abstract void
computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean>
results,
+ ZonedDateTime timestampDT);
+
+ protected boolean isFinished() {
+ return this.finished;
+ }
+
+ protected void setFinished() {
+ this.finished = true;
+ }
+
+ protected void checkForEarlyStop(ZonedDateTime timestampDT, ZonedDateTime
now,
+ TimeIterator.Granularity granularity) {
+ if (isFinished()
+ || (timestampDT.isAfter(this.prevWatermarkDT)
+ && TimeIterator.durationBetween(this.prevWatermarkDT, now,
granularity) > 0)) {
+ return;
+ }
+ setFinished();
+ }
+ }
+
+ @VisibleForTesting
+ void setAuditCountVerifier(KafkaAuditCountVerifier auditCountVerifier) {
+ this.auditCountVerifier = auditCountVerifier;
+ }
+
+ static class ClassicWatermarkUpdater extends WatermarkUpdater {
+ public ClassicWatermarkUpdater(long previousWatermark, String timeZone,
+ IcebergMetadataWriter.TableMetadata tableMetadata, Map<String, String>
propsToUpdate, State stateToUpdate) {
+ super(previousWatermark, timeZone, tableMetadata, propsToUpdate,
stateToUpdate);
+ }
+
+ @Override
+ protected void
computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean>
results,
+ ZonedDateTime timestampDT) {
+ if
(!results.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness)) {
+ return;
+ }
+
+ setFinished();
+ long updatedWatermark = timestampDT.toInstant().toEpochMilli();
+ this.stateToUpdate.setProp(
+ String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
+ this.tableMetadata.table.get().name().toLowerCase(Locale.ROOT)),
+ updatedWatermark);
+
+ if (updatedWatermark > this.previousWatermark) {
+ log.info(String.format("Updating %s for %s to %s",
COMPLETION_WATERMARK_KEY,
+ this.tableMetadata.table.get().name(), updatedWatermark));
+ this.propsToUpdate.put(COMPLETION_WATERMARK_KEY,
String.valueOf(updatedWatermark));
+ this.propsToUpdate.put(COMPLETION_WATERMARK_TIMEZONE_KEY,
this.timeZone);
+
+ this.tableMetadata.completionWatermark = updatedWatermark;
+ }
+ }
+ }
+
+ static class TotalCountWatermarkUpdater extends WatermarkUpdater {
+ public TotalCountWatermarkUpdater(long previousWatermark, String timeZone,
+ IcebergMetadataWriter.TableMetadata tableMetadata, Map<String, String>
propsToUpdate, State stateToUpdate) {
+ super(previousWatermark, timeZone, tableMetadata, propsToUpdate,
stateToUpdate);
+ }
+
+ @Override
+ protected void
computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean>
results,
+ ZonedDateTime timestampDT) {
+ if
(!results.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness))
{
+ return;
+ }
+
+ setFinished();
+ long updatedWatermark = timestampDT.toInstant().toEpochMilli();
+ this.stateToUpdate.setProp(
+ String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
+ this.tableMetadata.table.get().name().toLowerCase(Locale.ROOT)),
+ updatedWatermark);
+
+ if (updatedWatermark > previousWatermark) {
+ log.info(String.format("Updating %s for %s to %s",
TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
+ this.tableMetadata.table.get().name(), updatedWatermark));
+ this.propsToUpdate.put(TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
String.valueOf(updatedWatermark));
+ tableMetadata.totalCountCompletionWatermark = updatedWatermark;
+ }
+ }
+ }
+
+}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index b6c9e15dc..19bc805eb 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -32,7 +32,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@@ -122,7 +121,6 @@ import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.stream.RecordEnvelope;
-import org.apache.gobblin.time.TimeIterator;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
@@ -168,7 +166,9 @@ public class IcebergMetadataWriter implements
MetadataWriter {
private static final String ICEBERG_FILE_PATH_COLUMN =
DataFile.FILE_PATH.name();
private final boolean completenessEnabled;
+ private final boolean totalCountCompletenessEnabled;
private final WhitelistBlacklist completenessWhitelistBlacklist;
+ private final WhitelistBlacklist
totalCountBasedCompletenessWhitelistBlacklist;
private final String timeZone;
private final DateTimeFormatter HOURLY_DATEPARTITION_FORMAT;
private final String newPartitionColumn;
@@ -234,8 +234,13 @@ public class IcebergMetadataWriter implements
MetadataWriter {
FsPermission.getDefault());
}
this.completenessEnabled =
state.getPropAsBoolean(ICEBERG_COMPLETENESS_ENABLED,
DEFAULT_ICEBERG_COMPLETENESS);
+ this.totalCountCompletenessEnabled =
state.getPropAsBoolean(ICEBERG_TOTAL_COUNT_COMPLETENESS_ENABLED,
+ DEFAULT_ICEBERG_TOTAL_COUNT_COMPLETENESS);
this.completenessWhitelistBlacklist = new
WhitelistBlacklist(state.getProp(ICEBERG_COMPLETENESS_WHITELIST, ""),
state.getProp(ICEBERG_COMPLETENESS_BLACKLIST, ""));
+ this.totalCountBasedCompletenessWhitelistBlacklist = new
WhitelistBlacklist(
+ state.getProp(ICEBERG_TOTAL_COUNT_COMPLETENESS_WHITELIST, ""),
+ state.getProp(ICEBERG_TOTAL_COUNT_COMPLETENESS_BLACKLIST, ""));
this.timeZone = state.getProp(TIME_ZONE_KEY, DEFAULT_TIME_ZONE);
this.HOURLY_DATEPARTITION_FORMAT =
DateTimeFormatter.ofPattern(DATEPARTITION_FORMAT)
.withZone(ZoneId.of(this.timeZone));
@@ -258,7 +263,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
private org.apache.iceberg.Table getIcebergTable(TableIdentifier tid) throws
NoSuchTableException {
- TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata(this.conf));
if (!tableMetadata.table.isPresent()) {
tableMetadata.table = Optional.of(catalog.loadTable(tid));
}
@@ -304,7 +309,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
public void write(GobblinMetadataChangeEvent gmce, Map<String,
Collection<HiveSpec>> newSpecsMap,
Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec)
throws IOException {
TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName());
- TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata(this.conf));
Table table;
try {
table = getIcebergTable(tid);
@@ -327,6 +332,12 @@ public class IcebergMetadataWriter implements
MetadataWriter {
if(tableMetadata.completenessEnabled) {
tableMetadata.completionWatermark =
Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
+
+ if (tableMetadata.totalCountCompletenessEnabled) {
+ tableMetadata.totalCountCompletionWatermark = Long.parseLong(
+
table.properties().getOrDefault(TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
+ String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
+ }
}
computeCandidateSchema(gmce, tid, tableSpec);
@@ -392,7 +403,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
* the given {@link TableIdentifier} with the input {@link
GobblinMetadataChangeEvent}
*/
private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier
tid) {
- TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata(this.conf));
tableMetadata.dataOffsetRange =
Optional.of(tableMetadata.dataOffsetRange.or(() ->
getLastOffset(tableMetadata)));
Map<String, List<Range>> offsets = tableMetadata.dataOffsetRange.get();
for (Map.Entry<String, String> entry :
gmce.getTopicPartitionOffsetsRange().entrySet()) {
@@ -424,7 +435,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
protected void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid,
GobblinMetadataChangeEvent gmce) {
org.apache.hadoop.hive.metastore.api.Table table =
HiveMetaStoreUtils.getTable(tableSpec.getTable());
- TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata(this.conf));
tableMetadata.newProperties =
Optional.of(IcebergUtils.getTableProperties(table));
String nativeName = tableMetadata.datasetName;
String topic = nativeName.substring(nativeName.lastIndexOf("/") + 1);
@@ -442,7 +453,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
*/
private void computeCandidateSchema(GobblinMetadataChangeEvent gmce,
TableIdentifier tid, HiveSpec spec) {
Table table = getIcebergTable(tid);
- TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata(this.conf));
org.apache.hadoop.hive.metastore.api.Table hiveTable =
HiveMetaStoreUtils.getTable(spec.getTable());
tableMetadata.lastProperties =
Optional.of(tableMetadata.lastProperties.or(() -> table.properties()));
Map<String, String> props = tableMetadata.lastProperties.get();
@@ -824,7 +835,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
writeLock.lock();
try {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
- TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new
TableMetadata());
+ TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new
TableMetadata(this.conf));
if (tableMetadata.transaction.isPresent()) {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
@@ -839,7 +850,8 @@ public class IcebergMetadataWriter implements
MetadataWriter {
log.info("Sending audit counts for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
if (tableMetadata.completenessEnabled) {
- checkAndUpdateCompletenessWatermark(tableMetadata, topicName,
tableMetadata.datePartitions, props);
+ updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
@@ -849,16 +861,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
// The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
- if (tableMetadata.completionWatermark >
DEFAULT_COMPLETION_WATERMARK) {
- log.info(String.format("Checking kafka audit for %s on
change_property ", topicName));
- SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
- ZonedDateTime dtAtBeginningOfHour =
ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
- timestamps.add(dtAtBeginningOfHour);
- checkAndUpdateCompletenessWatermark(tableMetadata, topicName,
timestamps, props);
- } else {
- log.info(String.format("Need valid watermark, current watermark is
%s, Not checking kafka audit for %s",
- tableMetadata.completionWatermark, topicName));
- }
+ updateWatermarkWithNoFilesRegistered(topicName, tableMetadata,
props);
}
//Set high waterMark
@@ -909,94 +912,36 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
}
- @Override
- public void reset(String dbName, String tableName) throws IOException {
- this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+ private CompletenessWatermarkUpdater getWatermarkUpdater(String topicName,
TableMetadata tableMetadata,
+ Map<String, String> propsToUpdate) {
+ return new CompletenessWatermarkUpdater(topicName,
this.auditCheckGranularity, this.timeZone,
+ tableMetadata, propsToUpdate, this.state,
this.auditCountVerifier.get());
}
- /**
- * Update TableMetadata with the new completion watermark upon a successful
audit check
- * @param tableMetadata metadata of table
- * @param topic topic name
- * @param timestamps Sorted set in reverse order of timestamps to check
audit counts for
- * @param props table properties map
- */
- private void checkAndUpdateCompletenessWatermark(TableMetadata
tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
- Map<String, String> props) {
- String tableName = tableMetadata.table.get().name();
- if (topic == null) {
- log.error(String.format("Not performing audit check. %s is null. Please
set as table property of %s",
- TOPIC_NAME_KEY, tableName));
- }
- long newCompletenessWatermark =
- computeCompletenessWatermark(tableName, topic, timestamps,
tableMetadata.completionWatermark);
- if (newCompletenessWatermark > tableMetadata.completionWatermark) {
- log.info(String.format("Updating %s for %s to %s",
COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
- newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_KEY,
String.valueOf(newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
- tableMetadata.completionWatermark = newCompletenessWatermark;
- }
+ private void updateWatermarkWithFilesRegistered(String topicName,
TableMetadata tableMetadata,
+ Map<String, String> propsToUpdate, boolean
includeTotalCountCompletionWatermark) {
+ getWatermarkUpdater(topicName, tableMetadata, propsToUpdate)
+ .run(tableMetadata.datePartitions,
includeTotalCountCompletionWatermark);
}
- /**
- * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit
counts match
- * for that window (aka its is set to the beginning of next window)
- * For each timestamp in sorted collection of timestamps in descending order
- * if timestamp is greater than previousWatermark
- * and hour(now) > hour(prevWatermark)
- * check audit counts for completeness between
- * a source and reference tier for [timestamp -1 , timstamp unit of
granularity]
- * If the audit count matches update the watermark to the timestamp and
break
- * else continue
- * else
- * break
- * Using a {@link TimeIterator} that operates over a range of time in 1 unit
- * given the start, end and granularity
- * @param catalogDbTableName
- * @param topicName
- * @param timestamps a sorted set of timestamps in decreasing order
- * @param previousWatermark previous completion watermark for the table
- * @return updated completion watermark
- */
- private long computeCompletenessWatermark(String catalogDbTableName, String
topicName, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
- log.info(String.format("Compute completion watermark for %s and timestamps
%s with previous watermark %s", topicName, timestamps, previousWatermark));
- long completionWatermark = previousWatermark;
- ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
- try {
- if(timestamps == null || timestamps.size() <= 0) {
- log.error("Cannot create time iterator. Empty for null timestamps");
- return previousWatermark;
- }
- TimeIterator.Granularity granularity =
TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
- ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
- .atZone(ZoneId.of(this.timeZone));
- ZonedDateTime startDT = timestamps.first();
- ZonedDateTime endDT = timestamps.last();
- TimeIterator iterator = new TimeIterator(startDT, endDT, granularity,
true);
- while (iterator.hasNext()) {
- ZonedDateTime timestampDT = iterator.next();
- if (timestampDT.isAfter(prevWatermarkDT)
- && TimeIterator.durationBetween(prevWatermarkDT, now, granularity)
> 0) {
- long timestampMillis = timestampDT.toInstant().toEpochMilli();
- ZonedDateTime auditCountCheckLowerBoundDT =
TimeIterator.dec(timestampDT, granularity, 1);
- if (auditCountVerifier.get().isComplete(topicName,
- auditCountCheckLowerBoundDT.toInstant().toEpochMilli(),
timestampMillis)) {
- completionWatermark = timestampMillis;
- // Also persist the watermark into State object to share this with
other MetadataWriters
- // we enforce ourselves to always use lower-cased table name here
- String catalogDbTableNameLowerCased =
catalogDbTableName.toLowerCase(Locale.ROOT);
-
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
catalogDbTableNameLowerCased), completionWatermark);
- break;
- }
- } else {
- break;
- }
- }
- } catch (IOException e) {
- log.warn("Exception during audit count check: ", e);
+ private void updateWatermarkWithNoFilesRegistered(String topicName,
TableMetadata tableMetadata,
+ Map<String, String> propsToUpdate) {
+ if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
+ log.info(String.format("Checking kafka audit for %s on change_property
", topicName));
+ SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
+ ZonedDateTime dtAtBeginningOfHour =
ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
+ timestamps.add(dtAtBeginningOfHour);
+
+ getWatermarkUpdater(topicName, tableMetadata,
propsToUpdate).run(timestamps, true);
+ } else {
+ log.info(String.format("Need valid watermark, current watermark is %s,
Not checking kafka audit for %s",
+ tableMetadata.completionWatermark, topicName));
}
- return completionWatermark;
+ }
+
+ @Override
+ public void reset(String dbName, String tableName) throws IOException {
+ this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}
private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata
tableMetadata, String dbName,
@@ -1031,6 +976,10 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
if (tableMetadata.completenessEnabled) {
gobblinTrackingEvent.addMetadata(COMPLETION_WATERMARK_KEY,
Long.toString(tableMetadata.completionWatermark));
+ if (tableMetadata.totalCountCompletenessEnabled) {
+ gobblinTrackingEvent.addMetadata(TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
+ Long.toString(tableMetadata.totalCountCompletionWatermark));
+ }
}
eventSubmitter.submit(gobblinTrackingEvent);
}
@@ -1109,7 +1058,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
Long currentOffset =
((LongWatermark)recordEnvelope.getWatermark().getWatermark()).getValue();
if (currentOffset > currentWatermark) {
- if (!tableMetadataMap.computeIfAbsent(tid, t -> new
TableMetadata()).lowWatermark.isPresent()) {
+ if (!tableMetadataMap.computeIfAbsent(tid, t -> new
TableMetadata(this.conf)).lowWatermark.isPresent()) {
//This means we haven't register this table or met some error
before, we need to reset the low watermark
tableMetadataMap.get(tid).lowWatermark = Optional.of(currentOffset
- 1);
tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
@@ -1117,6 +1066,11 @@ public class IcebergMetadataWriter implements
MetadataWriter {
tableMetadataMap.get(tid).newPartitionColumnEnabled = true;
if (this.completenessEnabled &&
this.completenessWhitelistBlacklist.acceptTable(dbName, tableName)) {
tableMetadataMap.get(tid).completenessEnabled = true;
+
+ if (this.totalCountCompletenessEnabled
+ &&
this.totalCountBasedCompletenessWhitelistBlacklist.acceptTable(dbName,
tableName)) {
+ tableMetadataMap.get(tid).totalCountCompletenessEnabled =
true;
+ }
}
}
}
@@ -1156,7 +1110,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
*
* Also note the difference with {@link org.apache.iceberg.TableMetadata}.
*/
- public class TableMetadata {
+ public static class TableMetadata {
Optional<Table> table = Optional.absent();
/**
@@ -1175,18 +1129,18 @@ public class IcebergMetadataWriter implements
MetadataWriter {
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();
long completionWatermark = DEFAULT_COMPLETION_WATERMARK;
+ long totalCountCompletionWatermark = DEFAULT_COMPLETION_WATERMARK;
SortedSet<ZonedDateTime> datePartitions = new
TreeSet<>(Collections.reverseOrder());
List<String> serializedAuditCountMaps = new ArrayList<>();
@Setter
String datasetName;
boolean completenessEnabled;
+ boolean totalCountCompletenessEnabled;
boolean newPartitionColumnEnabled;
+ Configuration conf;
- Cache<CharSequence, String> addedFiles = CacheBuilder.newBuilder()
- .expireAfterAccess(conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME,
DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
- TimeUnit.HOURS)
- .build();
+ Cache<CharSequence, String> addedFiles;
long lowestGMCEEmittedTime = Long.MAX_VALUE;
/**
@@ -1240,5 +1194,13 @@ public class IcebergMetadataWriter implements
MetadataWriter {
this.datePartitions.clear();
this.serializedAuditCountMaps.clear();
}
+
+ TableMetadata(Configuration conf) {
+ this.conf = conf;
+ addedFiles = CacheBuilder.newBuilder()
+ .expireAfterAccess(this.conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME,
DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
+ TimeUnit.HOURS)
+ .build();
+ }
}
}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
index 73c206d5f..6d4b7b629 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
@@ -21,9 +21,14 @@ public class IcebergMetadataWriterConfigKeys {
public static final String ICEBERG_COMPLETENESS_ENABLED =
"iceberg.completeness.enabled";
public static final boolean DEFAULT_ICEBERG_COMPLETENESS = false;
+ public static final String ICEBERG_TOTAL_COUNT_COMPLETENESS_ENABLED =
"iceberg.completeness.totalCount.enabled";
+ public static final boolean DEFAULT_ICEBERG_TOTAL_COUNT_COMPLETENESS = false;
public static final String ICEBERG_COMPLETENESS_WHITELIST =
"iceberg.completeness.whitelist";
+ public static final String ICEBERG_TOTAL_COUNT_COMPLETENESS_WHITELIST =
"iceberg.totalCount.completeness.whitelist";
public static final String ICEBERG_COMPLETENESS_BLACKLIST =
"iceberg.completeness.blacklist";
+ public static final String ICEBERG_TOTAL_COUNT_COMPLETENESS_BLACKLIST =
"iceberg.totalCount.completeness.blacklist";
public static final String COMPLETION_WATERMARK_KEY = "completionWatermark";
+ public static final String TOTAL_COUNT_COMPLETION_WATERMARK_KEY =
"totalCountCompletionWatermark";
public static final String COMPLETION_WATERMARK_TIMEZONE_KEY =
"completionWatermarkTimezone";
public static final long DEFAULT_COMPLETION_WATERMARK = -1L;
public static final String TIME_ZONE_KEY = "iceberg.completeness.timezone";
@@ -41,8 +46,7 @@ public class IcebergMetadataWriterConfigKeys {
public static final String ICEBERG_NEW_PARTITION_WHITELIST =
"iceberg.new.partition.whitelist";
public static final String ICEBERG_NEW_PARTITION_BLACKLIST =
"iceberg.new.partition.blacklist";
public static final String STATE_COMPLETION_WATERMARK_KEY_OF_TABLE =
"completion.watermark.%s";
+ public static final String
STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE =
"totalCount.completion.watermark.%s";
public static final String ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY =
"iceberg.enable.custom.metadata.retention.policy";
public static final boolean
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY = true;
-
-
}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdaterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdaterTest.java
new file mode 100644
index 000000000..0b7848733
--- /dev/null
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdaterTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.time.TimeIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static
org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
+import static org.mockito.Mockito.*;
+
+
+public class CompletenessWatermarkUpdaterTest {
+ static final String TOPIC = "testTopic";
+ static final String TABLE_NAME = "testTopic_tableName";
+ static final String TIME_ZONE = "America/Los_Angeles";
+ static final String AUDIT_CHECK_GRANULARITY = "HOUR";
+
+ static final ZonedDateTime NOW =
ZonedDateTime.now(ZoneId.of(TIME_ZONE)).truncatedTo(ChronoUnit.HOURS);
+ static final ZonedDateTime ONE_HOUR_AGO = TimeIterator.dec(NOW,
TimeIterator.Granularity.valueOf(AUDIT_CHECK_GRANULARITY), 1);
+ static final ZonedDateTime TWO_HOUR_AGO = TimeIterator.dec(NOW,
TimeIterator.Granularity.valueOf(AUDIT_CHECK_GRANULARITY), 2);
+ static final ZonedDateTime THREE_HOUR_AGO = TimeIterator.dec(NOW,
TimeIterator.Granularity.valueOf(AUDIT_CHECK_GRANULARITY), 3);
+
+ @Test
+ public void testClassicWatermarkOnly() throws IOException {
+ TestParams params = createTestParams();
+
+ // Round 1: the expected completion watermark bootstraps to ONE_HOUR_AGO
+ // total completion watermark is not enabled
+ KafkaAuditCountVerifier verifier =
mockKafkaAuditCountVerifier(ImmutableList.of(
+ new AuditCountVerificationResult(TWO_HOUR_AGO, ONE_HOUR_AGO, true /*
isCompleteClassic */, false /* isCompleteTotalCount */),
+ new AuditCountVerificationResult(THREE_HOUR_AGO, TWO_HOUR_AGO, true,
true)));
+ CompletenessWatermarkUpdater updater =
+ new CompletenessWatermarkUpdater("testTopic", AUDIT_CHECK_GRANULARITY,
TIME_ZONE, params.tableMetadata, params.props, params.state, verifier);
+ SortedSet<ZonedDateTime> timestamps = new
TreeSet<>(Collections.reverseOrder());
+ timestamps.add(ONE_HOUR_AGO);
+ timestamps.add(TWO_HOUR_AGO);
+ boolean includeTotalCountCompletionWatermark = false;
+ updater.run(timestamps, includeTotalCountCompletionWatermark);
+
+ validateCompletionWaterMark(ONE_HOUR_AGO, params);
+ validateEmptyTotalCompletionWatermark(params);
+
+ // Round 2: the expected completion watermark moves from ONE_HOUR_AGO to
NOW
+ // total completion watermark is not enabled
+ verifier = mockKafkaAuditCountVerifier(ImmutableList.of(
+ new AuditCountVerificationResult(ONE_HOUR_AGO, NOW, true /*
isCompleteClassic */, false /* isCompleteTotalCount */),
+ new AuditCountVerificationResult(TWO_HOUR_AGO, ONE_HOUR_AGO, true,
true),
+ new AuditCountVerificationResult(THREE_HOUR_AGO, TWO_HOUR_AGO, true,
true)));
+ updater.setAuditCountVerifier(verifier);
+ timestamps.add(NOW);
+ updater.run(timestamps, includeTotalCountCompletionWatermark);
+
+ validateCompletionWaterMark(NOW, params);
+ validateEmptyTotalCompletionWatermark(params);
+ }
+
+ @Test
+ public void testClassicAndTotalCountWatermark() throws IOException {
+ TestParams params = createTestParams();
+
+ // Round 1: the expected completion watermark bootstraps to ONE_HOUR_AGO
+ // the expected total completion watermark bootstraps to
TWO_HOUR_AGO
+ KafkaAuditCountVerifier verifier =
mockKafkaAuditCountVerifier(ImmutableList.of(
+ new AuditCountVerificationResult(TWO_HOUR_AGO, ONE_HOUR_AGO, true /*
isCompleteClassic */, false /* isCompleteTotalCount */),
+ new AuditCountVerificationResult(THREE_HOUR_AGO, TWO_HOUR_AGO, true,
true)));
+ CompletenessWatermarkUpdater updater =
+ new CompletenessWatermarkUpdater("testTopic", AUDIT_CHECK_GRANULARITY,
TIME_ZONE, params.tableMetadata, params.props, params.state, verifier);
+ SortedSet<ZonedDateTime> timestamps = new
TreeSet<>(Collections.reverseOrder());
+ timestamps.add(ONE_HOUR_AGO);
+ timestamps.add(TWO_HOUR_AGO);
+ boolean includeTotalCountCompletionWatermark = true;
+ updater.run(timestamps, includeTotalCountCompletionWatermark);
+
+ validateCompletionWaterMark(ONE_HOUR_AGO, params);
+ validateTotalCompletionWatermark(TWO_HOUR_AGO, params);
+
+ // Round 2: the expected completion watermark moves from ONE_HOUR_AGO to
NOW
+ // the expected total completion watermark moves from
TWO_HOUR_AGO to ONE_HOUR_AGO
+ verifier = mockKafkaAuditCountVerifier(ImmutableList.of(
+ new AuditCountVerificationResult(ONE_HOUR_AGO, NOW, true /*
isCompleteClassic */, false /* isCompleteTotalCount */),
+ new AuditCountVerificationResult(TWO_HOUR_AGO, ONE_HOUR_AGO, true,
true),
+ new AuditCountVerificationResult(THREE_HOUR_AGO, TWO_HOUR_AGO, true,
true)));
+ updater.setAuditCountVerifier(verifier);
+ timestamps.add(NOW);
+ updater.run(timestamps, includeTotalCountCompletionWatermark);
+
+ validateCompletionWaterMark(NOW, params);
+ validateTotalCompletionWatermark(ONE_HOUR_AGO, params);
+}
+
+ static void validateCompletionWaterMark(ZonedDateTime expectedDT, TestParams
params) {
+ long expected = expectedDT.toInstant().toEpochMilli();
+
+ // 1. assert updated tableMetadata.completionWatermark
+ Assert.assertEquals(params.tableMetadata.completionWatermark, expected);
+ // 2. assert updated property
+ Assert.assertEquals(params.props.get(COMPLETION_WATERMARK_KEY),
String.valueOf(expected));
+ Assert.assertEquals(params.props.get(COMPLETION_WATERMARK_TIMEZONE_KEY),
TIME_ZONE);
+ // 3. assert updated state
+ String watermarkKey =
String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
+ params.tableMetadata.table.get().name().toLowerCase(Locale.ROOT));
+ Assert.assertEquals(params.state.getProp(watermarkKey),
String.valueOf(expected));
+ }
+
+ static void validateTotalCompletionWatermark(ZonedDateTime expectedDT,
TestParams params) {
+ long expected = expectedDT.toInstant().toEpochMilli();
+
+ // 1. expect updated tableMetadata.totalCountCompletionWatermark
+ Assert.assertEquals(params.tableMetadata.totalCountCompletionWatermark,
expected);
+ // 2. expect updated property
+
Assert.assertEquals(params.props.get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
String.valueOf(expected));
+ // 3. expect updated state
+ String totalCountWatermarkKey =
String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
+ params.tableMetadata.table.get().name().toLowerCase(Locale.ROOT));
+ Assert.assertEquals(params.state.getProp(totalCountWatermarkKey),
String.valueOf(expected));
+ }
+
+ static void validateEmptyTotalCompletionWatermark(TestParams params) {
+ Assert.assertEquals(params.tableMetadata.totalCountCompletionWatermark,
DEFAULT_COMPLETION_WATERMARK);
+ Assert.assertNull(params.props.get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
+ String totalCountWatermarkKey =
String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
+ params.tableMetadata.table.get().name().toLowerCase(Locale.ROOT));
+ Assert.assertNull(params.state.getProp(totalCountWatermarkKey));
+ }
+
+ static class TestParams {
+ IcebergMetadataWriter.TableMetadata tableMetadata;
+ Map<String, String> props;
+ State state;
+ }
+
+ static TestParams createTestParams() throws IOException {
+ TestParams params = new TestParams();
+ params.tableMetadata = new IcebergMetadataWriter.TableMetadata(new
Configuration());
+
+ Table table = mock(Table.class);
+ when(table.name()).thenReturn(TABLE_NAME);
+ params.tableMetadata.table = Optional.of(table);
+
+ params.props = new HashMap<>();
+ params.state = new State();
+
+ return params;
+ }
+
+ static class AuditCountVerificationResult {
+ AuditCountVerificationResult(ZonedDateTime start, ZonedDateTime end,
boolean isCompleteClassic, boolean isCompleteTotalCount) {
+ this.start = start;
+ this.end = end;
+ this.isCompleteClassic = isCompleteClassic;
+ this.isCompleteTotalCount = isCompleteTotalCount;
+ }
+ ZonedDateTime start;
+ ZonedDateTime end;
+ boolean isCompleteClassic;
+ boolean isCompleteTotalCount;
+ }
+
+ static KafkaAuditCountVerifier
mockKafkaAuditCountVerifier(List<AuditCountVerificationResult> resultsToMock)
+ throws IOException {
+ KafkaAuditCountVerifier verifier =
mock(IcebergMetadataWriterTest.TestAuditCountVerifier.class);
+ for (AuditCountVerificationResult result : resultsToMock) {
+ Mockito.when(verifier.calculateCompleteness(TOPIC,
result.start.toInstant().toEpochMilli(), result.end.toInstant().toEpochMilli()))
+ .thenReturn(ImmutableMap.of(
+ KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness,
result.isCompleteClassic,
+ KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness,
result.isCompleteTotalCount));
+ }
+ return verifier;
+ }
+}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index e5819d6ba..6feb8adae 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -194,6 +194,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
State state = getState();
state.setProp(ICEBERG_NEW_PARTITION_ENABLED, true);
state.setProp(ICEBERG_COMPLETENESS_ENABLED, true);
+ state.setProp(ICEBERG_TOTAL_COUNT_COMPLETENESS_ENABLED, true);
state.setProp(NEW_PARTITION_KEY, "late");
state.setProp(NEW_PARTITION_TYPE_KEY, "int");
state.setProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY,
TestAuditClientFactory.class.getName());
@@ -223,6 +224,8 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
Assert.assertEquals(table.location(),
new File(tmpDir,
"testDB/testTopic/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
+
Assert.assertFalse(table.properties().containsKey(COMPLETION_WATERMARK_KEY));
+
Assert.assertFalse(table.properties().containsKey(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "1000-2000").build());
GenericRecord genericGmce_1000_2000 =
GenericData.get().deepCopy(gmce.getSchema(), gmce);
@@ -238,6 +241,8 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Assert low watermark and high watermark set properly
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"9");
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"20");
+
Assert.assertFalse(table.properties().containsKey(COMPLETION_WATERMARK_KEY));
+
Assert.assertFalse(table.properties().containsKey(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
/*test flush twice*/
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "2000-3000").build());
@@ -257,6 +262,8 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"20");
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"30");
+
Assert.assertFalse(table.properties().containsKey(COMPLETION_WATERMARK_KEY));
+
Assert.assertFalse(table.properties().containsKey(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
/* Test it will skip event with lower watermark*/
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "3000-4000").build());
@@ -268,6 +275,8 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-3000");
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
+
Assert.assertFalse(table.properties().containsKey(COMPLETION_WATERMARK_KEY));
+
Assert.assertFalse(table.properties().containsKey(TOTAL_COUNT_COMPLETION_WATERMARK_KEY));
}
//Make sure hive test execute later and close the metastore
@@ -420,7 +429,9 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Test when completeness watermark = -1 bootstrap case
KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
- Mockito.when(verifier.isComplete("testTopicCompleteness", timestampMillis
- TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
+ Mockito.when(verifier.calculateCompleteness("testTopicCompleteness",
timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis))
+
.thenReturn(ImmutableMap.of(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness,
true,
+
KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness, true));
IcebergMetadataWriter imw = (IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next();
imw.setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
@@ -429,8 +440,10 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis));
+
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis));
// 1631811600000L correspond to 2020-09-16-10 in PT
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
table.name().toLowerCase(Locale.ROOT))), 1631811600000L);
+
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
table.name().toLowerCase(Locale.ROOT))), 1631811600000L);
Iterator<org.apache.iceberg.DataFile> dfl =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
hourlyFile.getAbsolutePath())).collect().iterator();
Assert.assertTrue(dfl.hasNext());
@@ -453,6 +466,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis));
+
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis));
dfl =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
hourlyFile1.getAbsolutePath())).collect().iterator();
Assert.assertTrue(dfl.hasNext());
@@ -475,12 +489,17 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(60L))));
- Mockito.when(verifier.isComplete("testTopicCompleteness", timestampMillis1
- TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
+ Mockito.when(verifier.calculateCompleteness("testTopicCompleteness",
timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1))
+
.thenReturn(ImmutableMap.of(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness,
true,
+
KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness, true));
+
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis1));
+
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis1));
// watermark 1631815200000L corresponds to 2021-09-16-11 in PT
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
table.name().toLowerCase(Locale.ROOT))), 1631815200000L);
+
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_TOTAL_COUNT_COMPLETION_WATERMARK_KEY_OF_TABLE,
table.name().toLowerCase(Locale.ROOT))), 1631815200000L);
dfl =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
hourlyFile2.getAbsolutePath())).collect().iterator();
Assert.assertTrue(dfl.hasNext());
@@ -511,7 +530,10 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
// For quiet topics always check for previous hour window
- Mockito.when(verifier.isComplete("testTopicCompleteness",
expectedCWDt.minusHours(1).toInstant().toEpochMilli(),
expectedWatermark)).thenReturn(true);
+ Mockito.when(verifier.calculateCompleteness("testTopicCompleteness",
expectedCWDt.minusHours(1).toInstant().toEpochMilli(), expectedWatermark))
+
.thenReturn(ImmutableMap.of(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness,
true,
+
KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness, true));
+
((IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
@@ -521,7 +543,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(expectedWatermark));
-
+
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
String.valueOf(expectedWatermark));
}
private String writeRecord(File file) throws IOException {