sv2000 commented on a change in pull request #3385:
URL: https://github.com/apache/gobblin/pull/3385#discussion_r705596403



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -671,14 +796,29 @@ public void flush(String dbName, String tableName) throws IOException {
       TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new TableMetadata());
       if (tableMetadata.transaction.isPresent()) {
         Transaction transaction = tableMetadata.transaction.get();
+        Map<String, String> props = tableMetadata.newProperties.or(
+            Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
         if (tableMetadata.appendFiles.isPresent()) {
           tableMetadata.appendFiles.get().commit();
+          if(tableMetadata.completenessEnabled) {
+            String topicName = props.get(TOPIC_NAME_KEY);
+            if(topicName == null) {
+              log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
+                  TOPIC_NAME_KEY, dbName, tableName));
+            } else {
+              long newCompletenessWatermark =

Review comment:
       My main idea is to optimize for the common case, where the pipeline is
caught up and the data is current. If hour(currentTime) == prevWatermark, skip
the audit check; otherwise we are lagging, so run the check.
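
To make the suggestion concrete, here is a minimal sketch of the gate. It assumes the previous completeness watermark is stored as epoch milliseconds aligned to the start of an hour, and that `hour(currentTime)` means the current time truncated to the hour in a configured time zone. `CompletenessCheckGate`, `startOfHour`, and `canSkipAuditCheck` are hypothetical names for illustration and are not part of `IcebergMetadataWriter`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration only; not part of IcebergMetadataWriter.
final class CompletenessCheckGate {

  private CompletenessCheckGate() {}

  // Returns the start of the hour containing timeMillis in the given zone, as epoch millis.
  static long startOfHour(long timeMillis, ZoneId zone) {
    return ZonedDateTime.ofInstant(Instant.ofEpochMilli(timeMillis), zone)
        .truncatedTo(ChronoUnit.HOURS)
        .toInstant()
        .toEpochMilli();
  }

  // Assumption: the watermark is hour-aligned epoch millis.
  // True when the previous watermark already equals the current hour (pipeline caught up),
  // in which case the audit check can be skipped. False means we are lagging.
  static boolean canSkipAuditCheck(long prevWatermarkMillis, long currentTimeMillis, ZoneId zone) {
    return prevWatermarkMillis == startOfHour(currentTimeMillis, zone);
  }

  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("UTC");
    long now = System.currentTimeMillis();
    long caughtUp = startOfHour(now, zone);
    long lagging = caughtUp - 2 * 60 * 60 * 1000L; // watermark two hours behind
    System.out.println("caught up -> skip: " + canSkipAuditCheck(caughtUp, now, zone));
    System.out.println("lagging   -> skip: " + canSkipAuditCheck(lagging, now, zone));
  }
}
```

In `flush`, a call like this could wrap the watermark computation, so the audit lookup only runs when the table is behind the current hour.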
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
