This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d4056530d2 Core: Fix lock acquisition logic in HadoopTableOperations rename (#9498)
d4056530d2 is described below

commit d4056530d27864adb6cf141d85c81adde46c7b28
Author: N-o-Z <[email protected]>
AuthorDate: Wed Jan 17 23:02:47 2024 +0200

    Core: Fix lock acquisition logic in HadoopTableOperations rename (#9498)
---
 .../iceberg/hadoop/HadoopTableOperations.java      | 10 ++++--
 .../apache/iceberg/hadoop/TestHadoopCommits.java   | 40 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 44936f2514..9ef2c63e26 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -360,7 +360,11 @@ public class HadoopTableOperations implements TableOperations {
    */
   private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) {
     try {
-      lockManager.acquire(dst.toString(), src.toString());
+      if (!lockManager.acquire(dst.toString(), src.toString())) {
+        throw new CommitFailedException(
+            "Failed to acquire lock on file: %s with owner: %s", dst, src);
+      }
+
       if (fs.exists(dst)) {
         throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
       }
@@ -383,7 +387,9 @@ public class HadoopTableOperations implements TableOperations {
       }
       throw cfe;
     } finally {
-      lockManager.release(dst.toString(), src.toString());
+      if (!lockManager.release(dst.toString(), src.toString())) {
+        LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src);
+      }
     }
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
index e02b9deaee..b3ddc09c0f 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
@@ -30,18 +30,22 @@ import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.LockManager;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -451,4 +455,40 @@ public class TestHadoopCommits extends HadoopTableTestBase {
     Assertions.assertThat(Lists.newArrayList(tableWithHighRetries.snapshots()))
         .hasSize(threadsCount * numberOfCommitedFilesPerThread);
   }
+
+  @Test
+  public void testCommitFailedToAcquireLock() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Configuration conf = new Configuration();
+    LockManager lockManager = new NoLockManager();
+    HadoopTableOperations tableOperations =
+        new HadoopTableOperations(
+            new Path(table.location()), new HadoopFileIO(conf), conf, lockManager);
+    tableOperations.refresh();
+    BaseTable baseTable = (BaseTable) table;
+    TableMetadata meta2 = baseTable.operations().current();
+    Assertions.assertThatThrownBy(() -> tableOperations.commit(tableOperations.current(), meta2))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageStartingWith("Failed to acquire lock on file");
+  }
+
+  // Always returns false when trying to acquire
+  static class NoLockManager implements LockManager {
+
+    @Override
+    public boolean acquire(String entityId, String ownerId) {
+      return false;
+    }
+
+    @Override
+    public boolean release(String entityId, String ownerId) {
+      return false;
+    }
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void initialize(Map<String, String> properties) {}
+  }
 }

Reply via email to