ZihanLi58 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1137558092


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java:
##########
@@ -62,16 +63,21 @@ public class State implements WritableShim {
   @Getter
   private Properties specProperties;
 
+  // This in-mem state will be used to share partition completion information across different MetadataWriter impls
+  public Map<String, Set<String>> tableToCompletedPartitions;

Review Comment:
   The config can be set or overwritten on the fly at runtime, and it is only 
for internal use. Having a map does not make sense for most use cases of this 
State object, so I would suggest using the existing API instead.
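
   For illustration only, here is a minimal sketch of how per-table completed partitions could be kept as plain string properties instead of a dedicated map field. It assumes `java.util.Properties` as a stand-in for the property API on `State`; the class name, the key prefix, and the comma-separated encoding are all hypothetical and not part of the PR.

```java
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;

/** Sketch only: java.util.Properties stands in for the State property API. */
final class CompletedPartitionsProps {
  // Hypothetical key prefix, not an existing Gobblin configuration key.
  static final String KEY_PREFIX = "completed.partitions.";

  private CompletedPartitionsProps() {}

  /** Builds the property key, always lower-casing the db/table name. */
  static String keyFor(String dbTableName) {
    return KEY_PREFIX + dbTableName.toLowerCase(Locale.ROOT);
  }

  /** Reads the comma-separated property back into an ordered set. */
  static Set<String> getCompletedPartitions(Properties props, String dbTableName) {
    String raw = props.getProperty(keyFor(dbTableName), "");
    Set<String> result = new LinkedHashSet<>();
    for (String partition : raw.split(",")) {
      if (!partition.isEmpty()) {
        result.add(partition);
      }
    }
    return result;
  }

  /** Adds one partition to the table's entry, ignoring duplicates. */
  static void addCompletedPartition(Properties props, String dbTableName, String partition) {
    Set<String> partitions = getCompletedPartitions(props, dbTableName);
    partitions.add(partition);
    props.setProperty(keyFor(dbTableName), String.join(",", partitions));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    addCompletedPartition(props, "Db.Table", "2023-03-15-10");
    addCompletedPartition(props, "db.table", "2023-03-15-11");
    System.out.println(getCompletedPartitions(props, "DB.TABLE"));
  }
}
```

   With this shape, other MetadataWriter implementations would read the same key rather than reach into a public field, at the cost of encoding the set as a string.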



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +938,15 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist this into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String tableNameLowerCased = tableName.toLowerCase(Locale.ROOT);
+            this.state.tableToCompletedPartitions.putIfAbsent(tableNameLowerCased, Sets.newHashSet());
+            this.state.tableToCompletedPartitions.get(tableNameLowerCased).add(timestampDT.format(HOURLY_DATEPARTITION_FORMAT));

Review Comment:
   We do guarantee the SLA in most cases, but there are exceptions, and GCNs 
can happen downstream and cause these delays. My suggestion is to have Jasper 
expose the last completed partition and have the Jasper writer compute the 
delta here as well. This is just a reminder and does not need to be addressed 
in this PR; maybe add a comment to indicate this scenario.
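
   One possible reading of "compute the delta" is to list every hourly partition after the last completed one, up to and including the newly completed one. The sketch below illustrates that reading only. It assumes hourly partitions are formatted as `yyyy-MM-dd-HH` and uses `LocalDateTime`, so time zones and DST transitions are ignored; the class and method names are hypothetical and do not exist in Gobblin or Jasper.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Sketch only: enumerates hourly partitions between two completion points. */
final class PartitionDelta {
  // Assumed hourly partition pattern; the real HOURLY_DATEPARTITION_FORMAT may differ.
  static final DateTimeFormatter HOURLY = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");

  private PartitionDelta() {}

  /**
   * Returns the partitions strictly after lastCompleted, up to and including
   * newCompleted. Returns an empty list when newCompleted is not later.
   */
  static List<String> hourlyPartitionsBetween(String lastCompleted, String newCompleted) {
    LocalDateTime cursor = LocalDateTime.parse(lastCompleted, HOURLY).plusHours(1);
    LocalDateTime end = LocalDateTime.parse(newCompleted, HOURLY);
    List<String> delta = new ArrayList<>();
    while (!cursor.isAfter(end)) {
      delta.add(cursor.format(HOURLY));
      cursor = cursor.plusHours(1);
    }
    return delta;
  }

  public static void main(String[] args) {
    // A late completion that covers several hours at once.
    System.out.println(hourlyPartitionsBetween("2023-03-15-22", "2023-03-16-01"));
  }
}
```

   Computing the delta this way would let a downstream writer catch up on several partitions at once when completion arrives late, rather than assuming one partition per run.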



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +938,15 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist this into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String tableNameLowerCased = tableName.toLowerCase(Locale.ROOT);

Review Comment:
   Then rename it to dbTableName to make that clearer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.