This is an automated email from the ASF dual-hosted git repository. yyan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push: new 68c48ec AWS: handle uncertain catalog state for glue (#2402) 68c48ec is described below commit 68c48ec9bd8c3c3ae14a365f937d1538a4fbf826 Author: yyanyy <ya...@amazon.com> AuthorDate: Mon Apr 5 18:04:36 2021 -0700 AWS: handle uncertain catalog state for glue (#2402) --- .../aws/glue/GlueCatalogCommitFailureTest.java | 287 +++++++++++++++++++++ .../iceberg/aws/glue/GlueTableOperations.java | 34 ++- .../iceberg/BaseMetastoreTableOperations.java | 55 ++++ .../apache/iceberg/hive/HiveTableOperations.java | 57 ---- 4 files changed, 367 insertions(+), 66 deletions(-) diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogCommitFailureTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogCommitFailureTest.java new file mode 100644 index 0000000..efada18 --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogCommitFailureTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.aws.glue; + +import com.amazonaws.SdkBaseException; +import com.amazonaws.services.s3.AmazonS3URI; +import java.io.File; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; +import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; + +public class GlueCatalogCommitFailureTest extends GlueTestBase { + + @Test + public void testFailedCommit() { + Table table = setupTable(); + GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + TableMetadata metadataV2 = updateTable(table, ops); + + GlueTableOperations spyOps = Mockito.spy(ops); + failCommitAndThrowException(spyOps); + + AssertHelpers.assertThrows("We should wrap the error to CommitFailedException if the " + + "commit actually doesn't succeed", CommitFailedException.class, "unexpected exception", + () -> spyOps.commit(metadataV2, metadataV1)); + Mockito.verify(spyOps, Mockito.times(1)).refresh(); + + ops.refresh(); + Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata should still exist", metadataFileExists(metadataV2)); + Assert.assertEquals("No new metadata files should exist", 
2, metadataFileCount(ops.current())); + } + + @Test + public void testConcurrentModificationExceptionDoesNotCheckCommitStatus() { + Table table = setupTable(); + GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + TableMetadata metadataV2 = updateTable(table, ops); + + GlueTableOperations spyOps = Mockito.spy(ops); + failCommitAndThrowException(spyOps, ConcurrentModificationException.builder().build()); + + try { + spyOps.commit(metadataV2, metadataV1); + } catch (CommitFailedException e) { + Assert.assertTrue("Exception message should mention concurrent exception", + e.getMessage().contains("Glue detected concurrent update")); + Assert.assertTrue("Cause should be concurrent modification exception", + e.getCause() instanceof ConcurrentModificationException); + } + Mockito.verify(spyOps, Mockito.times(0)).refresh(); + + ops.refresh(); + Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata should still exist", metadataFileExists(metadataV2)); + Assert.assertEquals("No new metadata files should exist", 2, metadataFileCount(ops.current())); + } + + @Test + public void testCommitThrowsExceptionWhileSucceeded() { + Table table = setupTable(); + GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + TableMetadata metadataV2 = updateTable(table, ops); + + GlueTableOperations spyOps = Mockito.spy(ops); + + // Simulate a communication error after a successful commit + commitAndThrowException(ops, spyOps); + + // Shouldn't throw because the commit actually succeeds even though persistTable throws an exception + spyOps.commit(metadataV2, metadataV1); + + ops.refresh(); + Assert.assertNotEquals("Current metadata should have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata file should still exist", 
metadataFileExists(ops.current())); + Assert.assertEquals("Commit should have been successful and new metadata file should be made", + 3, metadataFileCount(ops.current())); + } + + @Test + public void testFailedCommitThrowsUnknownException() { + Table table = setupTable(); + GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + TableMetadata metadataV2 = updateTable(table, ops); + + GlueTableOperations spyOps = Mockito.spy(ops); + failCommitAndThrowException(spyOps); + breakFallbackCatalogCommitCheck(spyOps); + + AssertHelpers.assertThrows("Should throw CommitStateUnknownException since the catalog check was blocked", + CommitStateUnknownException.class, "Datacenter on fire", + () -> spyOps.commit(metadataV2, metadataV1)); + + ops.refresh(); + + Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + Assert.assertEquals("Client could not determine outcome so new metadata file should also exist", + 3, metadataFileCount(ops.current())); + } + + @Test + public void testSucceededCommitThrowsUnknownException() { + Table table = setupTable(); + GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + TableMetadata metadataV2 = updateTable(table, ops); + + GlueTableOperations spyOps = Mockito.spy(ops); + commitAndThrowException(ops, spyOps); + breakFallbackCatalogCommitCheck(spyOps); + + AssertHelpers.assertThrows("Should throw CommitStateUnknownException since the catalog check was blocked", + CommitStateUnknownException.class, "Datacenter on fire", + () -> spyOps.commit(metadataV2, metadataV1)); + + ops.refresh(); + + Assert.assertNotEquals("Current metadata should have changed", ops.current(), metadataV2); + Assert.assertTrue("Current metadata file should still exist", 
metadataFileExists(ops.current())); + } + + /** + * Pretends we threw an exception while persisting, the commit succeeded, the lock expired, + * and a second committer placed a commit on top of ours before the first committer was able to check + * if their commit succeeded or not + * + * Timeline: + * Client 1 commits which throws an exception but succeeded + * Client 1's lock expires while waiting to do the recheck for commit success + * Client 2 acquires a lock, commits successfully on top of client 1's commit and releases the lock + * Client 1 checks to see if their commit was successful + * + * This tests to make sure a disconnected client 1 doesn't think their commit failed just because it isn't the + * current one during the recheck phase. + */ + @Test + public void testExceptionThrownInConcurrentCommit() { + Table table = setupTable(); + GlueTableOperations ops = (GlueTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + TableMetadata metadataV2 = updateTable(table, ops); + + GlueTableOperations spyOps = Mockito.spy(ops); + concurrentCommitAndThrowException(ops, spyOps, table); + + /* + This commit and our concurrent commit should succeed even though this commit throws an exception + after the persist operation succeeds + */ + spyOps.commit(metadataV2, metadataV1); + + ops.refresh(); + Assert.assertNotEquals("Current metadata should have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + Assert.assertEquals("The column addition from the concurrent commit should have been successful", + 2, ops.current().schema().columns().size()); + } + + private void concurrentCommitAndThrowException(GlueTableOperations realOps, GlueTableOperations spyOperations, + Table table) { + // Simulate a communication error after a successful commit + Mockito.doAnswer(i -> { + Map<String, String> mapProperties = i.getArgumentAt(1, Map.class); + 
realOps.persistGlueTable( + i.getArgumentAt(0, software.amazon.awssdk.services.glue.model.Table.class), + mapProperties); + + // new metadata location is stored in map property, and used for locking + String newMetadataLocation = mapProperties.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + + // Simulate lock expiration or removal, use commit status null to avoid deleting data + realOps.cleanupMetadataAndUnlock(null, newMetadataLocation); + + table.refresh(); + table.updateSchema().addColumn("newCol", Types.IntegerType.get()).commit(); + throw new SdkBaseException("Datacenter on fire"); + }).when(spyOperations).persistGlueTable(Matchers.any(), Matchers.anyMap()); + } + + private Table setupTable() { + String namespace = createNamespace(); + String tableName = createTable(namespace); + return glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); + } + + private TableMetadata updateTable(Table table, GlueTableOperations ops) { + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, metadataV2.schema().columns().size()); + return metadataV2; + } + + private void commitAndThrowException(GlueTableOperations realOps, GlueTableOperations spyOps) { + Mockito.doAnswer(i -> { + realOps.persistGlueTable( + i.getArgumentAt(0, software.amazon.awssdk.services.glue.model.Table.class), + i.getArgumentAt(1, Map.class)); + throw new SdkBaseException("Datacenter on fire"); + }).when(spyOps).persistGlueTable(Matchers.any(), Matchers.anyMap()); + } + + private void failCommitAndThrowException(GlueTableOperations spyOps) { + failCommitAndThrowException(spyOps, new SdkBaseException("Datacenter on fire")); + } + + private void failCommitAndThrowException(GlueTableOperations spyOps, Exception exceptionToThrow) { + Mockito.doThrow(exceptionToThrow) + .when(spyOps).persistGlueTable(Matchers.any(), Matchers.anyMap()); + } + + private void 
breakFallbackCatalogCommitCheck(GlueTableOperations spyOperations) { + Mockito.when(spyOperations.refresh()) + .thenThrow(new RuntimeException("Still on fire")); // Failure on commit check + } + + private boolean metadataFileExists(TableMetadata metadata) { + AmazonS3URI amazonS3URI = new AmazonS3URI(metadata.metadataFileLocation()); + + try { + s3.headObject(HeadObjectRequest.builder() + .bucket(amazonS3URI.getBucket()) + .key(amazonS3URI.getKey()) + .build()); + return true; + } catch (NoSuchKeyException e) { + return false; + } + } + + private int metadataFileCount(TableMetadata metadata) { + AmazonS3URI amazonS3URI = new AmazonS3URI(metadata.metadataFileLocation()); + return (int) s3.listObjectsV2(ListObjectsV2Request.builder() + .bucket(amazonS3URI.getBucket()) + .prefix(new File(amazonS3URI.getKey()).getParent()) + .build()) + .contents().stream().filter(s3Object -> s3Object.key().endsWith("metadata.json")).count(); + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java index 832c45d..e982246 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java @@ -28,12 +28,13 @@ import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.exception.SdkException; import 
software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; import software.amazon.awssdk.services.glue.model.CreateTableRequest; @@ -103,23 +104,36 @@ class GlueTableOperations extends BaseMetastoreTableOperations { @Override protected void doCommit(TableMetadata base, TableMetadata metadata) { String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); - boolean exceptionThrown = true; + CommitStatus commitStatus = CommitStatus.FAILURE; + try { lock(newMetadataLocation); Table glueTable = getGlueTable(); checkMetadataLocation(glueTable, base); Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation); persistGlueTable(glueTable, properties); - exceptionThrown = false; + commitStatus = CommitStatus.SUCCESS; } catch (ConcurrentModificationException e) { throw new CommitFailedException(e, "Cannot commit %s because Glue detected concurrent update", tableName()); } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { throw new AlreadyExistsException(e, "Cannot commit %s because its Glue table already exists when trying to create one", tableName()); - } catch (SdkException e) { - throw new CommitFailedException(e, "Cannot commit %s because unexpected exception contacting AWS", tableName()); + } catch (RuntimeException persistFailure) { + LOG.error("Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.", + fullTableName, persistFailure); + commitStatus = checkCommitStatus(newMetadataLocation, metadata); + + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw new CommitFailedException(persistFailure, + "Cannot commit %s due to unexpected exception", tableName()); + case UNKNOWN: + throw new CommitStateUnknownException(persistFailure); + } } finally { - cleanupMetadataAndUnlock(exceptionThrown, newMetadataLocation); + cleanupMetadataAndUnlock(commitStatus, newMetadataLocation); } 
} @@ -164,7 +178,8 @@ class GlueTableOperations extends BaseMetastoreTableOperations { return properties; } - private void persistGlueTable(Table glueTable, Map<String, String> parameters) { + @VisibleForTesting + void persistGlueTable(Table glueTable, Map<String, String> parameters) { if (glueTable != null) { LOG.debug("Committing existing Glue table: {}", tableName()); glue.updateTable(UpdateTableRequest.builder() @@ -191,9 +206,10 @@ class GlueTableOperations extends BaseMetastoreTableOperations { } } - private void cleanupMetadataAndUnlock(boolean exceptionThrown, String metadataLocation) { + @VisibleForTesting + void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation) { try { - if (exceptionThrown) { + if (commitStatus == CommitStatus.FAILURE) { // if anything went wrong, clean up the uncommitted metadata file io().deleteFile(metadataLocation); } diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 157d437..d8e6e93 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -32,10 +32,14 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT; + public abstract class BaseMetastoreTableOperations implements TableOperations { private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class); @@ -46,6 +50,8 
@@ public abstract class BaseMetastoreTableOperations implements TableOperations { private static final String METADATA_FOLDER_NAME = "metadata"; + private static final int COMMIT_STATUS_CHECK_WAIT_MS = 1000; + private TableMetadata currentMetadata = null; private String currentMetadataLocation = null; private boolean shouldRefresh = true; @@ -245,6 +251,55 @@ public abstract class BaseMetastoreTableOperations implements TableOperations { }; } + protected enum CommitStatus { + FAILURE, + SUCCESS, + UNKNOWN + } + + /** + * Attempt to load the table and see if any current or past metadata location matches the one we were attempting + * to set. This is used as a last resort when we are dealing with exceptions that may indicate the commit has + * failed but are not proof that this is the case. Past locations must also be searched on the chance that a second + * committer was able to successfully commit on top of our commit. + * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { + int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS, + COMMIT_NUM_STATUS_CHECKS_DEFAULT); + + AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN); + + Tasks.foreach(newMetadataLocation) + .retry(maxAttempts) + .suppressFailureWhenFinished() + .exponentialBackoff(COMMIT_STATUS_CHECK_WAIT_MS, COMMIT_STATUS_CHECK_WAIT_MS, Long.MAX_VALUE, 2.0) + .onFailure((location, checkException) -> + LOG.error("Cannot check if commit to {} exists.", tableName(), checkException)) + .run(location -> { + TableMetadata metadata = refresh(); + String currentMetadataFileLocation = metadata.metadataFileLocation(); + boolean commitSuccess = currentMetadataFileLocation.equals(newMetadataLocation) || + 
metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation)); + if (commitSuccess) { + LOG.info("Commit status check: Commit to {} of {} succeeded", tableName(), newMetadataLocation); + status.set(CommitStatus.SUCCESS); + } else { + LOG.info("Commit status check: Commit to {} of {} failed", tableName(), newMetadataLocation); + status.set(CommitStatus.FAILURE); + } + }); + + if (status.get() == CommitStatus.UNKNOWN) { + LOG.error("Cannot determine commit state to {}. Failed during checking {} times. " + + "Treating commit state as unknown.", tableName(), maxAttempts); + } + return status.get(); + } + private String newTableMetadataFilePath(TableMetadata meta, int newVersion) { String codecName = meta.property( TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT); diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 04f42ac..db9fe09 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -62,15 +62,11 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT; - /** * TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to * avoid code duplication between this class and Metacat Tables. 
@@ -78,8 +74,6 @@ import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAUL public class HiveTableOperations extends BaseMetastoreTableOperations { private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class); - private static final int COMMIT_STATUS_CHECK_WAIT_MS = 1000; - private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms"; private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms"; private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms"; @@ -99,12 +93,6 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { } } - private enum CommitStatus { - FAILURE, - SUCCESS, - UNKNOWN - } - private final String fullName; private final String database; private final String tableName; @@ -234,7 +222,6 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { throw new CommitStateUnknownException(persistFailure); } } - } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); @@ -256,50 +243,6 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { } } - /** - * Attempt to load the table and see if any current or past metadata location matches the one we were attempting - * to set. This is used as a last resort when we are dealing with exceptions that may indicate the commit has - * failed but are not proof that this is the case. Past locations must also be searched on the chance that a second - * committer was able to successfully commit on top of our commit. 
- * - * @param newMetadataLocation the path of the new commit file - * @param config metadata to use for configuration - * @return Commit Status of Success, Failure or Unknown - */ - private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { - int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS, - COMMIT_NUM_STATUS_CHECKS_DEFAULT); - - AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN); - - Tasks.foreach(newMetadataLocation) - .retry(maxAttempts) - .suppressFailureWhenFinished() - .exponentialBackoff(COMMIT_STATUS_CHECK_WAIT_MS, COMMIT_STATUS_CHECK_WAIT_MS, Long.MAX_VALUE, 2.0) - .onFailure((location, checkException) -> - LOG.error("Cannot check if commit to {}.{} exists.", database, tableName, checkException)) - .run(location -> { - TableMetadata metadata = refresh(); - String currentMetadataLocation = metadata.metadataFileLocation(); - boolean commitSuccess = currentMetadataLocation.equals(newMetadataLocation) || - metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation)); - if (commitSuccess) { - LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation); - status.set(CommitStatus.SUCCESS); - } else { - LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation); - status.set(CommitStatus.FAILURE); - } - }); - - if (status.get() == CommitStatus.UNKNOWN) { - LOG.error("Cannot determine commit state to {}.{}. Failed during checking {} times. " + - "Treating commit state as unknown.", - database, tableName, maxAttempts); - } - return status.get(); - } - @VisibleForTesting void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException { if (updateHiveTable) {