aokolnychyi commented on a change in pull request #2328:
URL: https://github.com/apache/iceberg/pull/2328#discussion_r599144029
##########
File path:
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -270,29 +270,33 @@ private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata
     int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
         COMMIT_NUM_STATUS_CHECKS_DEFAULT);
-    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
-      try {
-        Thread.sleep(COMMIT_STATUS_RECHECK_SLEEP);
-        TableMetadata metadata = refresh();
-        String metadataLocation = metadata.metadataFileLocation();
-        boolean commitSuccess = metadataLocation.equals(newMetadataLocation) ||
-            metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
-        if (commitSuccess) {
-          LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
-          return CommitStatus.SUCCESS;
-        } else {
-          LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
-          return CommitStatus.FAILURE;
-        }
-      } catch (Throwable checkFailure) {
-        LOG.error("Cannot check if commit to {}.{} exists. Retry attempt {} of {}.",
-            database, tableName, attempt, maxAttempts, checkFailure);
-      }
-    }
-
-    LOG.error("Cannot determine commit state to {}.{}. Failed to check {} times. Treating commit state as unknown.",
+    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
+
+    Tasks
Review comment:
I think it is common to start with `Tasks.foreach(newMetadataLocation)`
on one line.
##########
File path:
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -270,29 +270,33 @@ private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata
     int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
         COMMIT_NUM_STATUS_CHECKS_DEFAULT);
-    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
-      try {
-        Thread.sleep(COMMIT_STATUS_RECHECK_SLEEP);
-        TableMetadata metadata = refresh();
-        String metadataLocation = metadata.metadataFileLocation();
-        boolean commitSuccess = metadataLocation.equals(newMetadataLocation) ||
-            metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
-        if (commitSuccess) {
-          LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
-          return CommitStatus.SUCCESS;
-        } else {
-          LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
-          return CommitStatus.FAILURE;
-        }
-      } catch (Throwable checkFailure) {
-        LOG.error("Cannot check if commit to {}.{} exists. Retry attempt {} of {}.",
-            database, tableName, attempt, maxAttempts, checkFailure);
-      }
-    }
-
-    LOG.error("Cannot determine commit state to {}.{}. Failed to check {} times. Treating commit state as unknown.",
+    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
+
+    Tasks
+        .foreach(newMetadataLocation)
+        .retry(maxAttempts)
+        .suppressFailureWhenFinished()
+        .exponentialBackoff(COMMIT_STATUS_RECHECK_SLEEP, COMMIT_STATUS_RECHECK_SLEEP, Long.MAX_VALUE, 1.0)
Review comment:
If we switch to using `Tasks`, I'd also add a timeout of, say, 30
minutes (in place of `Long.MAX_VALUE`) to be safe. Each status check may itself
hit request timeouts, so the total check time can become quite long. Using
exponential backoff also sounds reasonable (i.e. changing 1.0 to 2.0).
It is also common to use `_MS` suffixes for these constants. Something like
the following:
```
Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(
COMMIT_STATUS_CHECK_WAIT_MS,
COMMIT_STATUS_CHECK_WAIT_MS,
COMMIT_STATUS_CHECK_TIMEOUT_MS,
2.0)
```
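A minimal standalone sketch of the retry policy suggested above. It does not depend on Iceberg's `Tasks` and makes no claims about how `Tasks` is implemented. `BackoffRetry`, `delayMs` and `run` are hypothetical names.

Each retry waits `min * scale^(retry - 1)` milliseconds, capped at the max wait. The loop gives up once the next sleep would push the total elapsed time past the timeout budget. In this sketch, a 2.0 scale factor only changes the wait when the max wait is larger than the min wait.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper, not Iceberg's Tasks API: retries a task with exponential backoff
 * and gives up once the next sleep would push the total elapsed time past a budget.
 */
final class BackoffRetry {

  private BackoffRetry() {
  }

  /** Sleep before retry number {@code retry} (1-based): min * scale^(retry - 1), capped at max. */
  static long delayMs(long minWaitMs, long maxWaitMs, double scaleFactor, int retry) {
    double delay = minWaitMs * Math.pow(scaleFactor, retry - 1);
    return (long) Math.min(delay, (double) maxWaitMs);
  }

  /**
   * Calls {@code task} up to {@code maxAttempts} times in total. Failures are swallowed
   * (similar in spirit to suppressFailureWhenFinished()); returns null if no attempt
   * succeeded within the attempt limit or the time budget.
   */
  static <T> T run(Callable<T> task, int maxAttempts, long minWaitMs, long maxWaitMs,
                   long totalTimeoutMs, double scaleFactor) {
    long startMs = System.currentTimeMillis();
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return task.call();
      } catch (Exception e) {
        if (attempt == maxAttempts) {
          break; // no attempts left
        }
        long sleepMs = delayMs(minWaitMs, maxWaitMs, scaleFactor, attempt);
        long elapsedMs = System.currentTimeMillis() - startMs;
        if (elapsedMs + sleepMs > totalTimeoutMs) {
          break; // the next sleep would exceed the overall time budget
        }
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
          break;
        }
      }
    }
    return null;
  }
}
```

Returning `null` to mean "no attempt succeeded" keeps the sketch short. A caller would map it to `UNKNOWN`.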
##########
File path:
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -270,29 +270,33 @@ private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata
     int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
         COMMIT_NUM_STATUS_CHECKS_DEFAULT);
-    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
-      try {
-        Thread.sleep(COMMIT_STATUS_RECHECK_SLEEP);
-        TableMetadata metadata = refresh();
-        String metadataLocation = metadata.metadataFileLocation();
-        boolean commitSuccess = metadataLocation.equals(newMetadataLocation) ||
-            metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
-        if (commitSuccess) {
-          LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
-          return CommitStatus.SUCCESS;
-        } else {
-          LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
-          return CommitStatus.FAILURE;
-        }
-      } catch (Throwable checkFailure) {
-        LOG.error("Cannot check if commit to {}.{} exists. Retry attempt {} of {}.",
-            database, tableName, attempt, maxAttempts, checkFailure);
-      }
-    }
-
-    LOG.error("Cannot determine commit state to {}.{}. Failed to check {} times. Treating commit state as unknown.",
+    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
+
+    Tasks
+        .foreach(newMetadataLocation)
+        .retry(maxAttempts)
+        .suppressFailureWhenFinished()
+        .exponentialBackoff(COMMIT_STATUS_RECHECK_SLEEP, COMMIT_STATUS_RECHECK_SLEEP, Long.MAX_VALUE, 1.0)
+        .onFailure((location, checkException) ->
+            LOG.error("Cannot check if commit to {}.{} exists.", database, tableName, checkException))
+        .run(location -> {
+          TableMetadata metadata = refresh();
+          String currentMetadataLocation = metadata.metadataFileLocation();
+          boolean commitSuccess = currentMetadataLocation.equals(newMetadataLocation) ||
+              metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
+          if (commitSuccess) {
+            LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
+            status.set(CommitStatus.SUCCESS);
+          } else {
+            LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
+            status.set(CommitStatus.FAILURE);
+          }
+        });
+
+    LOG.error("Cannot determine commit state to {}.{}. Failed during checking {} times. " +
Review comment:
I am afraid we will now always log this error, even after a successful check.
Shall we log it only if the status is still unknown?
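A minimal sketch of that guard under a simplified setup. Everything here is hypothetical:

- `CommitStatusCheck` stands in for `HiveTableOperations`.
- `runWithRetries` stands in for `Tasks`.
- The `refreshLocations` supplier stands in for `refresh()` plus `previousFiles()`.
- `java.util.logging` replaces the project's logger.

The `AtomicReference` is needed because the retried lambda cannot assign a local variable. After the retries, the error is logged only if no attempt set a definite status.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical, simplified stand-in for the commit status check in HiveTableOperations. */
final class CommitStatusCheck {
  private static final Logger LOG = Logger.getLogger(CommitStatusCheck.class.getName());

  enum CommitStatus { SUCCESS, FAILURE, UNKNOWN }

  private CommitStatusCheck() {
  }

  /** Hypothetical stand-in for Tasks: reruns the task until it completes without throwing. */
  static void runWithRetries(int maxAttempts, Runnable task) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        task.run();
        return; // the task completed, so stop retrying
      } catch (RuntimeException e) {
        // Failures are logged and swallowed, similar in spirit to suppressFailureWhenFinished().
        LOG.log(Level.WARNING, "Cannot check commit status, attempt " + attempt + " of " + maxAttempts, e);
      }
    }
  }

  /**
   * refreshLocations stands in for refresh(): it returns the current metadata location
   * followed by the previous ones, and may throw if the metastore is unreachable.
   */
  static CommitStatus check(String newMetadataLocation, Supplier<List<String>> refreshLocations,
                            int maxAttempts) {
    // The lambda below cannot assign a local variable, hence the AtomicReference.
    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

    runWithRetries(maxAttempts, () -> {
      boolean committed = refreshLocations.get().contains(newMetadataLocation);
      status.set(committed ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
    });

    // Only log the error when no attempt produced a definite answer.
    if (status.get() == CommitStatus.UNKNOWN) {
      LOG.severe("Cannot determine commit state for " + newMetadataLocation + " after "
          + maxAttempts + " attempts. Treating commit state as unknown.");
    }
    return status.get();
  }
}
```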
--
This is an automated message from the Apache Git Service.