This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 831272ea52 Hive: Set commit state as Unknown before throwing
CommitStateUnknownException (#7931) (#8029)
831272ea52 is described below
commit 831272ea5253833b8ea3359a445b7e3b5d1f6287
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Jul 10 18:57:04 2023 +0200
Hive: Set commit state as Unknown before throwing
CommitStateUnknownException (#7931) (#8029)
---
.../apache/iceberg/hive/HiveTableOperations.java | 1 +
.../org/apache/iceberg/hive/TestHiveCommits.java | 43 ++++++++++++++++++++++
2 files changed, 44 insertions(+)
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 2723353741..977aa170cb 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -250,6 +250,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
commitStatus = CommitStatus.SUCCESS;
} catch (LockException le) {
+ commitStatus = CommitStatus.UNKNOWN;
throw new CommitStateUnknownException(
"Failed to heartbeat for hive lock while "
+ "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
index 887235a4ed..6ae39d70ca 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
@@ -352,6 +352,49 @@ public class TestHiveCommits extends HiveTableBaseTest {
Assert.assertEquals("New metadata files should not exist", 2,
metadataFileCount(ops.current()));
}
+ @Test
+ public void testLockExceptionUnknownSuccessCommit() throws TException, InterruptedException {
+ Table table = catalog.loadTable(TABLE_IDENTIFIER);
+ HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations();
+
+ TableMetadata metadataV1 = ops.current();
+
+ table.updateSchema().addColumn("n", Types.IntegerType.get()).commit();
+
+ ops.refresh();
+
+ TableMetadata metadataV2 = ops.current();
+
+ Assertions.assertThat(ops.current().schema().columns().size()).isEqualTo(2);
+
+ HiveTableOperations spyOps = spy(ops);
+
+ // Simulate a communication error after a successful commit
+ doAnswer(
+ i -> {
+ org.apache.hadoop.hive.metastore.api.Table tbl =
+ i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class);
+ String location = i.getArgument(2, String.class);
+ ops.persistTable(tbl, true, location);
+ throw new LockException("Datacenter on fire");
+ })
+ .when(spyOps)
+ .persistTable(any(), anyBoolean(), any());
+
+ Assertions.assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1))
+ .hasMessageContaining("Failed to heartbeat for hive lock while")
+ .isInstanceOf(CommitStateUnknownException.class);
+
+ ops.refresh();
+
+ Assertions.assertThat(ops.current().location())
+ .as("Current metadata should have changed to metadata V1")
+ .isEqualTo(metadataV1.location());
+ Assertions.assertThat(metadataFileExists(ops.current()))
+ .as("Current metadata file should still exist")
+ .isTrue();
+ }
+
private void commitAndThrowException(
HiveTableOperations realOperations, HiveTableOperations spyOperations)
throws TException, InterruptedException {