This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 15f61c2d3 [hive] Refactor codes in HiveMigrator
15f61c2d3 is described below
commit 15f61c2d3b49aa8a1fb5070d15a7a7a1cb39339e
Author: Jingsong <[email protected]>
AuthorDate: Tue Nov 28 12:08:30 2023 +0800
[hive] Refactor codes in HiveMigrator
---
.../apache/paimon/hive/migrate/HiveMigrator.java | 49 +++++++++-------------
...reTest.java => MigrateFileProcedureITCase.java} | 2 +-
...eTest.java => MigrateTableProcedureITCase.java} | 2 +-
3 files changed, 21 insertions(+), 32 deletions(-)
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 3a3b8f5ae..e45d39228 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -43,18 +43,15 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.Table;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
@@ -127,23 +124,19 @@ public class HiveMigrator implements Migrator {
rollBack));
}
- Queue<CommitMessage> commitMessages = new LinkedBlockingQueue<>();
- List<Future<?>> futures = new ArrayList<>();
- tasks.forEach(
- task ->
- futures.add(
- COMMON_IO_FORK_JOIN_POOL.submit(
- () -> commitMessages.add(task.get()))));
-
+ List<Future<CommitMessage>> futures =
+ tasks.stream().map(COMMON_IO_FORK_JOIN_POOL::submit).collect(Collectors.toList());
+ List<CommitMessage> commitMessages = new ArrayList<>();
try {
- for (Future<?> future : futures) {
- future.get();
+ for (Future<CommitMessage> future : futures) {
+ commitMessages.add(future.get());
}
} catch (Exception e) {
futures.forEach(f -> f.cancel(true));
for (Future<?> future : futures) {
// wait all task cancelled or finished
while (!future.isDone()) {
+ //noinspection BusyWait
Thread.sleep(100);
}
}
@@ -312,7 +305,7 @@ public class HiveMigrator implements Migrator {
}
/** One import task for one partition. */
- public static class MigrateTask implements Supplier<CommitMessage> {
+ public static class MigrateTask implements Callable<CommitMessage> {
private final FileIO fileIO;
private final String format;
@@ -340,21 +333,17 @@ public class HiveMigrator implements Migrator {
}
@Override
- public CommitMessage get() {
- try {
- List<DataFileMeta> fileMetas =
- FileMetaUtils.construct(
- fileIO,
- format,
- location,
- paimonTable,
- HIDDEN_PATH_FILTER,
- newDir,
- rollback);
- return FileMetaUtils.commitFile(partitionRow, fileMetas);
- } catch (IOException e) {
- throw new RuntimeException("Can't get commit message", e);
- }
+ public CommitMessage call() throws Exception {
+ List<DataFileMeta> fileMetas =
+ FileMetaUtils.construct(
+ fileIO,
+ format,
+ location,
+ paimonTable,
+ HIDDEN_PATH_FILTER,
+ newDir,
+ rollback);
+ return FileMetaUtils.commitFile(partitionRow, fileMetas);
}
}
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
similarity index 98%
rename from paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
rename to paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index 7fa93a9b7..18f25bd40 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -40,7 +40,7 @@ import java.util.List;
import java.util.Random;
/** Tests for {@link MigrateFileProcedure}. */
-public class MigrateFileProcedureTest extends ActionITCaseBase {
+public class MigrateFileProcedureITCase extends ActionITCaseBase {
private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
similarity index 99%
rename from paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
rename to paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index a7630a8fe..bba377b5e 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -45,7 +45,7 @@ import java.util.Map;
import java.util.Random;
/** Tests for {@link MigrateFileProcedure}. */
-public class MigrateTableProcedureTest extends ActionITCaseBase {
+public class MigrateTableProcedureITCase extends ActionITCaseBase {
private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();