luoyuxia commented on code in PR #1663:
URL: https://github.com/apache/fluss/pull/1663#discussion_r2336532960
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -169,6 +176,67 @@ void testReadLogTableInStreamMode(boolean isPartitioned)
throws Exception {
actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception
{
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName1 =
+ "restore_logTable_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+ String resultTableName =
+ "result_table" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+ TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM,
isPartitioned, writtenRows);
+ // wait until records has been synced
+ waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM,
isPartitioned);
+
+ StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+ // now, start to read the log table, which will read paimon
Review Comment:
```suggestion
        // now, start to read the log table and write the rows into a fluss result table
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -169,6 +176,67 @@ void testReadLogTableInStreamMode(boolean isPartitioned)
throws Exception {
actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception
{
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName1 =
+ "restore_logTable_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+ String resultTableName =
+ "result_table" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+ TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM,
isPartitioned, writtenRows);
+ // wait until records has been synced
+ waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM,
isPartitioned);
+
+ StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+ // now, start to read the log table, which will read paimon
+ // may read fluss or not, depends on the log offset of paimon snapshot
+ createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned,
false);
+ TableResult insertResult =
+ streamTEnv.executeSql(
+ "insert into " + resultTableName + " select * from " +
tableName1);
+
+ CloseableIterator<Row> actual =
+ streamTEnv.executeSql("select * from " +
resultTableName).collect();
+ assertResultsIgnoreOrder(
Review Comment:
nit:
```suggestion
assertResultsExactOrder(actual, writtenRows, false)
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -169,6 +176,67 @@ void testReadLogTableInStreamMode(boolean isPartitioned)
throws Exception {
actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception
{
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName1 =
+ "restore_logTable_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+ String resultTableName =
+ "result_table" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+ TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM,
isPartitioned, writtenRows);
+ // wait until records has been synced
+ waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM,
isPartitioned);
+
+ StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+ // now, start to read the log table, which will read paimon
+ // may read fluss or not, depends on the log offset of paimon snapshot
+ createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned,
false);
+ TableResult insertResult =
+ streamTEnv.executeSql(
+ "insert into " + resultTableName + " select * from " +
tableName1);
+
+ CloseableIterator<Row> actual =
+ streamTEnv.executeSql("select * from " +
resultTableName).collect();
+ assertResultsIgnoreOrder(
+ actual,
+
writtenRows.stream().map(Row::toString).collect(Collectors.toList()),
+ false);
+
+ // now, stop the job with save point
+ String savepointPath =
+ insertResult
+ .getJobClient()
+ .get()
+ .stopWithSavepoint(
+ false,
+ savepointDir.getAbsolutePath(),
+ SavepointFormatType.CANONICAL)
+ .get();
+
+ // re buildSteamTEnv
+ streamTEnv = buildSteamTEnv(savepointPath);
+ insertResult =
+ streamTEnv.executeSql(
+ "insert into " + resultTableName + " select * from " +
tableName1);
+
+ // write some log data again
+ List<Row> rows = writeRows(table1, 3, isPartitioned);
+
+ assertResultsIgnoreOrder(
Review Comment:
```suggestion
assertResultsExactOrder(actual, writtenRows, false)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]