This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b6957f020d0 [HUDI-9205] Refactor Flink tests to avoid sleeping for data results (#13027)
b6957f020d0 is described below
commit b6957f020d00aca9d5c83f51b8e8f0bd102ac9bd
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Mar 27 17:08:09 2025 +0800
[HUDI-9205] Refactor Flink tests to avoid sleeping for data results (#13027)
---
.../apache/hudi/sink/ITTestDataStreamWrite.java | 56 ++-----
.../apache/hudi/table/ITTestHoodieDataSource.java | 172 +++++++++++----------
.../apache/hudi/table/ITTestSchemaEvolution.java | 4 -
.../org/apache/hudi/utils/FlinkMiniCluster.java | 11 ++
.../org/apache/hudi/utils/TestConfigurations.java | 15 +-
.../utils/factory/CollectSinkTableFactory.java | 62 +++++++-
.../org/apache/hudi/storage/HoodieStorage.java | 5 +-
packaging/bundle-validation/flink/insert.sql | 2 +
packaging/bundle-validation/validate.sh | 2 +-
9 files changed, 191 insertions(+), 138 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 41912c4f84e..f0aa1a9e8c6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -41,20 +41,15 @@ import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.source.ContinuousFileSource;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -63,7 +58,6 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@@ -167,6 +161,8 @@ public class ITTestDataStreamWrite extends TestLogger {
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ // use synchronous compaction so the Flink job only finishes after compaction completes.
+ conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
defaultWriteAndCheckExpected(conf, "mor_write_with_compact", 1);
@@ -186,11 +182,13 @@ public class ITTestDataStreamWrite extends TestLogger {
writeWithClusterAndCheckExpected(conf, "cow_write_with_cluster", 1, EXPECTED);
}
- @Disabled("HUDI-9196")
@ParameterizedTest
@ValueSource(strings = {"COPY_ON_WRITE", "MERGE_ON_READ"})
public void testStreamWriteWithIndexBootstrap(String tableType) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+ // use synchronous compaction to avoid sleeping while waiting for async compaction.
+ conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.TABLE_TYPE, tableType);
writeAndCheckExpected(
@@ -267,26 +265,14 @@ public class ITTestDataStreamWrite extends TestLogger {
boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
DataStream<RowData> dataStream;
- if (isMor) {
- TextInputFormat format = new TextInputFormat(new Path(sourcePath));
- format.setFilesFilter(FilePathFilter.createDefaultFilter());
- TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
- format.setCharsetName("UTF-8");
-
- dataStream = execEnv
- // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
- .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
- .map(JsonDeserializationFunction.getInstance(rowType))
- .setParallelism(1);
- } else {
- dataStream = execEnv
- // use continuous file source to trigger checkpoint
- .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
- .name("continuous_file_source")
- .setParallelism(1)
- .map(JsonDeserializationFunction.getInstance(rowType))
- .setParallelism(4);
- }
+
+ dataStream = execEnv
+ // use continuous file source to trigger checkpoint
+ .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
+ .name("continuous_file_source")
+ .setParallelism(1)
+ .map(JsonDeserializationFunction.getInstance(rowType))
+ .setParallelism(4);
if (transformer.isPresent()) {
dataStream = transformer.get().apply(dataStream);
@@ -345,20 +331,8 @@ public class ITTestDataStreamWrite extends TestLogger {
}
public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
- if (isMor) {
- JobClient client = execEnv.executeAsync(jobName);
- if (client.getJobStatus().get() != JobStatus.FAILED) {
- try {
- TimeUnit.SECONDS.sleep(35); // wait long enough for the compaction to finish
- client.cancel();
- } catch (Throwable var1) {
- // ignored
- }
- }
- } else {
- // wait for the streaming job to finish
- execEnv.execute(jobName);
- }
+ // wait for the streaming job to finish
+ execEnv.execute(jobName);
}
@Test
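For reference, a minimal sketch of the pattern the hunks above move to (the option constants and helper names are taken from the diff; everything else is illustrative): with asynchronous compaction disabled the pipeline is bounded end to end, so the test can simply block on execute() instead of sleeping for 35 seconds and cancelling the job.

    // Sketch only, not part of the commit: bounded execution with inline compaction.
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);    // compaction runs inside the job
    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); // compact after every delta commit
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build the bounded source -> Hudi sink pipeline as defaultWriteAndCheckExpected does ...
    execEnv.execute("mor_write_with_compact"); // returns only after the job, including compaction, finishes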
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 891f3bd1206..997d266fd4e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -47,13 +47,14 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExceptionUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -83,6 +84,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.hudi.utils.TestConfigurations.catalog;
+import static org.apache.hudi.utils.TestConfigurations.getCollectSinkDDL;
import static org.apache.hudi.utils.TestConfigurations.sql;
import static org.apache.hudi.utils.TestData.array;
import static org.apache.hudi.utils.TestData.assertRowsEquals;
@@ -147,19 +149,19 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.READ_START_COMMIT, firstCommit)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
- List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
// insert another batch of data
execInsertSql(streamTableEnv, insertInto);
- List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+ List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
streamTableEnv.getConfig().getConfiguration()
.setBoolean("table.dynamic-table-options.enabled", true);
// specify the start commit as earliest
- List<Row> rows3 = execSelectSql(streamTableEnv,
- "select * from t1/*+options('read.start-commit'='earliest')*/", 10);
+ List<Row> rows3 = execSelectSqlWithExpectedNum(streamTableEnv,
+ "select * from t1/*+options('read.start-commit'='earliest')*/",
TestData.DATA_SET_SOURCE_INSERT.size());
assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT);
}
@@ -184,18 +186,18 @@ public class ITTestHoodieDataSource {
execInsertSql(streamTableEnv, insertInto);
String firstCommit =
TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
- List<Row> rows = execSelectSql(streamTableEnv,
- "select * from t1/*+options('read.start-commit'='" + firstCommit + "')*/", 10);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv,
+ "select * from t1/*+options('read.start-commit'='" + firstCommit + "')*/", TestData.DATA_SET_SOURCE_INSERT.size());
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
// insert another batch of data
execInsertSql(streamTableEnv, TestSQL.UPDATE_INSERT_T1);
- List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+ List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_CHANGELOG.size());
assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_CHANGELOG);
// specify the start commit as earliest
- List<Row> rows3 = execSelectSql(streamTableEnv,
- "select * from t1/*+options('read.start-commit'='earliest')*/", 10);
+ List<Row> rows3 = execSelectSqlWithExpectedNum(streamTableEnv,
+ "select * from t1/*+options('read.start-commit'='earliest')*/",
TestData.DATA_SET_SOURCE_MERGED.size());
assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_MERGED);
}
@@ -218,12 +220,12 @@ public class ITTestHoodieDataSource {
execInsertSql(streamTableEnv, insertInto);
// reading from the latest commit instance.
- List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
// insert another batch of data
execInsertSql(streamTableEnv, insertInto);
- List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+ List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}
@@ -259,7 +261,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.READ_START_COMMIT, specifiedCommit)
.end();
streamTableEnv.executeSql(createHoodieTable2);
- List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t2", TestData.DATA_SET_SOURCE_MERGED.size());
// all the data with same keys are appended within one data bucket and one log file,
// so when consume, the same keys are merged
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_MERGED);
@@ -360,7 +362,7 @@ public class ITTestHoodieDataSource {
streamTableEnv.getConfig().getConfiguration()
.setBoolean("table.dynamic-table-options.enabled", true);
final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
- List<Row> rows = execSelectSql(streamTableEnv, query, 10);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, query, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}
@@ -388,7 +390,7 @@ public class ITTestHoodieDataSource {
streamTableEnv.getConfig().getConfiguration()
.setBoolean("table.dynamic-table-options.enabled", true);
final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
- List<Row> rows = execSelectSql(streamTableEnv, query, 10);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, query, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}
@@ -416,7 +418,7 @@ public class ITTestHoodieDataSource {
final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/",
FlinkOptions.START_COMMIT_EARLIEST);
- List<Row> rows = execSelectSql(streamTableEnv, query, 10);
+ List<Row> rows = execSelectSql(streamTableEnv, query);
// batch read will not lose data when cleaned clustered files.
assertRowsEquals(rows,
CollectionUtils.combine(TestData.DATA_SET_SOURCE_INSERT_FIRST_COMMIT, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT));
@@ -502,10 +504,10 @@ public class ITTestHoodieDataSource {
+ " name varchar(20),\n"
+ " age_sum int\n"
+ ") with (\n"
- + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+ + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n"
+ + " 'sink-expected-row-num' = '2'"
+ ")";
- List<Row> result = execSelectSql(streamTableEnv,
- "select name, sum(age) from t1 group by name", sinkDDL, 10);
+ List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select name, sum(age) from t1 group by name", sinkDDL);
final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]";
assertRowsEquals(result, expected, true);
}
@@ -556,8 +558,9 @@ public class ITTestHoodieDataSource {
+ "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]",
"[+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+ "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]");
+ List<Integer> expectedNums = Arrays.asList(8, 3, 2);
for (int i = 0; i < sqls.size(); i++) {
- List<Row> result = execSelectSql(streamTableEnv, sqls.get(i), 10);
+ List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, sqls.get(i), expectedNums.get(i));
assertRowsEquals(result, expectResults.get(i));
}
}
@@ -583,11 +586,11 @@ public class ITTestHoodieDataSource {
.end();
streamTableEnv.executeSql(hoodieTableDDL);
- List<Row> result = execSelectSql(streamTableEnv,
- "select * from t1 where `partition`='par1'", 10);
final String expected = "["
+ "+I(+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1]), "
+ "+I(+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1])]";
+ List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv,
+ "select * from t1 where `partition`='par1'", 2);
assertRowsEquals(result, expected, true);
}
@@ -612,18 +615,8 @@ public class ITTestHoodieDataSource {
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);
-
- List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
- final String expected = "["
- + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
- + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1], "
- + "+I[id3, Julian, 53, 1970-01-01T00:00:03, par2], "
- + "+I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], "
- + "+I[id5, Sophia, 18, 1970-01-01T00:00:05, par3], "
- + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
- + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
- + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]";
- assertRowsEquals(result, expected);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}
@ParameterizedTest
@@ -821,13 +814,12 @@ public class ITTestHoodieDataSource {
+ "+I[id7, Bob, 44, par4, 1970-01-01T00:00:07], "
+ "+I[id8, Han, 56, par4, 1970-01-01T00:00:08]]";
- List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
-
+ List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 8);
assertRowsEquals(result, expected);
// insert another batch of data
execInsertSql(streamTableEnv, insertInto);
- List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
+ List<Row> result2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 8);
assertRowsEquals(result2, expected);
}
@@ -861,12 +853,12 @@ public class ITTestHoodieDataSource {
+ "+I[4, Fabian, 2021-12-04T15:16:04.400004], "
+ "+I[5, Tom, 2721-12-04T15:16:04.500005]]";
- List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
+ List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 5);
assertRowsEquals(result, expected);
// insert another batch of data
execInsertSql(streamTableEnv, insertInto);
- List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
+ List<Row> result2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 5);
assertRowsEquals(result2, expected);
}
@@ -959,7 +951,7 @@ public class ITTestHoodieDataSource {
execInsertSql(streamTableEnv, insertInto);
// reading from the earliest commit instance.
- List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 20);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}
@@ -1162,9 +1154,9 @@ public class ITTestHoodieDataSource {
.end();
batchTableEnv.executeSql(createHoodieTable);
- // no exception expects to be thrown
- assertThrows(Exception.class,
- () -> execSelectSql(batchTableEnv, "select * from t1", 10),
+ // ValidationException expects to be thrown
+ assertThrows(ValidationException.class,
+ () -> execSelectSql(batchTableEnv, "select * from t1"),
"Exception should throw when querying non-exists table in batch mode");
// case2: empty table without data files
@@ -1221,7 +1213,10 @@ public class ITTestHoodieDataSource {
+ "+I[109, 9000, spare tire, 22.200000762939453], "
+ "+I[110, 14000, jacket, 0.5]]";
- List<Row> result = execSelectSql(streamTableEnv, "select * from hoodie_sink", execMode);
+ List<Row> result =
+ execMode == ExecMode.STREAM
+ ? execSelectSqlWithExpectedNum(streamTableEnv, "select * from hoodie_sink", "hoodie_sink", 10)
+ : execSelectSql(streamTableEnv, "select * from hoodie_sink");
assertRowsEquals(result, expected);
}
@@ -1554,7 +1549,6 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result2.subList(result2.size() - 2, result2.size()),
"[-U[1], +U[2]]");
}
- @Disabled("HUDI-9196")
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception {
@@ -2189,7 +2183,7 @@ public class ITTestHoodieDataSource {
.end();
batchTableEnv.executeSql(readHoodieTableDDL);
- List<Row> result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+ List<Row> result = execSelectSql(batchTableEnv, "select * from t1");
assertRowsEquals(result, expected1.toString());
batchTableEnv.executeSql("drop table t1");
@@ -2211,7 +2205,7 @@ public class ITTestHoodieDataSource {
.end();
batchTableEnv.executeSql(readHoodieTableDDL);
- result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+ result = execSelectSql(batchTableEnv, "select * from t1");
assertRowsEquals(result, expected2.toString());
batchTableEnv.executeSql("drop table t1");
@@ -2233,12 +2227,11 @@ public class ITTestHoodieDataSource {
.end();
batchTableEnv.executeSql(readHoodieTableDDL);
- result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+ result = execSelectSql(batchTableEnv, "select * from t1");
assertRowsEquals(result, expected3.toString());
}
- @Disabled("HUDI-9196")
@ParameterizedTest
@MethodSource("tableTypeAndBooleanTrueFalseParams")
void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
@@ -2262,11 +2255,11 @@ public class ITTestHoodieDataSource {
// launch a streaming query
TableResult tableResult = submitSelectSql(streamTableEnv,
"select uuid, name, age, ts, `partition` as part from t1 where `partition` > 'par4'",
- TestConfigurations.getCollectSinkDDL("sink"));
+ TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", TestData.DATA_SET_INSERT_SEPARATE_PARTITION.size()));
// write second commit
TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
// stop the streaming query and get data
- List<Row> actualResult = fetchResult(streamTableEnv, tableResult, 10);
+ List<Row> actualResult = fetchResultWithExpectedNum(streamTableEnv, tableResult);
assertRowsEquals(actualResult,
TestData.DATA_SET_INSERT_SEPARATE_PARTITION);
}
@@ -2326,8 +2319,7 @@ public class ITTestHoodieDataSource {
@ParameterizedTest
@MethodSource("parametersForMetaColumnsSkip")
- void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation)
- throws TableNotExistException, InterruptedException {
+ void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation) throws Exception {
String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);
@@ -2346,7 +2338,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
- List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}
@@ -2402,7 +2394,7 @@ public class ITTestHoodieDataSource {
execInsertSql(streamTableEnv, insertInto);
// reading from the earliest
- List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}
@@ -2527,44 +2519,44 @@ public class ITTestHoodieDataSource {
}
}
- private List<Row> execSelectSql(TableEnvironment tEnv, String select, ExecMode execMode)
- throws TableNotExistException, InterruptedException {
- final String[] splits = select.split(" ");
- final String tableName = splits[splits.length - 1];
- switch (execMode) {
- case STREAM:
- return execSelectSql(tEnv, select, 10, tableName);
- case BATCH:
- return CollectionUtil.iterableToList(
- () -> tEnv.sqlQuery("select * from " + tableName).execute().collect());
- default:
- throw new AssertionError();
- }
+ /**
+ * Use TableResult#collect() to collect results directly for bounded source.
+ */
+ private List<Row> execSelectSql(TableEnvironment tEnv, String select) {
+ return CollectionUtil.iterableToList(
+ () -> tEnv.sqlQuery(select).execute().collect());
}
- private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout)
- throws InterruptedException, TableNotExistException {
- return execSelectSql(tEnv, select, timeout, null);
+ /**
+ * Use CollectTableSink to collect results with expected row number.
+ */
+ private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, int expectedNum) throws Exception {
+ return execSelectSqlWithExpectedNum(tEnv, select, null, expectedNum);
}
- private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable)
- throws InterruptedException, TableNotExistException {
+ /**
+ * Use CollectTableSink to collect results with expected row number.
+ */
+ private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, String sourceTable, int expectedNum)
+ throws Exception {
final String sinkDDL;
if (sourceTable != null) {
- // use the source table schema as the sink schema if the source table was specified, .
+ // use the source table schema as the sink schema if the source table was specified.
ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema();
- sinkDDL = TestConfigurations.getCollectSinkDDL("sink", schema);
+ sinkDDL = TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", schema, expectedNum);
} else {
- sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
+ sinkDDL = TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", expectedNum);
}
- return execSelectSql(tEnv, select, sinkDDL, timeout);
+ return execSelectSqlWithExpectedNum(tEnv, select, sinkDDL);
}
- private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
- throws InterruptedException {
+ /**
+ * Use CollectTableSink to collect results with expected row number.
+ */
+ private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, String sinkDDL) {
TableResult tableResult = submitSelectSql(tEnv, select, sinkDDL);
- return fetchResult(tEnv, tableResult, timeout);
+ return fetchResultWithExpectedNum(tEnv, tableResult);
}
private TableResult submitSelectSql(TableEnvironment tEnv, String select, String sinkDDL) {
@@ -2574,14 +2566,28 @@ public class ITTestHoodieDataSource {
return tableResult;
}
- private List<Row> fetchResult(TableEnvironment tEnv, TableResult tableResult, long timeout)
- throws InterruptedException {
- // wait for the timeout then cancels the job
+ private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException {
+ TableResult tableResult = submitSelectSql(tEnv, select, getCollectSinkDDL("sink"));
TimeUnit.SECONDS.sleep(timeout);
+ // wait for the timeout, then cancel the job
tableResult.getJobClient().ifPresent(JobClient::cancel);
tEnv.executeSql("DROP TABLE IF EXISTS sink");
return CollectSinkTableFactory.RESULT.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
+
+ private List<Row> fetchResultWithExpectedNum(TableEnvironment tEnv, TableResult tableResult) {
+ try {
+ // wait for the continuous streaming query to be terminated by the forced exception
+ // once the expected row number is reached; the maximum waiting timeout is 30s
+ tableResult.await(30, TimeUnit.SECONDS);
+ } catch (Throwable e) {
+ ExceptionUtils.assertThrowable(e, CollectSinkTableFactory.SuccessException.class);
+ }
+ tEnv.executeSql("DROP TABLE IF EXISTS sink");
+ return CollectSinkTableFactory.RESULT.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
}
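As a usage illustration only (the table name and expected data set below are placeholders, not part of the commit), a streaming assertion now reads like this: the helper routes the query into the collect sink with an expected row count and returns as soon as that many rows have arrived, instead of sleeping a fixed 10 seconds and cancelling the job.

    // Hypothetical test body sketched against the helpers introduced above.
    List<Row> rows = execSelectSqlWithExpectedNum(
        streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);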
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index c0dced5a613..5026af91a96 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -36,7 +36,6 @@ import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -81,9 +80,6 @@ public class ITTestSchemaEvolution {
public void setUp() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
tEnv = StreamTableEnvironment.create(env);
- // flink job uses child-first classloader by default, async services fired by flink job are not guaranteed
- // to be killed right away, which then may trigger classloader leak checking exception
- tEnv.getConfig().getConfiguration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
}
@Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
index bfddb43578c..da4442890e5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
@@ -18,6 +18,8 @@
package org.apache.hudi.utils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.AbstractTestBase;
@@ -41,6 +43,7 @@ public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, Af
private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getDefaultConfig())
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.build());
@@ -60,6 +63,14 @@ public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, Af
cleanupRunningJobs();
}
+ private static Configuration getDefaultConfig() {
+ Configuration config = new Configuration();
+ // flink job uses child-first classloader by default, async services fired by flink job are not
+ // guaranteed to be killed right away, which then may trigger classloader leak checking exception.
+ config.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+ return config;
+ }
+
private void cleanupRunningJobs() throws Exception {
if (!MINI_CLUSTER_RESOURCE.getMiniCluster().isRunning()) {
// do nothing if the MiniCluster is not running
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 92d44a8e7ef..ed5aef764ab 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -244,6 +244,11 @@ public class TestConfigurations {
}
public static String getCollectSinkDDL(String tableName) {
+ // set expectedRowNum to -1 to disable the forced exception that terminates a successful sink
+ return getCollectSinkDDLWithExpectedNum(tableName, -1);
+ }
+
+ public static String getCollectSinkDDLWithExpectedNum(String tableName, int expectedRowNum) {
return "create table " + tableName + "(\n"
+ " uuid varchar(20),\n"
+ " name varchar(10),\n"
@@ -251,11 +256,16 @@ public class TestConfigurations {
+ " ts timestamp(3),\n"
+ " `partition` varchar(20)\n"
+ ") with (\n"
- + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+ + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n"
+ + " 'sink-expected-row-num' = '" + expectedRowNum + "'"
+ ")";
}
public static String getCollectSinkDDL(String tableName, TableSchema tableSchema) {
+ return getCollectSinkDDLWithExpectedNum(tableName, tableSchema, -1);
+ }
+
+ public static String getCollectSinkDDLWithExpectedNum(String tableName, TableSchema tableSchema, int expectRowNum) {
final StringBuilder builder = new StringBuilder("create table " + tableName + "(\n");
String[] fieldNames = tableSchema.getFieldNames();
DataType[] fieldTypes = tableSchema.getFieldDataTypes();
@@ -271,7 +281,8 @@ public class TestConfigurations {
}
final String withProps = ""
+ ") with (\n"
- + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
+ + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n"
+ + " 'sink-expected-row-num' = '" + expectRowNum + "'"
+ ")";
builder.append(withProps);
return builder.toString();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
index da0761a7542..ebce7c4ddbd 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -18,12 +18,14 @@
package org.apache.hudi.utils.factory;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.util.ChangelogModes;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -43,6 +45,7 @@ import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -59,14 +62,19 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
// global results to collect and query
public static final Map<Integer, List<Row>> RESULT = new HashMap<>();
+ // options
+ private static final ConfigOption<Integer> SINK_EXPECTED_ROW_NUM =
+ ConfigOptions.key("sink-expected-row-num").intType().defaultValue(-1);
+
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
TableSchema schema = context.getCatalogTable().getSchema();
+ int expectRowNum = helper.getOptions().get(SINK_EXPECTED_ROW_NUM);
RESULT.clear();
- return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName());
+ return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName(), expectRowNum);
}
@Override
@@ -81,7 +89,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return Collections.emptySet();
+ return new HashSet<>(Collections.singletonList(SINK_EXPECTED_ROW_NUM));
}
// --------------------------------------------------------------------------------------------
@@ -95,12 +103,15 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
private final TableSchema schema;
private final String tableName;
+ private final int expectedRowNum;
private CollectTableSink(
TableSchema schema,
- String tableName) {
+ String tableName,
+ int expectedRowNum) {
this.schema = schema;
this.tableName = tableName;
+ this.expectedRowNum = expectedRowNum;
}
@Override
@@ -113,12 +124,16 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
final DataType rowType = schema.toPhysicalRowDataType();
final RowTypeInfo rowTypeInfo = (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(rowType);
DataStructureConverter converter = context.createDataStructureConverter(schema.toPhysicalRowDataType());
- return SinkFunctionProvider.of(new CollectSinkFunction(converter, rowTypeInfo));
+ if (expectedRowNum != -1) {
+ return SinkFunctionProvider.of(new CollectSinkFunctionWithExpectedNum(converter, rowTypeInfo, expectedRowNum));
+ } else {
+ return SinkFunctionProvider.of(new CollectSinkFunction(converter, rowTypeInfo));
+ }
}
@Override
public DynamicTableSink copy() {
- return new CollectTableSink(schema, tableName);
+ return new CollectTableSink(schema, tableName, expectedRowNum);
}
@Override
@@ -170,7 +185,42 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
resultState.clear();
- resultState.addAll(RESULT.get(taskID));
+ List<Row> rows = RESULT.get(taskID);
+ if (rows != null) {
+ resultState.addAll(RESULT.get(taskID));
+ }
+ }
+ }
+
+ static class CollectSinkFunctionWithExpectedNum extends CollectSinkFunction {
+ private final int expectRowNum;
+
+ protected CollectSinkFunctionWithExpectedNum(
+ DynamicTableSink.DataStructureConverter converter,
+ RowTypeInfo rowTypeInfo,
+ int expectRowNum) {
+ super(converter, rowTypeInfo);
+ ValidationUtils.checkArgument(expectRowNum > 0, "Expected row number should be positive.");
+ this.expectRowNum = expectRowNum;
+ }
+
+ @Override
+ public void invoke(RowData value, Context context) {
+ super.invoke(value, context);
+ if (RESULT.values().stream().mapToInt(List::size).sum() >= expectRowNum) {
+ throw new SuccessException();
+ }
+ }
+ }
+
+ /**
+ * Exception that is thrown to terminate a program and indicate success.
+ */
+ public static class SuccessException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public SuccessException() {
+ super("Forced exception to terminate a successful sink.");
}
}
}
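Taken together, the intended termination flow is roughly the following sketch (assumed wiring for illustration, not code from the commit): the sink DDL carries 'sink-expected-row-num', the sink function throws SuccessException once that many rows have been collected, Flink fails the job with that exception, and the test treats it as success.

    // Hypothetical end-to-end sketch of the expected-row-num termination mechanism.
    tEnv.executeSql(TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", 8));
    TableResult result = tEnv.executeSql("insert into sink select * from t1");
    try {
      result.await(30, TimeUnit.SECONDS);            // bounded wait instead of a fixed sleep
    } catch (Throwable t) {
      // the job terminates with the sink's SuccessException once 8 rows have been received
      ExceptionUtils.assertThrowable(t, CollectSinkTableFactory.SuccessException.class);
    }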
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index d11bd7fc6e9..adcbf6a630f 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -389,7 +389,10 @@ public abstract class HoodieStorage implements Closeable {
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public final boolean needCreateTempFile() {
- return StorageSchemes.HDFS.getScheme().equals(getScheme());
+ return StorageSchemes.HDFS.getScheme().equals(getScheme())
+ // Local file will be visible immediately after LocalFileSystem#create(..), even before the output
+ // stream is closed, so a temporary file is also needed to create the file atomically with its content written.
+ || StorageSchemes.FILE.getScheme().equals(getScheme());
}
/**
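For illustration only, a generic java.nio sketch (not the HoodieStorage API) of the temp-file-then-rename pattern the new comment relies on: on a local file system the target path becomes visible before its content is fully written, so writing to a side file and renaming it is what makes "file exists" imply "content is complete".

    import java.nio.file.*;
    byte[] payload = "commit metadata".getBytes();            // placeholder content
    Path tmp = Paths.get("/tmp/commit.inflight.tmp");         // hypothetical paths
    Path target = Paths.get("/tmp/commit.inflight");
    Files.write(tmp, payload);                                // content fully written to the side file first
    Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);  // target only ever appears with complete content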
diff --git a/packaging/bundle-validation/flink/insert.sql b/packaging/bundle-validation/flink/insert.sql
index 624aab5d357..b53b3f55263 100644
--- a/packaging/bundle-validation/flink/insert.sql
+++ b/packaging/bundle-validation/flink/insert.sql
@@ -17,6 +17,8 @@
* under the License.
*/
+SET 'table.dml-sync' = 'true';
+
CREATE TABLE t1
(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
diff --git a/packaging/bundle-validation/validate.sh b/packaging/bundle-validation/validate.sh
index ebaa590e9d1..3dfa53b67f5 100755
--- a/packaging/bundle-validation/validate.sh
+++ b/packaging/bundle-validation/validate.sh
@@ -182,7 +182,7 @@ test_flink_bundle() {
change_java_runtime_version
$FLINK_HOME/bin/start-cluster.sh
$FLINK_HOME/bin/sql-client.sh -j $JARS_DIR/flink.jar -f $WORKDIR/flink/insert.sql
- sleep 10 # for test stability
+ echo "validate flink insert finished."
$WORKDIR/flink/compact.sh $JARS_DIR/flink.jar
local EXIT_CODE=$?
$FLINK_HOME/bin/stop-cluster.sh