luoyuxia commented on code in PR #1708:
URL: https://github.com/apache/fluss/pull/1708#discussion_r2351752711


##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -129,6 +141,114 @@ void testReadLogTableFullType(boolean isPartitioned) 
throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" 
: "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // now, start to read the log table, which will read iceberg
+        // may read fluss or not, depends on the log offset of iceberg snapshot
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + tableName).collect();
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+
+        // cancel the tiering job
+        jobClient.cancel().get();
+
+        // write some log data again
+        writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+        // query the log table again and check the data
+        // it should read both iceberg snapshot and fluss log
+        actual =
+                streamTEnv
+                        .executeSql(
+                                "select * from "
+                                        + tableName
+                                        + " /*+ 
OPTIONS('scan.partition.discovery.interval'='100ms') */")
+                        .collect();
+        if (isPartitioned) {
+            // we write to a new partition to verify partition discovery
+            writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
+        }
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception 
{
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName1 =
+                "restore_logTable_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+        String resultTableName =
+                "result_table" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+
+        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, 
isPartitioned, writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, 
isPartitioned);
+
+        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+        // now, start to read the log table to write to a fluss result table
+        // may read fluss or not, depends on the log offset of iceberg snapshot
+        createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, 
false);
+        TableResult insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + 
tableName1);
+
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + 
resultTableName).collect();
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, writtenRows, false);
+        } else {
+            assertResultsExactOrder(actual, writtenRows, false);
+        }
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+
+        // re buildSteamTEnv
+        streamTEnv = buildSteamTEnv(savepointPath);

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to