luoyuxia commented on code in PR #1708:
URL: https://github.com/apache/fluss/pull/1708#discussion_r2351752164
##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java:
##########
@@ -54,5 +58,25 @@ public void beforeEach() {
CATALOG_NAME, BOOTSTRAP_SERVERS.key(),
bootstrapServers));
batchTEnv.executeSql("use catalog " + CATALOG_NAME);
batchTEnv.executeSql("use " + DEFAULT_DB);
+ buildSteamTEnv(null);
+ }
+
+ protected StreamTableEnvironment buildSteamTEnv(@Nullable String
savepointPath) {
Review Comment:
+1
##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -129,6 +141,114 @@ void testReadLogTableFullType(boolean isPartitioned)
throws Exception {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "stream_logTable_" + (isPartitioned ? "partitioned"
: "non_partitioned");
+
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned,
writtenRows);
+ // wait until records have been synced
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // now, start to read the log table; this reads from iceberg and
+ // may or may not also read from fluss, depending on the log offset of the iceberg snapshot
+ CloseableIterator<Row> actual =
+ streamTEnv.executeSql("select * from " + tableName).collect();
+ assertResultsIgnoreOrder(
+ actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+
+ // cancel the tiering job
+ jobClient.cancel().get();
+
+ // write some log data again
+ writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+ // query the log table again and check the data
+ // it should read both iceberg snapshot and fluss log
+ actual =
+ streamTEnv
+ .executeSql(
+ "select * from "
+ + tableName
+ + " /*+ OPTIONS('scan.partition.discovery.interval'='100ms') */")
+ .collect();
+ if (isPartitioned) {
+ // we write to a new partition to verify partition discovery
+ writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
+ }
+ assertResultsIgnoreOrder(
+ actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception
{
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName1 =
+ "restore_logTable_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+ String resultTableName =
+ "result_table" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+ TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM,
isPartitioned, writtenRows);
+ // wait until records have been synced
+ waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM,
isPartitioned);
+
+ StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]