vikrambohra commented on a change in pull request #3385:
URL: https://github.com/apache/gobblin/pull/3385#discussion_r704696128
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -731,6 +871,34 @@ public void flush(String dbName, String tableName) throws
IOException {
}
}
+ /**
+ * For a sorted collection of timestamps greater than an existing
watermark, check audit counts for completeness between
+ * a source and reference tier with a granularity of 1 hour.
+ * If the audit count matches, update the watermark to the timestamp.
+ * @param table
+ * @param timestamps
+ * @param previousWatermark
+ * @return updated completion watermark
+ */
+ private long computeCompletenessWatermark(String table, Collection<Long>
timestamps, long previousWatermark) {
+ log.info(String.format("Compute completion watermark for %s and timestamps
%s with previous watermark %s", table, timestamps, previousWatermark));
+ long completionWatermark = previousWatermark;
+ try {
+ for(long timestamp : timestamps) {
+ if (timestamp > previousWatermark) {
+ if(auditCountVerifier.get().isComplete(table, timestamp, timestamp +
TimeUnit.HOURS.toMillis(1))) {
+ completionWatermark = timestamp;
+ }
+ } else {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
Review comment:
Yes. The intention is to keep going even if the audit count system is down: the IOException is logged rather than rethrown.
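A minimal sketch of that intent, not the actual IcebergMetadataWriter code. It mirrors the loop from the hunk above and assumes the method returns the current watermark after the catch block, which the truncated diff does not show. `WatermarkSketch` and `CompletenessVerifier` are hypothetical names standing in for the writer and its `auditCountVerifier`, and `java.util.logging` replaces the project's logger so the example is self-contained.

```java
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

final class WatermarkSketch {
  private static final Logger LOG = Logger.getLogger(WatermarkSketch.class.getName());

  /** Hypothetical stand-in for the audit count verifier used in the PR. */
  interface CompletenessVerifier {
    boolean isComplete(String table, long startMillis, long endMillis) throws IOException;
  }

  /**
   * Same loop shape as the hunk under review. If the verifier throws, the
   * error is logged and the watermark computed so far is returned
   * (assumption: the real method does the same after its catch block).
   */
  static long computeCompletenessWatermark(CompletenessVerifier verifier, String table,
      Collection<Long> timestamps, long previousWatermark) {
    long completionWatermark = previousWatermark;
    try {
      for (long timestamp : timestamps) {
        if (timestamp <= previousWatermark) {
          break; // nothing newer than the existing watermark remains
        }
        // Check the one-hour window that starts at this timestamp.
        if (verifier.isComplete(table, timestamp, timestamp + TimeUnit.HOURS.toMillis(1))) {
          completionWatermark = timestamp;
        }
      }
    } catch (IOException e) {
      // Audit count system unavailable: log and keep going instead of failing the flush.
      LOG.log(Level.SEVERE, "Audit count check failed for " + table, e);
    }
    return completionWatermark;
  }
}
```

With a verifier that always throws IOException, the call returns `previousWatermark` unchanged, so the caller can proceed and the watermark simply does not advance for that run.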