[
https://issues.apache.org/jira/browse/GOBBLIN-1838?focusedWorklogId=865634&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865634
]
ASF GitHub Bot logged work on GOBBLIN-1838:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 14/Jun/23 22:08
Start Date: 14/Jun/23 22:08
Worklog Time Spent: 10m
Work Description: wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1230222909
##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -90,23 +102,52 @@ private static AuditCountClient getAuditClient(State
state) {
}
}
+ public Map<CompletenessType, Boolean> calculateCompleteness(String
datasetName, long beginInMillis, long endInMillis)
+ throws IOException {
+ return calculateCompleteness(datasetName, beginInMillis, endInMillis,
this.threshold);
+ }
+
/**
* Compare source tier against reference tiers.
- * Compute completion percentage by srcCount/refCount. Return true iff the
highest percentages is greater than threshold.
+ * Compute completion percentage which is true iff the calculated
percentages is greater than threshold.
*
* @param datasetName A dataset short name like 'PageViewEvent'
* @param beginInMillis Unix timestamp in milliseconds
* @param endInMillis Unix timestamp in milliseconds
* @param threshold User defined threshold
+ *
+ * @return a map of completeness result by CompletenessType
*/
- public boolean isComplete(String datasetName, long beginInMillis, long
endInMillis, double threshold)
- throws IOException {
- return getCompletenessPercentage(datasetName, beginInMillis, endInMillis)
> threshold;
+ public Map<CompletenessType, Boolean> calculateCompleteness(String
datasetName, long beginInMillis, long endInMillis,
+ double threshold) throws IOException {
+ Map<String, Long> countsByTier = getTierAndCount(datasetName,
beginInMillis, endInMillis);
+ log.info(String.format("checkTierCounts: audit counts map for %s for range
[%s,%s]", datasetName, beginInMillis, endInMillis));
+ countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+
+ Map<CompletenessType, Boolean> result = new HashMap<>();
+ result.put(CompletenessType.ClassicCompleteness,
CalculateCompleteness(datasetName, beginInMillis, endInMillis,
+ CompletenessType.ClassicCompleteness, countsByTier) > threshold);
+ result.put(CompletenessType.TotalCountCompleteness,
CalculateCompleteness(datasetName, beginInMillis, endInMillis,
+ CompletenessType.TotalCountCompleteness, countsByTier) > threshold);
+ return result;
}
- public boolean isComplete(String datasetName, long beginInMillis, long
endInMillis)
- throws IOException {
- return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
+ private double CalculateCompleteness(String datasetName, long beginInMillis,
long endInMillis, CompletenessType type,
Review Comment:
Fixed.
Issue Time Tracking
-------------------
Worklog Id: (was: 865634)
Time Spent: 1h 40m (was: 1.5h)
> Introduce total count based completion watermark
> ------------------------------------------------
>
> Key: GOBBLIN-1838
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1838
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Andy Jiang
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Currently the completion watermark is determined according to the
> completeness percentage: "max of srcCount/refCount, for each refTier".
> This change introduces a new "total count based completion watermark", which
> is determined by a new completeness percentage: "srcCount / sum of all
> refCount, for each refTier".
--
This message was sent by Atlassian Jira
(v8.20.10#820010)