aokolnychyi commented on a change in pull request #2328:
URL: https://github.com/apache/iceberg/pull/2328#discussion_r599144029



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -270,29 +270,33 @@ private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata
     int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
         COMMIT_NUM_STATUS_CHECKS_DEFAULT);
 
-    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
-      try {
-        Thread.sleep(COMMIT_STATUS_RECHECK_SLEEP);
-        TableMetadata metadata = refresh();
-        String metadataLocation = metadata.metadataFileLocation();
-        boolean commitSuccess = metadataLocation.equals(newMetadataLocation) ||
-            metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
-        if (commitSuccess) {
-          LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
-          return CommitStatus.SUCCESS;
-        } else {
-          LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
-          return CommitStatus.FAILURE;
-        }
-      } catch (Throwable checkFailure) {
-        LOG.error("Cannot check if commit to {}.{} exists. Retry attempt {} of {}.",
-            database, tableName, attempt, maxAttempts, checkFailure);
-      }
-    }
-
-    LOG.error("Cannot determine commit state to {}.{}. Failed to check {} times. Treating commit state as unknown.",
+    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
+
+    Tasks

Review comment:
       I think it is common to start with `Tasks.foreach(newMetadataLocation)` 
on one line.

##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -270,29 +270,33 @@ private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata
     int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
         COMMIT_NUM_STATUS_CHECKS_DEFAULT);
 
-    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
-      try {
-        Thread.sleep(COMMIT_STATUS_RECHECK_SLEEP);
-        TableMetadata metadata = refresh();
-        String metadataLocation = metadata.metadataFileLocation();
-        boolean commitSuccess = metadataLocation.equals(newMetadataLocation) ||
-            metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
-        if (commitSuccess) {
-          LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
-          return CommitStatus.SUCCESS;
-        } else {
-          LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
-          return CommitStatus.FAILURE;
-        }
-      } catch (Throwable checkFailure) {
-        LOG.error("Cannot check if commit to {}.{} exists. Retry attempt {} of {}.",
-            database, tableName, attempt, maxAttempts, checkFailure);
-      }
-    }
-
-    LOG.error("Cannot determine commit state to {}.{}. Failed to check {} times. Treating commit state as unknown.",
+    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
+
+    Tasks
+        .foreach(newMetadataLocation)
+        .retry(maxAttempts)
+        .suppressFailureWhenFinished()
+        .exponentialBackoff(COMMIT_STATUS_RECHECK_SLEEP, COMMIT_STATUS_RECHECK_SLEEP, Long.MAX_VALUE, 1.0)

Review comment:
       If we switch to using `Tasks`, I'd also add a timeout of, let's say, 30 
minutes to be safe. Checking the status may hit request timeouts, which can make 
the check take quite a long time. Also, using a real exponential backoff sounds 
reasonable (i.e. changing the scale factor from 1.0 to 2.0).
   
   Also, it is common to use `_MS` suffixes for these constants. Something like 
below.
   
   ```
       Tasks.foreach(newMetadataLocation)
           .retry(maxAttempts)
           .suppressFailureWhenFinished()
           .exponentialBackoff(
               COMMIT_STATUS_CHECK_WAIT_MS,
               COMMIT_STATUS_CHECK_WAIT_MS,
               COMMIT_STATUS_CHECK_TIMEOUT_MS,
               2.0)
   ```
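
To make the effect of the scale factor concrete, here is a minimal sketch of how the delay between checks might grow. It does not use Iceberg's `Tasks` class: `BackoffSketch` and `delayMs` are hypothetical names, and the rule `min(minSleep * scale^(attempt - 1), maxSleep)` is an assumption about how such a backoff is typically computed. The total timeout is not modelled here.

```java
/** Illustrative only: a stand-in for a capped exponential backoff, not Iceberg's Tasks. */
final class BackoffSketch {

  // Assumed rule: delay = min(minSleepMs * scaleFactor^(attempt - 1), maxSleepMs).
  // attempt is 1-based, so the first retry waits minSleepMs.
  static long delayMs(long minSleepMs, long maxSleepMs, double scaleFactor, int attempt) {
    double scaled = minSleepMs * Math.pow(scaleFactor, attempt - 1);
    return (long) Math.min(scaled, (double) maxSleepMs);
  }

  public static void main(String[] args) {
    for (int attempt = 1; attempt <= 4; attempt++) {
      // Same value for min and max: the cap keeps the delay constant.
      long capped = delayMs(1000, 1000, 2.0, attempt);
      // Larger max: the delay doubles on each attempt.
      long growing = delayMs(1000, 60_000, 2.0, attempt);
      System.out.println("attempt " + attempt + ": capped=" + capped + " ms, growing=" + growing + " ms");
    }
  }
}
```

Under that assumption, passing the same constant as both the minimum and the maximum wait would keep the delay flat even with a scale factor of 2.0, so a separate, larger maximum may be needed for the backoff to actually grow.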

##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -270,29 +270,33 @@ private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata
     int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
         COMMIT_NUM_STATUS_CHECKS_DEFAULT);
 
-    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
-      try {
-        Thread.sleep(COMMIT_STATUS_RECHECK_SLEEP);
-        TableMetadata metadata = refresh();
-        String metadataLocation = metadata.metadataFileLocation();
-        boolean commitSuccess = metadataLocation.equals(newMetadataLocation) ||
-            metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
-        if (commitSuccess) {
-          LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
-          return CommitStatus.SUCCESS;
-        } else {
-          LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
-          return CommitStatus.FAILURE;
-        }
-      } catch (Throwable checkFailure) {
-        LOG.error("Cannot check if commit to {}.{} exists. Retry attempt {} of {}.",
-            database, tableName, attempt, maxAttempts, checkFailure);
-      }
-    }
-
-    LOG.error("Cannot determine commit state to {}.{}. Failed to check {} times. Treating commit state as unknown.",
+    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
+
+    Tasks
+        .foreach(newMetadataLocation)
+        .retry(maxAttempts)
+        .suppressFailureWhenFinished()
+        .exponentialBackoff(COMMIT_STATUS_RECHECK_SLEEP, COMMIT_STATUS_RECHECK_SLEEP, Long.MAX_VALUE, 1.0)
+        .onFailure((location, checkException) ->
+            LOG.error("Cannot check if commit to {}.{} exists.", database, tableName, checkException))
+        .run(location -> {
+          TableMetadata metadata = refresh();
+          String currentMetadataLocation = metadata.metadataFileLocation();
+          boolean commitSuccess = currentMetadataLocation.equals(newMetadataLocation) ||
+              metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
+          if (commitSuccess) {
+            LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
+            status.set(CommitStatus.SUCCESS);
+          } else {
+            LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
+            status.set(CommitStatus.FAILURE);
+          }
+        });
+
+    LOG.error("Cannot determine commit state to {}.{}. Failed during checking {} times. " +

Review comment:
       I am afraid we will now always log this error, because the new code no 
longer returns early once the status is known. Shall we log it only if the 
status is still `UNKNOWN`?
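
The guard suggested here can be sketched in isolation. The example below is not the PR's code: a plain retry loop stands in for `Tasks`, and `CommitStatusCheckSketch`, `check`, and the `ERRORS` list (used in place of a logger) are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Illustrative only: logs the "cannot determine" error solely when the status stays UNKNOWN. */
final class CommitStatusCheckSketch {

  enum CommitStatus { SUCCESS, FAILURE, UNKNOWN }

  // Collected in a list instead of a real logger so the behaviour is easy to inspect.
  static final List<String> ERRORS = new ArrayList<>();

  // committed: hypothetical stand-in for "refresh metadata and look for the new location".
  // It may throw, which models a failed status check.
  static CommitStatus check(Supplier<Boolean> committed, int maxAttempts) {
    // AtomicReference mirrors the PR; a plain local would do in this single-threaded loop.
    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

    for (int attempt = 1; attempt <= maxAttempts && status.get() == CommitStatus.UNKNOWN; attempt++) {
      try {
        status.set(committed.get() ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
      } catch (RuntimeException checkFailure) {
        // The check itself failed: leave the status as UNKNOWN and retry.
      }
    }

    // Only report an error if no attempt produced a definite answer.
    if (status.get() == CommitStatus.UNKNOWN) {
      ERRORS.add("Cannot determine commit state after " + maxAttempts + " checks");
    }
    return status.get();
  }

  public static void main(String[] args) {
    System.out.println(check(() -> true, 3));
    System.out.println(check(() -> { throw new IllegalStateException("request timed out"); }, 3));
    System.out.println("errors logged: " + ERRORS.size());
  }
}
```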



