[
https://issues.apache.org/jira/browse/GOBBLIN-1838?focusedWorklogId=865329&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865329
]
ASF GitHub Bot logged work on GOBBLIN-1838:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Jun/23 17:31
Start Date: 13/Jun/23 17:31
Worklog Time Spent: 10m
Work Description: wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1228473902
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -891,94 +898,40 @@ public void flush(String dbName, String tableName) throws
IOException {
}
}
- @Override
- public void reset(String dbName, String tableName) throws IOException {
- this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+ private CompletenessWatermarkUpdater getWatermarkUpdater(String topicName,
TableMetadata tableMetadata,
+ Map<String, String> propsToUpdate) {
+ return new CompletenessWatermarkUpdater(topicName,
this.auditCheckGranularity, this.timeZone,
+ tableMetadata, propsToUpdate, this.state,
this.auditCountVerifier.get());
}
- /**
- * Update TableMetadata with the new completion watermark upon a successful
audit check
- * @param tableMetadata metadata of table
- * @param topic topic name
- * @param timestamps Sorted set in reverse order of timestamps to check
audit counts for
- * @param props table properties map
- */
- private void checkAndUpdateCompletenessWatermark(TableMetadata
tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
- Map<String, String> props) {
- String tableName = tableMetadata.table.get().name();
- if (topic == null) {
- log.error(String.format("Not performing audit check. %s is null. Please
set as table property of %s",
- TOPIC_NAME_KEY, tableName));
- }
- long newCompletenessWatermark =
- computeCompletenessWatermark(tableName, topic, timestamps,
tableMetadata.completionWatermark);
- if (newCompletenessWatermark > tableMetadata.completionWatermark) {
- log.info(String.format("Updating %s for %s to %s",
COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
- newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_KEY,
String.valueOf(newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
- tableMetadata.completionWatermark = newCompletenessWatermark;
- }
+ private void updateWatermarkWithFilesRegistered(String topicName,
TableMetadata tableMetadata,
+ Map<String, String> propsToUpdate, boolean
includeTotalCountCompletionWatermark) {
+ getWatermarkUpdater(topicName, tableMetadata, propsToUpdate)
+ .run(tableMetadata.datePartitions,
includeTotalCountCompletionWatermark);
}
- /**
- * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit
counts match
- * for that window (aka its is set to the beginning of next window)
- * For each timestamp in sorted collection of timestamps in descending order
- * if timestamp is greater than previousWatermark
- * and hour(now) > hour(prevWatermark)
- * check audit counts for completeness between
- * a source and reference tier for [timestamp -1 , timstamp unit of
granularity]
- * If the audit count matches update the watermark to the timestamp and
break
- * else continue
- * else
- * break
- * Using a {@link TimeIterator} that operates over a range of time in 1 unit
- * given the start, end and granularity
- * @param catalogDbTableName
- * @param topicName
- * @param timestamps a sorted set of timestamps in decreasing order
- * @param previousWatermark previous completion watermark for the table
- * @return updated completion watermark
- */
- private long computeCompletenessWatermark(String catalogDbTableName, String
topicName, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
- log.info(String.format("Compute completion watermark for %s and timestamps
%s with previous watermark %s", topicName, timestamps, previousWatermark));
- long completionWatermark = previousWatermark;
- ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
- try {
- if(timestamps == null || timestamps.size() <= 0) {
- log.error("Cannot create time iterator. Empty for null timestamps");
- return previousWatermark;
- }
- TimeIterator.Granularity granularity =
TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
- ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
- .atZone(ZoneId.of(this.timeZone));
- ZonedDateTime startDT = timestamps.first();
- ZonedDateTime endDT = timestamps.last();
- TimeIterator iterator = new TimeIterator(startDT, endDT, granularity,
true);
- while (iterator.hasNext()) {
- ZonedDateTime timestampDT = iterator.next();
- if (timestampDT.isAfter(prevWatermarkDT)
- && TimeIterator.durationBetween(prevWatermarkDT, now, granularity)
> 0) {
- long timestampMillis = timestampDT.toInstant().toEpochMilli();
- ZonedDateTime auditCountCheckLowerBoundDT =
TimeIterator.dec(timestampDT, granularity, 1);
- if (auditCountVerifier.get().isComplete(topicName,
- auditCountCheckLowerBoundDT.toInstant().toEpochMilli(),
timestampMillis)) {
- completionWatermark = timestampMillis;
- // Also persist the watermark into State object to share this with
other MetadataWriters
- // we enforce ourselves to always use lower-cased table name here
- String catalogDbTableNameLowerCased =
catalogDbTableName.toLowerCase(Locale.ROOT);
-
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
catalogDbTableNameLowerCased), completionWatermark);
- break;
- }
- } else {
- break;
- }
- }
- } catch (IOException e) {
- log.warn("Exception during audit count check: ", e);
+ private void updateWatermarkWithNoFilesRegistered(String topicName,
TableMetadata tableMetadata,
Review Comment:
This will be refined in the next iteration.
Issue Time Tracking
-------------------
Worklog Id: (was: 865329)
Time Spent: 1h 20m (was: 1h 10m)
> Introduce total count based completion watermark
> ------------------------------------------------
>
> Key: GOBBLIN-1838
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1838
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Andy Jiang
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Currently, the completion watermark is determined by the completeness
> percentage, defined as "max of srcCount/refCount, for each refTier".
> This change introduces a new "total count based completion watermark", which
> is determined by a new completeness percentage: "srcCount / sum of all
> refCount, for each refTier".
--
This message was sent by Atlassian Jira
(v8.20.10#820010)