wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1228473902
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -891,94 +898,40 @@ public void flush(String dbName, String tableName) throws
IOException {
}
}
- @Override
- public void reset(String dbName, String tableName) throws IOException {
- this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+ private CompletenessWatermarkUpdater getWatermarkUpdater(String topicName,
TableMetadata tableMetadata,
+ Map<String, String> propsToUpdate) {
+ return new CompletenessWatermarkUpdater(topicName,
this.auditCheckGranularity, this.timeZone,
+ tableMetadata, propsToUpdate, this.state,
this.auditCountVerifier.get());
}
- /**
- * Update TableMetadata with the new completion watermark upon a successful
audit check
- * @param tableMetadata metadata of table
- * @param topic topic name
- * @param timestamps Sorted set in reverse order of timestamps to check
audit counts for
- * @param props table properties map
- */
- private void checkAndUpdateCompletenessWatermark(TableMetadata
tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
- Map<String, String> props) {
- String tableName = tableMetadata.table.get().name();
- if (topic == null) {
- log.error(String.format("Not performing audit check. %s is null. Please
set as table property of %s",
- TOPIC_NAME_KEY, tableName));
- }
- long newCompletenessWatermark =
- computeCompletenessWatermark(tableName, topic, timestamps,
tableMetadata.completionWatermark);
- if (newCompletenessWatermark > tableMetadata.completionWatermark) {
- log.info(String.format("Updating %s for %s to %s",
COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
- newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_KEY,
String.valueOf(newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
- tableMetadata.completionWatermark = newCompletenessWatermark;
- }
+ private void updateWatermarkWithFilesRegistered(String topicName,
TableMetadata tableMetadata,
+ Map<String, String> propsToUpdate, boolean
includeTotalCountCompletionWatermark) {
+ getWatermarkUpdater(topicName, tableMetadata, propsToUpdate)
+ .run(tableMetadata.datePartitions,
includeTotalCountCompletionWatermark);
}
- /**
- * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit
counts match
- * for that window (aka its is set to the beginning of next window)
- * For each timestamp in sorted collection of timestamps in descending order
- * if timestamp is greater than previousWatermark
- * and hour(now) > hour(prevWatermark)
- * check audit counts for completeness between
- * a source and reference tier for [timestamp -1 , timstamp unit of
granularity]
- * If the audit count matches update the watermark to the timestamp and
break
- * else continue
- * else
- * break
- * Using a {@link TimeIterator} that operates over a range of time in 1 unit
- * given the start, end and granularity
- * @param catalogDbTableName
- * @param topicName
- * @param timestamps a sorted set of timestamps in decreasing order
- * @param previousWatermark previous completion watermark for the table
- * @return updated completion watermark
- */
- private long computeCompletenessWatermark(String catalogDbTableName, String
topicName, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
- log.info(String.format("Compute completion watermark for %s and timestamps
%s with previous watermark %s", topicName, timestamps, previousWatermark));
- long completionWatermark = previousWatermark;
- ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
- try {
- if(timestamps == null || timestamps.size() <= 0) {
- log.error("Cannot create time iterator. Empty for null timestamps");
- return previousWatermark;
- }
- TimeIterator.Granularity granularity =
TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
- ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
- .atZone(ZoneId.of(this.timeZone));
- ZonedDateTime startDT = timestamps.first();
- ZonedDateTime endDT = timestamps.last();
- TimeIterator iterator = new TimeIterator(startDT, endDT, granularity,
true);
- while (iterator.hasNext()) {
- ZonedDateTime timestampDT = iterator.next();
- if (timestampDT.isAfter(prevWatermarkDT)
- && TimeIterator.durationBetween(prevWatermarkDT, now, granularity)
> 0) {
- long timestampMillis = timestampDT.toInstant().toEpochMilli();
- ZonedDateTime auditCountCheckLowerBoundDT =
TimeIterator.dec(timestampDT, granularity, 1);
- if (auditCountVerifier.get().isComplete(topicName,
- auditCountCheckLowerBoundDT.toInstant().toEpochMilli(),
timestampMillis)) {
- completionWatermark = timestampMillis;
- // Also persist the watermark into State object to share this with
other MetadataWriters
- // we enforce ourselves to always use lower-cased table name here
- String catalogDbTableNameLowerCased =
catalogDbTableName.toLowerCase(Locale.ROOT);
-
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
catalogDbTableNameLowerCased), completionWatermark);
- break;
- }
- } else {
- break;
- }
- }
- } catch (IOException e) {
- log.warn("Exception during audit count check: ", e);
+ private void updateWatermarkWithNoFilesRegistered(String topicName,
TableMetadata tableMetadata,
Review Comment:
This will be refined in the next iteration.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]