wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1222735916
##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -120,27 +140,16 @@ public boolean isComplete(String datasetName, long beginInMillis, long endInMill
*/
private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
- log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
- countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+ validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.refTiers);
if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
Review Comment:
Good catch. Fixed it and added a new unit test.
##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -151,6 +160,49 @@ private double getCompletenessPercentage(String datasetName, long beginInMillis,
return percent;
}
+ private double getTotalCountCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+ Preconditions.checkNotNull(this.totalCountRefTiers);
+
+ Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+ validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.totalCountRefTiers);
+ if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
Review Comment:
Refactored.
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -962,13 +1017,27 @@ private long computeCompletenessWatermark(String catalogDbTableName, String topi
&& TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
long timestampMillis = timestampDT.toInstant().toEpochMilli();
ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
- if (auditCountVerifier.get().isComplete(topicName,
- auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+ if (!isTotalCountCompletenessWatermark && auditCountVerifier.get().isComplete(
+ topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
completionWatermark = timestampMillis;
// Also persist the watermark into State object to share this with other MetadataWriters
// we enforce ourselves to always use lower-cased table name here
String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
- this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased), completionWatermark);
+ this.state.setProp(
+ String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased),
+ completionWatermark);
+ break;
+ }
+
+ if (isTotalCountCompletenessWatermark && auditCountVerifier.get().isTotalCountComplete(
+ topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+ completionWatermark = timestampMillis;
+ // Also persist the watermark into State object to share this with other MetadataWriters
+ // we enforce ourselves to always use lower-cased table name here
+ String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
+ this.state.setProp(
Review Comment:
Refactored.
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -846,6 +863,18 @@ public void flush(String dbName, String tableName) throws IOException {
log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s", tableMetadata.completionWatermark, topicName));
}
+
+ // Deal with total count completion watermark.
+ if (tableMetadata.totalCountCompletionWatermark > DEFAULT_COMPLETION_WATERMARK) {
Review Comment:
Refactored.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]