This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 12bd06e6e1 [hotfix] Try to fix unstable hive clone action tests (#5641)
12bd06e6e1 is described below
commit 12bd06e6e13612c36ba9a93c294523772f927b19
Author: yuzelin <[email protected]>
AuthorDate: Wed May 21 13:35:51 2025 +0800
[hotfix] Try to fix unstable hive clone action tests (#5641)
---
.../paimon/hive/procedure/CloneActionITCase.java | 275 ++++++++++++---------
.../paimon/hudi/CloneActionForHudiITCase.java | 112 +++++----
2 files changed, 215 insertions(+), 172 deletions(-)
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index 9e8d6427cb..dcb82ac125 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -28,18 +28,19 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.TestHiveMetastore;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -57,36 +58,41 @@ public class CloneActionITCase extends ActionITCaseBase {
private static final int PORT = 9088;
- @BeforeEach
- public void beforeEach() throws IOException {
- super.before();
+ @BeforeAll
+ public static void beforeAll() {
TEST_HIVE_METASTORE.start(PORT);
}
- @AfterEach
- public void afterEach() throws Exception {
- super.after();
+ @AfterAll
+ public static void afterAll() throws Exception {
TEST_HIVE_METASTORE.stop();
}
@Test
public void testMigrateOneNonPartitionedTable() throws Exception {
String format = randomFormat();
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName = "hivetable" + StringUtils.randomNumericString(10);
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
- tEnv.executeSql("CREATE TABLE hivetable (id string, id2 int, id3 int)
STORED AS " + format);
- tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING, id2 INT, id3 INT) STORED AS
%s",
+ dbName,
+ tableName,
+ format);
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");
- List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+ List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName);
- tEnv.executeSql(
- "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
tEnv.useCatalog("PAIMON");
tEnv.executeSql("CREATE DATABASE test");
@@ -94,9 +100,9 @@ public class CloneActionITCase extends ActionITCaseBase {
CloneAction.class,
"clone",
"--database",
- "default",
+ dbName,
"--table",
- "hivetable",
+ tableName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
@@ -109,13 +115,10 @@ public class CloneActionITCase extends ActionITCaseBase {
"warehouse=" + warehouse)
.run();
- List<Row> r2 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.test_table").collect());
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
- List<Row> files =
- ImmutableList.copyOf(
- tEnv.executeSql("SELECT file_path FROM
test.`test_table$files`").collect());
+ List<Row> files = sql(tEnv, "SELECT file_path FROM
test.`test_table$files`");
assertThat(files).hasSize(1);
// file name should be start with data-, which is generated by uuid
@@ -134,23 +137,33 @@ public class CloneActionITCase extends ActionITCaseBase {
public void testMigrateOnePartitionedTableImpl(boolean specificFilter)
throws Exception {
String format = randomFormat();
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName = "hivetable" + StringUtils.randomNumericString(10);
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
- tEnv.executeSql(
- "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
- + format);
- tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT) STORED AS %s",
+ dbName,
+ tableName,
+ format);
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");
- String whereSql = specificFilter ? "id2 = 1 OR id3 = 1" : null;
- String query = "SELECT * FROM hivetable " + (whereSql == null ? "" :
"WHERE " + whereSql);
- List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql(query).collect());
+ List<Row> r1 =
+ sql(
+ tEnv,
+ "SELECT * FROM %s.%s %s",
+ dbName,
+ tableName,
+ specificFilter ? "WHERE id2 = 1 OR id3 = 1" : "");
tEnv.executeSql(
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
@@ -162,9 +175,9 @@ public class CloneActionITCase extends ActionITCaseBase {
Arrays.asList(
"clone",
"--database",
- "default",
+ dbName,
"--table",
- "hivetable",
+ tableName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
@@ -175,9 +188,9 @@ public class CloneActionITCase extends ActionITCaseBase {
"test_table",
"--target_catalog_conf",
"warehouse=" + warehouse));
- if (whereSql != null) {
+ if (specificFilter) {
args.add("--where");
- args.add(whereSql);
+ args.add("id2 = 1 OR id3 = 1");
}
createAction(CloneAction.class, args).run();
@@ -187,8 +200,7 @@ public class CloneActionITCase extends ActionITCaseBase {
assertThat(paimonTable.partitionKeys()).containsExactly("id2", "id3");
// first run, validate clone
- List<Row> r2 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.test_table").collect());
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
if (specificFilter) {
@@ -206,16 +218,13 @@ public class CloneActionITCase extends ActionITCaseBase {
assertThat(manifests).noneMatch(manifest ->
manifest.numDeletedFiles() > 0);
// expect all
- r1 =
- ImmutableList.copyOf(
- tEnv.executeSql("SELECT * FROM
PAIMON_GE.`default`.hivetable")
- .collect());
- r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.test_table").collect());
+ r1 = sql(tEnv, "SELECT * FROM PAIMON_GE.%s.%s", dbName, tableName);
+ r2 = sql(tEnv, "SELECT * FROM test.test_table");
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
} else {
// run again, validate overwrite
createAction(CloneAction.class, args).run();
- r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.test_table").collect());
+ r2 = sql(tEnv, "SELECT * FROM test.test_table");
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
}
@@ -223,19 +232,24 @@ public class CloneActionITCase extends ActionITCaseBase {
@Test
public void testMigrateOnePartitionedTableAndFilterNoPartition() throws
Exception {
String format = randomFormat();
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName = "hivetable" + StringUtils.randomNumericString(10);
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
- tEnv.executeSql(
- "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
- + format);
- tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT) STORED AS %s",
+ dbName,
+ tableName,
+ format);
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
- tEnv.executeSql(
- "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
tEnv.useCatalog("PAIMON");
tEnv.executeSql("CREATE DATABASE test");
@@ -243,9 +257,9 @@ public class CloneActionITCase extends ActionITCaseBase {
Arrays.asList(
"clone",
"--database",
- "default",
+ dbName,
"--table",
- "hivetable",
+ tableName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
@@ -271,30 +285,37 @@ public class CloneActionITCase extends ActionITCaseBase {
@Test
public void testMigrateWholeDatabase() throws Exception {
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName1 = "hivetable1" + StringUtils.randomNumericString(10);
+ String tableName2 = "hivetable2" + StringUtils.randomNumericString(10);
+
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
- tEnv.executeSql("CREATE DATABASE hivedb");
- tEnv.executeSql(
- "CREATE TABLE hivedb.hivetable1 (id string, id2 int, id3 int)
STORED AS "
- + randomFormat());
- tEnv.executeSql("INSERT INTO hivedb.hivetable1 VALUES" +
data(100)).await();
- tEnv.executeSql(
- "CREATE TABLE hivedb.hivetable2 (id string) PARTITIONED BY
(id2 int, id3 int) STORED AS "
- + randomFormat());
- tEnv.executeSql("INSERT INTO hivedb.hivetable2 VALUES" +
data(100)).await();
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING, id2 INT, id3 INT) STORED AS
%s",
+ dbName,
+ tableName1,
+ randomFormat());
+ sql(tEnv, "INSERT INTO TABLE %s.%s VALUES %s", dbName, tableName1,
data(100));
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT) STORED AS %s",
+ dbName,
+ tableName2,
+ randomFormat());
+ sql(tEnv, "INSERT INTO TABLE %s.%s VALUES %s", dbName, tableName2,
data(100));
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");
- List<Row> r1 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivedb.hivetable1").collect());
- List<Row> r2 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivedb.hivetable2").collect());
+ List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName1);
+ List<Row> r2 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName2);
- tEnv.executeSql(
- "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
tEnv.useCatalog("PAIMON");
tEnv.executeSql("CREATE DATABASE test");
@@ -302,7 +323,7 @@ public class CloneActionITCase extends ActionITCaseBase {
CloneAction.class,
"clone",
"--database",
- "hivedb",
+ dbName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
@@ -313,10 +334,8 @@ public class CloneActionITCase extends ActionITCaseBase {
"warehouse=" + warehouse)
.run();
- List<Row> actualR1 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.hivetable1").collect());
- List<Row> actualR2 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.hivetable2").collect());
+ List<Row> actualR1 = sql(tEnv, "SELECT * FROM test.%s", tableName1);
+ List<Row> actualR2 = sql(tEnv, "SELECT * FROM test.%s", tableName2);
Assertions.assertThatList(actualR1).containsExactlyInAnyOrderElementsOf(r1);
Assertions.assertThatList(actualR2).containsExactlyInAnyOrderElementsOf(r2);
@@ -324,35 +343,39 @@ public class CloneActionITCase extends ActionITCaseBase {
@Test
public void testMigrateWholeDatabaseWithFilter() throws Exception {
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName1 = "hivetable1" + StringUtils.randomNumericString(10);
+ String tableName2 = "hivetable1" + StringUtils.randomNumericString(10);
+
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
- tEnv.executeSql("CREATE DATABASE hivedb");
- tEnv.executeSql(
- "CREATE TABLE hivedb.hivetable1 (id string) PARTITIONED BY
(id2 int, id3 int) STORED AS "
- + randomFormat());
- tEnv.executeSql("INSERT INTO hivedb.hivetable1 VALUES" +
data(100)).await();
- tEnv.executeSql(
- "CREATE TABLE hivedb.hivetable2 (id string) PARTITIONED BY
(id2 int, id3 int) STORED AS "
- + randomFormat());
- tEnv.executeSql("INSERT INTO hivedb.hivetable2 VALUES" +
data(100)).await();
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT) STORED AS %s",
+ dbName,
+ tableName1,
+ randomFormat());
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName1,
data(100));
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT) STORED AS %s",
+ dbName,
+ tableName2,
+ randomFormat());
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName1,
data(100));
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName2,
data(100));
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");
- List<Row> r1 =
- ImmutableList.copyOf(
- tEnv.executeSql("SELECT * FROM hivedb.hivetable1 WHERE
id2=1 OR id3=1")
- .collect());
- List<Row> r2 =
- ImmutableList.copyOf(
- tEnv.executeSql("SELECT * FROM hivedb.hivetable2 WHERE
id2=1 OR id3=1")
- .collect());
+ List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s WHERE id2=1 OR id3=1",
dbName, tableName1);
+ List<Row> r2 = sql(tEnv, "SELECT * FROM %s.%s WHERE id2=1 OR id3=1",
dbName, tableName2);
- tEnv.executeSql(
- "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
tEnv.useCatalog("PAIMON");
tEnv.executeSql("CREATE DATABASE test");
@@ -360,7 +383,7 @@ public class CloneActionITCase extends ActionITCaseBase {
CloneAction.class,
"clone",
"--database",
- "hivedb",
+ dbName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
@@ -373,10 +396,8 @@ public class CloneActionITCase extends ActionITCaseBase {
"id2=1 OR id3=1")
.run();
- List<Row> actualR1 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.hivetable1").collect());
- List<Row> actualR2 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.hivetable2").collect());
+ List<Row> actualR1 = sql(tEnv, "SELECT * FROM test.%s", tableName1);
+ List<Row> actualR2 = sql(tEnv, "SELECT * FROM test.%s", tableName2);
Assertions.assertThatList(actualR1).containsExactlyInAnyOrderElementsOf(r1);
Assertions.assertThatList(actualR2).containsExactlyInAnyOrderElementsOf(r2);
@@ -385,25 +406,29 @@ public class CloneActionITCase extends ActionITCaseBase {
@Test
public void testCloneWithExistedTable() throws Exception {
String format = randomFormat();
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName = "hivetable" + StringUtils.randomNumericString(10);
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
- tEnv.executeSql(
- "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
- + format);
- tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT) STORED AS %s",
+ dbName,
+ tableName,
+ format);
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");
- String query = "SELECT * FROM hivetable";
- List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql(query).collect());
+ List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName);
- tEnv.executeSql(
- "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
tEnv.useCatalog("PAIMON");
tEnv.executeSql("CREATE DATABASE test");
// create a paimon table with the same name
@@ -415,9 +440,9 @@ public class CloneActionITCase extends ActionITCaseBase {
Arrays.asList(
"clone",
"--database",
- "default",
+ dbName,
"--table",
- "hivetable",
+ tableName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
@@ -440,33 +465,34 @@ public class CloneActionITCase extends ActionITCaseBase {
Assertions.assertThat(paimonTable.partitionKeys()).containsExactly("id2",
"id3");
- List<Row> r2 =
- ImmutableList.copyOf(
- tEnv.executeSql("SELECT * FROM
test.test_table").collect());
-
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
}
@Test
public void testCloneWithNotExistedDatabase() throws Exception {
- String format = "avro";
+ String format = randomFormat();
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName = "hivetable" + StringUtils.randomNumericString(10);
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
- tEnv.executeSql(
- "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
- + format);
- tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT) STORED AS %s",
+ dbName,
+ tableName,
+ format);
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
- String query = "SELECT * FROM hivetable";
- List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql(query).collect());
+ List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName);
- tEnv.executeSql(
- "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
tEnv.useCatalog("PAIMON");
List<String> args =
@@ -474,9 +500,9 @@ public class CloneActionITCase extends ActionITCaseBase {
Arrays.asList(
"clone",
"--database",
- "default",
+ dbName,
"--table",
- "hivetable",
+ tableName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
@@ -494,9 +520,7 @@ public class CloneActionITCase extends ActionITCaseBase {
Assertions.assertThat(paimonTable.partitionKeys()).containsExactly("id2",
"id3");
- List<Row> r2 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.test_table").collect());
-
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
@@ -581,11 +605,18 @@ public class CloneActionITCase extends ActionITCaseBase {
return formats[i];
}
- protected FileStoreTable paimonTable(
- TableEnvironment tEnv, String catalogName, Identifier table)
+ private FileStoreTable paimonTable(TableEnvironment tEnv, String
catalogName, Identifier table)
throws org.apache.paimon.catalog.Catalog.TableNotExistException {
FlinkCatalog flinkCatalog = (FlinkCatalog)
tEnv.getCatalog(catalogName).get();
Catalog catalog = flinkCatalog.catalog();
return (FileStoreTable) catalog.getTable(table);
}
+
+ private List<Row> sql(TableEnvironment tEnv, String query, Object... args)
{
+ try (CloseableIterator<Row> iter =
tEnv.executeSql(String.format(query, args)).collect()) {
+ return ImmutableList.copyOf(iter);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java
index decec2e136..1a952b9c9d 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hudi/CloneActionForHudiITCase.java
@@ -21,21 +21,21 @@ package org.apache.paimon.hudi;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.CloneAction;
import org.apache.paimon.hive.TestHiveMetastore;
+import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
/** Test clone Hudi table. */
public class CloneActionForHudiITCase extends ActionITCaseBase {
@@ -44,58 +44,60 @@ public class CloneActionForHudiITCase extends ActionITCaseBase {
private static final int PORT = 9089;
- @BeforeEach
- public void beforeEach() throws IOException {
- super.before();
+ @BeforeAll
+ public static void beforeAll() {
TEST_HIVE_METASTORE.start(PORT);
}
- @AfterEach
- public void afterEach() throws Exception {
- super.after();
+ @AfterAll
+ public static void afterAll() throws Exception {
TEST_HIVE_METASTORE.stop();
}
@Test
public void testMigrateOneNonPartitionedTable() throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ String dbName = "hudidb" + StringUtils.randomNumericString(10);
+ String tableName = "huditable" + StringUtils.randomNumericString(10);
- tEnv.executeSql(
- "CREATE TABLE hudi_table ("
+ sql(
+ tEnv,
+ "CREATE TABLE %s ("
+ " id STRING PRIMARY KEY NOT ENFORCED,"
+ " name STRING,"
+ " price INT"
+ ") WITH ("
+ " 'connector' = 'hudi',"
- + String.format(
- "'path' = '%s/%s/hudi_table',",
-
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
- UUID.randomUUID())
+ + " 'path' = '%s/%s',"
+ " 'table.type' = 'COPY_ON_WRITE',"
+ " 'hive_sync.enable' = 'true',"
+ " 'hive_sync.mode' = 'hms',"
- + String.format(
- "'hive_sync.metastore.uris' =
'thrift://localhost:%s',", PORT)
- + " 'hive_sync.db' = 'default',"
- + " 'hive_sync.table' = 'hudi_table'"
- + ")");
+ + " 'hive_sync.metastore.uris' =
'thrift://localhost:%s',"
+ + " 'hive_sync.db' = '%s',"
+ + " 'hive_sync.table' = '%s'"
+ + ")",
+ tableName,
+
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+ tableName,
+ PORT,
+ dbName,
+ tableName);
List<String> insertValues = new ArrayList<>();
for (int i = 0; i < 50; i++) {
insertValues.add(String.format("('%s', '%s', %s)", i, "A", i));
}
- tEnv.executeSql("INSERT INTO hudi_table VALUES " + String.join(",",
insertValues)).await();
+ sql(tEnv, "INSERT INTO %s VALUES %s", tableName, String.join(",",
insertValues));
// test pk
insertValues.clear();
for (int i = 0; i < 50; i++) {
insertValues.add(String.format("('%s', '%s', %s)", i, "B", i));
}
- tEnv.executeSql("INSERT INTO hudi_table VALUES " + String.join(",",
insertValues)).await();
- List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hudi_table").collect());
+ sql(tEnv, "INSERT INTO %s VALUES %s", tableName, String.join(",",
insertValues));
+ List<Row> r1 = sql(tEnv, "SELECT * FROM %s", tableName);
- tEnv.executeSql(
- "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
tEnv.useCatalog("PAIMON");
tEnv.executeSql("CREATE DATABASE test");
@@ -103,9 +105,9 @@ public class CloneActionForHudiITCase extends ActionITCaseBase {
CloneAction.class,
"clone",
"--database",
- "default",
+ dbName,
"--table",
- "hudi_table",
+ tableName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
@@ -118,54 +120,57 @@ public class CloneActionForHudiITCase extends ActionITCaseBase {
"warehouse=" + warehouse)
.run();
- List<Row> r2 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.test_table").collect());
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
@Test
public void testMigrateOnePartitionedTable() throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ String dbName = "hudidb" + StringUtils.randomNumericString(10);
+ String tableName = "huditable" + StringUtils.randomNumericString(10);
- tEnv.executeSql(
- "CREATE TABLE hudi_table ("
+ sql(
+ tEnv,
+ "CREATE TABLE %s ("
+ " id STRING PRIMARY KEY NOT ENFORCED,"
+ " name STRING,"
+ " pt STRING"
+ ") PARTITIONED BY (pt) WITH ("
+ " 'connector' = 'hudi',"
- + String.format(
- "'path' = '%s/%s/hudi_table',",
-
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
- UUID.randomUUID())
+ + " 'path' = '%s/%s',"
+ " 'table.type' = 'COPY_ON_WRITE',"
+ " 'hive_sync.enable' = 'true',"
+ " 'hive_sync.mode' = 'hms',"
- + String.format(
- "'hive_sync.metastore.uris' =
'thrift://localhost:%s',", PORT)
- + " 'hive_sync.db' = 'default',"
- + " 'hive_sync.table' = 'hudi_table',"
+ + " 'hive_sync.metastore.uris' =
'thrift://localhost:%s',"
+ + " 'hive_sync.db' = '%s',"
+ + " 'hive_sync.table' = '%s',"
+ " 'hive_sync.partition_fields' = 'pt',"
+ " 'hoodie.datasource.write.hive_style_partitioning'
= 'true',"
+ " 'hive_sync.partition_extractor_class' =
'org.apache.hudi.hive.HiveStylePartitionValueExtractor'"
- + ")");
+ + ")",
+ tableName,
+
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+ tableName,
+ PORT,
+ dbName,
+ tableName);
List<String> insertValues = new ArrayList<>();
for (int i = 0; i < 50; i++) {
insertValues.add(String.format("('%s', '%s', '%s')", i, "A",
"2025-01-01"));
}
- tEnv.executeSql("insert into hudi_table values " + String.join(",",
insertValues)).await();
+ sql(tEnv, "INSERT INTO %s VALUES %s", tableName, String.join(",",
insertValues));
// test pk
insertValues.clear();
for (int i = 0; i < 50; i++) {
insertValues.add(String.format("('%s', '%s', '%s')", i, "B",
"2025-01-01"));
}
- tEnv.executeSql("INSERT INTO hudi_table VALUES " + String.join(",",
insertValues)).await();
- List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hudi_table").collect());
+ sql(tEnv, "INSERT INTO %s VALUES %s", tableName, String.join(",",
insertValues));
+ List<Row> r1 = sql(tEnv, "SELECT * FROM %s", tableName);
- tEnv.executeSql(
- "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
tEnv.useCatalog("PAIMON");
tEnv.executeSql("CREATE DATABASE test");
@@ -173,9 +178,9 @@ public class CloneActionForHudiITCase extends ActionITCaseBase {
CloneAction.class,
"clone",
"--database",
- "default",
+ dbName,
"--table",
- "hudi_table",
+ tableName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
@@ -188,8 +193,15 @@ public class CloneActionForHudiITCase extends ActionITCaseBase {
"warehouse=" + warehouse)
.run();
- List<Row> r2 =
- ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
test.test_table").collect());
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
+
+ private List<Row> sql(TableEnvironment tEnv, String query, Object... args)
{
+ try (CloseableIterator<Row> iter =
tEnv.executeSql(String.format(query, args)).collect()) {
+ return ImmutableList.copyOf(iter);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}