This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 51c18cd48da HIVE-28373: FlakyTest: TestHiveHadoopCommits (#6345)
51c18cd48da is described below

commit 51c18cd48dabfcd7ba104ddfc2bfcbf8a51854ec
Author: PLASH SPEED <[email protected]>
AuthorDate: Fri Mar 13 20:51:22 2026 +0800

    HIVE-28373: FlakyTest: TestHiveHadoopCommits (#6345)
---
 .../iceberg/hadoop/TestHiveHadoopCommits.java      | 113 ++++++++-------------
 1 file changed, 40 insertions(+), 73 deletions(-)

diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java
index c304b7c205e..c69f6bd9082 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java
@@ -21,6 +21,9 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -34,6 +37,7 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.util.Tasks;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -55,7 +59,7 @@ void testCommitFailedBeforeChangeVersionHint() {
     BaseTable baseTable = (BaseTable) table;
     HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
 
-    HadoopTableOperations spyOps2 = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps2 = spy(tableOperations);
     doReturn(10000).when(spyOps2).findVersionWithOutVersionHint(any());
     TableMetadata metadataV1 = spyOps2.current();
     SortOrder dataSort = 
SortOrder.builderFor(baseTable.schema()).asc("data").build();
@@ -64,7 +68,7 @@ void testCommitFailedBeforeChangeVersionHint() {
           .isInstanceOf(CommitFailedException.class)
           .hasMessageContaining("Are there other clients running in parallel 
with the current task?");
 
-    HadoopTableOperations spyOps3 = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps3 = spy(tableOperations);
     doReturn(false).when(spyOps3).nextVersionIsLatest(anyInt(), anyInt());
     assertCommitNotChangeVersion(
           baseTable,
@@ -72,7 +76,7 @@ void testCommitFailedBeforeChangeVersionHint() {
           CommitFailedException.class,
           "Are there other clients running in parallel with the current 
task?");
 
-    HadoopTableOperations spyOps4 = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps4 = spy(tableOperations);
     doThrow(new RuntimeException("FileSystem crash!"))
           .when(spyOps4)
           .renameMetaDataFileAndCheck(any(), any(), any(), anyBoolean());
@@ -85,7 +89,7 @@ void testCommitFailedAndCheckFailed() throws IOException {
     table.newFastAppend().appendFile(FILE_A).commit();
     BaseTable baseTable = (BaseTable) table;
     HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-    HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps = spy(tableOperations);
     doThrow(new IOException("FileSystem crash!"))
           .when(spyOps)
           .renameMetaDataFile(any(), any(), any());
@@ -95,13 +99,13 @@ void testCommitFailedAndCheckFailed() throws IOException {
     assertCommitNotChangeVersion(
           baseTable, spyOps, CommitStateUnknownException.class, "FileSystem 
crash!");
 
-    HadoopTableOperations spyOps2 = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps2 = spy(tableOperations);
     doThrow(new OutOfMemoryError("Java heap space"))
           .when(spyOps2)
           .renameMetaDataFile(any(), any(), any());
     assertCommitFail(baseTable, spyOps2, OutOfMemoryError.class, "Java heap 
space");
 
-    HadoopTableOperations spyOps3 = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps3 = spy(tableOperations);
     doThrow(new RuntimeException("UNKNOWN ERROR"))
           .when(spyOps3)
           .renameMetaDataFile(any(), any(), any());
@@ -114,7 +118,7 @@ void testCommitFailedAndRenameNotSuccess() throws 
IOException {
     table.newFastAppend().appendFile(FILE_A).commit();
     BaseTable baseTable = (BaseTable) table;
     HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-    HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps = spy(tableOperations);
     doThrow(new IOException("FileSystem crash!"))
           .when(spyOps)
           .renameMetaDataFile(any(), any(), any());
@@ -130,7 +134,7 @@ void testCommitFailedButActualSuccess() throws IOException {
     table.newFastAppend().appendFile(FILE_A).commit();
     BaseTable baseTable = (BaseTable) table;
     HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-    HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps = spy(tableOperations);
     doThrow(new IOException("FileSystem crash!"))
           .when(spyOps)
           .renameMetaDataFile(any(), any(), any());
@@ -174,7 +178,7 @@ void testCommitFailedAfterChangeVersionHintRepeatCommit() {
     table.newFastAppend().appendFile(FILE_A).commit();
     BaseTable baseTable = (BaseTable) table;
     HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-    HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps = spy(tableOperations);
     doThrow(new RuntimeException("FileSystem crash!"))
           .when(spyOps)
           .deleteRemovedMetadataFiles(any(), any());
@@ -204,22 +208,15 @@ void testCommitFailedAfterChangeVersionHintRepeatCommit() 
{
 
   @Test
   void testTwoClientCommitSameVersion() throws InterruptedException {
-    // In the linux environment, the JDK FileSystem interface implementation 
class is
-    // java.io.UnixFileSystem.
-    // Its behavior follows the posix protocol, which causes rename operations 
to overwrite the
-    // target file (because linux is compatible with some of the unix 
interfaces).
-    // However, linux also supports renaming without overwriting the target 
file. In addition, some
-    // other file systems such as Windows, HDFS, etc. also support renaming 
without overwriting the
-    // target file.
-    // We use the `mv -n` command to simulate the behavior of such filesystems.
     table.newFastAppend().appendFile(FILE_A).commit();
-    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    ExecutorService executorService = 
Executors.newVirtualThreadPerTaskExecutor();
     AtomicReference<Throwable> unexpectedException = new 
AtomicReference<>(null);
     AtomicReference<Throwable> expectedException = new AtomicReference<>(null);
     CountDownLatch countDownLatch = new CountDownLatch(2);
     BaseTable baseTable = (BaseTable) table;
     assertThat(((HadoopTableOperations) 
baseTable.operations()).findVersion()).isEqualTo(2);
-    executorService.execute(() -> {
+    final Object lock = new Object();
+    Runnable commitTask = () -> {
       try {
         HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
         HadoopTableOperations spyOps = spy(tableOperations);
@@ -232,47 +229,16 @@ void testTwoClientCommitSameVersion() throws 
InterruptedException {
         doAnswer(x -> {
           Path srcPath = x.getArgument(1);
           Path dstPath = x.getArgument(2);
-          File src = new File(srcPath.toUri());
-          File dst = new File(dstPath.toUri());
-          String srcPathStr = src.getAbsolutePath();
-          String dstPathStr = dst.getAbsolutePath();
-          String cmd = String.format("mv -n %s  %s", srcPathStr, dstPathStr);
-          Process process = Runtime.getRuntime().exec(cmd);
-          assertThat(process.waitFor()).isZero();
-          return dst.exists() && !src.exists();
-        }).when(spyOps).renameMetaDataFile(any(), any(), any());
-        TableMetadata metadataV1 = spyOps.current();
-        SortOrder dataSort = 
SortOrder.builderFor(baseTable.schema()).asc("data").build();
-        TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort);
-        spyOps.commit(metadataV1, metadataV2);
-      } catch (CommitFailedException e) {
-        expectedException.set(e);
-      } catch (Throwable e) {
-        unexpectedException.set(e);
-      }
-    });
-
-    executorService.execute(() -> {
-      try {
-        HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-        HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
-        doNothing().when(spyOps).tryLock(any(), any());
-        doAnswer(x -> {
-          countDownLatch.countDown();
-          countDownLatch.await();
-          return x.callRealMethod();
-        }).when(spyOps).renameMetaDataFileAndCheck(any(), any(), any(), 
anyBoolean());
-        doAnswer(x -> {
-          Path srcPath = x.getArgument(1);
-          Path dstPath = x.getArgument(2);
-          File src = new File(srcPath.toUri());
-          File dst = new File(dstPath.toUri());
-          String srcPathStr = src.getAbsolutePath();
-          String dstPathStr = dst.getAbsolutePath();
-          String cmd = String.format("mv -n %s  %s", srcPathStr, dstPathStr);
-          Process process = Runtime.getRuntime().exec(cmd);
-          assertThat(process.waitFor()).isZero();
-          return dst.exists() && !src.exists();
+          var src = Paths.get(srcPath.toUri());
+          var dst = Paths.get(dstPath.toUri());
+          synchronized (lock) {
+            if (Files.exists(dst)) {
+              return false;
+            } else {
+              Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
+              return true;
+            }
+          }
         }).when(spyOps).renameMetaDataFile(any(), any(), any());
         TableMetadata metadataV1 = spyOps.current();
         SortOrder dataSort = 
SortOrder.builderFor(baseTable.schema()).asc("data").build();
@@ -283,7 +249,8 @@ void testTwoClientCommitSameVersion() throws 
InterruptedException {
       } catch (Throwable e) {
         unexpectedException.set(e);
       }
-    });
+    };
+    Tasks.range(2).executeWith(executorService).run(i -> commitTask.run());
     executorService.shutdown();
     if (!executorService.awaitTermination(610, TimeUnit.SECONDS)) {
       executorService.shutdownNow();
@@ -302,7 +269,7 @@ void testTwoClientCommitSameVersion() throws 
InterruptedException {
   void testConcurrentCommitAndRejectCommitAlreadyExistsVersion()
         throws InterruptedException {
     table.newFastAppend().appendFile(FILE_A).commit();
-    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    ExecutorService executorService = 
Executors.newVirtualThreadPerTaskExecutor();
     AtomicReference<Throwable> unexpectedException = new 
AtomicReference<>(null);
     AtomicInteger commitTimes = new AtomicInteger(0);
     int maxCommitTimes = 20;
@@ -312,7 +279,7 @@ void 
testConcurrentCommitAndRejectCommitAlreadyExistsVersion()
       try {
         BaseTable baseTable = (BaseTable) table;
         HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-        HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+        HadoopTableOperations spyOps = spy(tableOperations);
         doNothing().when(spyOps).tryLock(any(), any());
         doAnswer(x -> {
           countDownLatch2.countDown();
@@ -353,7 +320,7 @@ void 
testRejectCommitAlreadyExistsVersionWithUsingObjectStore()
     // memory locks. So we can use the local file system to simulate the use 
of object storage.
     table.updateProperties().set(TableProperties.OBJECT_STORE_ENABLED, 
"true").commit();
     table.newFastAppend().appendFile(FILE_A).commit();
-    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    ExecutorService executorService = 
Executors.newVirtualThreadPerTaskExecutor();
     AtomicReference<Throwable> unexpectedException = new 
AtomicReference<>(null);
     AtomicInteger commitTimes = new AtomicInteger(0);
     int maxCommitTimes = 20;
@@ -363,7 +330,7 @@ void 
testRejectCommitAlreadyExistsVersionWithUsingObjectStore()
       try {
         BaseTable baseTable = (BaseTable) table;
         HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-        HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+        HadoopTableOperations spyOps = spy(tableOperations);
         doAnswer(x -> {
           countDownLatch2.countDown();
           countDownLatch.await();
@@ -403,7 +370,7 @@ void testConcurrentCommitAndRejectTooOldCommit() throws 
InterruptedException {
     
table.updateProperties().set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
"2").commit();
     
table.updateProperties().set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
 "true")
           .commit();
-    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    ExecutorService executorService = 
Executors.newVirtualThreadPerTaskExecutor();
     AtomicReference<Throwable> unexpectedException = new 
AtomicReference<>(null);
     AtomicInteger commitTimes = new AtomicInteger(0);
     int maxCommitTimes = 20;
@@ -414,7 +381,7 @@ void testConcurrentCommitAndRejectTooOldCommit() throws 
InterruptedException {
       try {
         BaseTable baseTable = (BaseTable) table;
         HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-        HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+        HadoopTableOperations spyOps = spy(tableOperations);
         doNothing().when(spyOps).tryLock(any(), any());
         doAnswer(x -> {
           countDownLatch2.countDown();
@@ -466,7 +433,7 @@ void testRejectTooOldCommitWithUsingObjectStore() throws 
InterruptedException {
     table.updateProperties()
           .set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
           .commit();
-    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    ExecutorService executorService = 
Executors.newVirtualThreadPerTaskExecutor();
     AtomicReference<Throwable> unexpectedException = new 
AtomicReference<>(null);
     AtomicInteger commitTimes = new AtomicInteger(0);
     int maxCommitTimes = 20;
@@ -476,7 +443,7 @@ void testRejectTooOldCommitWithUsingObjectStore() throws 
InterruptedException {
       try {
         BaseTable baseTable = (BaseTable) table;
         HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-        HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+        HadoopTableOperations spyOps = spy(tableOperations);
         doNothing().when(spyOps).tryLock(any(), any());
         doAnswer(x -> {
           countDownLatch2.countDown();
@@ -521,7 +488,7 @@ void testConcurrentCommitAndRejectDirtyCommit() throws 
InterruptedException {
     table.updateProperties()
           .set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
           .commit();
-    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    ExecutorService executorService = 
Executors.newVirtualThreadPerTaskExecutor();
     AtomicReference<Throwable> unexpectedException = new 
AtomicReference<>(null);
     AtomicInteger commitTimes = new AtomicInteger(0);
     CountDownLatch countDownLatch = new CountDownLatch(5);
@@ -531,7 +498,7 @@ void testConcurrentCommitAndRejectDirtyCommit() throws 
InterruptedException {
       try {
         BaseTable baseTable = (BaseTable) table;
         HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-        HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+        HadoopTableOperations spyOps = spy(tableOperations);
         TableMetadata metadataV1 = spyOps.current();
         SortOrder dataSort = 
SortOrder.builderFor(baseTable.schema()).asc("data").build();
         TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort);
@@ -579,13 +546,13 @@ void testCleanTooOldDirtyCommit() throws 
InterruptedException {
     table.updateProperties()
           .set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
           .commit();
-    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    ExecutorService executorService = 
Executors.newVirtualThreadPerTaskExecutor();
     AtomicReference<Exception> unexpectedException = new 
AtomicReference<>(null);
     AtomicInteger commitTimes = new AtomicInteger(0);
     int maxCommitTimes = 20;
     BaseTable baseTable = (BaseTable) table;
     HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
-    HadoopTableOperations spyOps = (HadoopTableOperations) 
spy(tableOperations);
+    HadoopTableOperations spyOps = spy(tableOperations);
     CountDownLatch countDownLatch = new CountDownLatch(5);
     CountDownLatch countDownLatch2 = new CountDownLatch(1);
     AtomicReference<File> dirtyCommitFile = new AtomicReference<>(null);

Reply via email to