This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 15f61c2d3 [hive] Refactor codes in HiveMigrator
15f61c2d3 is described below

commit 15f61c2d3b49aa8a1fb5070d15a7a7a1cb39339e
Author: Jingsong <[email protected]>
AuthorDate: Tue Nov 28 12:08:30 2023 +0800

    [hive] Refactor codes in HiveMigrator
---
 .../apache/paimon/hive/migrate/HiveMigrator.java   | 49 +++++++++-------------
 ...reTest.java => MigrateFileProcedureITCase.java} |  2 +-
 ...eTest.java => MigrateTableProcedureITCase.java} |  2 +-
 3 files changed, 21 insertions(+), 32 deletions(-)

diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 3a3b8f5ae..e45d39228 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -43,18 +43,15 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.Table;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
@@ -127,23 +124,19 @@ public class HiveMigrator implements Migrator {
                             rollBack));
         }
 
-        Queue<CommitMessage> commitMessages = new LinkedBlockingQueue<>();
-        List<Future<?>> futures = new ArrayList<>();
-        tasks.forEach(
-                task ->
-                        futures.add(
-                                COMMON_IO_FORK_JOIN_POOL.submit(
-                                        () -> commitMessages.add(task.get()))));
-
+        List<Future<CommitMessage>> futures =
+                tasks.stream().map(COMMON_IO_FORK_JOIN_POOL::submit).collect(Collectors.toList());
+        List<CommitMessage> commitMessages = new ArrayList<>();
         try {
-            for (Future<?> future : futures) {
-                future.get();
+            for (Future<CommitMessage> future : futures) {
+                commitMessages.add(future.get());
             }
         } catch (Exception e) {
             futures.forEach(f -> f.cancel(true));
             for (Future<?> future : futures) {
                 // wait all task cancelled or finished
                 while (!future.isDone()) {
+                    //noinspection BusyWait
                     Thread.sleep(100);
                 }
             }
@@ -312,7 +305,7 @@ public class HiveMigrator implements Migrator {
     }
 
     /** One import task for one partition. */
-    public static class MigrateTask implements Supplier<CommitMessage> {
+    public static class MigrateTask implements Callable<CommitMessage> {
 
         private final FileIO fileIO;
         private final String format;
@@ -340,21 +333,17 @@ public class HiveMigrator implements Migrator {
         }
 
         @Override
-        public CommitMessage get() {
-            try {
-                List<DataFileMeta> fileMetas =
-                        FileMetaUtils.construct(
-                                fileIO,
-                                format,
-                                location,
-                                paimonTable,
-                                HIDDEN_PATH_FILTER,
-                                newDir,
-                                rollback);
-                return FileMetaUtils.commitFile(partitionRow, fileMetas);
-            } catch (IOException e) {
-                throw new RuntimeException("Can't get commit message", e);
-            }
+        public CommitMessage call() throws Exception {
+            List<DataFileMeta> fileMetas =
+                    FileMetaUtils.construct(
+                            fileIO,
+                            format,
+                            location,
+                            paimonTable,
+                            HIDDEN_PATH_FILTER,
+                            newDir,
+                            rollback);
+            return FileMetaUtils.commitFile(partitionRow, fileMetas);
         }
     }
 }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
similarity index 98%
rename from paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
rename to paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index 7fa93a9b7..18f25bd40 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.Random;
 
 /** Tests for {@link MigrateFileProcedure}. */
-public class MigrateFileProcedureTest extends ActionITCaseBase {
+public class MigrateFileProcedureITCase extends ActionITCaseBase {
 
     private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
 
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
similarity index 99%
rename from paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
rename to paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index a7630a8fe..bba377b5e 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -45,7 +45,7 @@ import java.util.Map;
 import java.util.Random;
 
 /** Tests for {@link MigrateFileProcedure}. */
-public class MigrateTableProcedureTest extends ActionITCaseBase {
+public class MigrateTableProcedureITCase extends ActionITCaseBase {
 
     private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
 

Reply via email to