Copilot commented on code in PR #1708:
URL: https://github.com/apache/fluss/pull/1708#discussion_r2351635500


##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java:
##########
@@ -231,6 +254,18 @@ protected void assertReplicaStatus(TableBucket tb, long 
expectedLogEndOffset) {
                 });
     }
 
+    /**
+     * Wait until the default number of partitions is created. Return the map 
from partition id to
+     * partition name. .

Review Comment:
   Remove the extra period at the end of the comment.
   ```suggestion
        * partition name.
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -129,6 +141,114 @@ void testReadLogTableFullType(boolean isPartitioned) 
throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" 
: "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // now, start to read the log table, which will read iceberg
+        // may read fluss or not, depends on the log offset of iceberg snapshot
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + tableName).collect();
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+
+        // cancel the tiering job
+        jobClient.cancel().get();
+
+        // write some log data again
+        writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+        // query the log table again and check the data
+        // it should read both iceberg snapshot and fluss log
+        actual =
+                streamTEnv
+                        .executeSql(
+                                "select * from "
+                                        + tableName
+                                        + " /*+ 
OPTIONS('scan.partition.discovery.interval'='100ms') */")
+                        .collect();
+        if (isPartitioned) {
+            // we write to a new partition to verify partition discovery
+            writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
+        }
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception 
{
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName1 =
+                "restore_logTable_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+        String resultTableName =
+                "result_table" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+
+        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, 
isPartitioned, writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, 
isPartitioned);
+
+        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);

Review Comment:
   Method call has a typo: 'buildSteamTEnv' should be 'buildStreamTEnv'.



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java:
##########
@@ -54,5 +58,25 @@ public void beforeEach() {
                         CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
         batchTEnv.executeSql("use catalog " + CATALOG_NAME);
         batchTEnv.executeSql("use " + DEFAULT_DB);
+        buildSteamTEnv(null);
+    }
+
+    protected StreamTableEnvironment buildSteamTEnv(@Nullable String 
savepointPath) {

Review Comment:
   Method name has a typo: 'buildSteamTEnv' should be 'buildStreamTEnv'.
   ```suggestion
           buildStreamTEnv(null);
       }
   
       protected StreamTableEnvironment buildStreamTEnv(@Nullable String 
savepointPath) {
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java:
##########
@@ -54,5 +58,25 @@ public void beforeEach() {
                         CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
         batchTEnv.executeSql("use catalog " + CATALOG_NAME);
         batchTEnv.executeSql("use " + DEFAULT_DB);
+        buildSteamTEnv(null);
+    }
+
+    protected StreamTableEnvironment buildSteamTEnv(@Nullable String 
savepointPath) {

Review Comment:
   Method call has a typo: 'buildSteamTEnv' should be 'buildStreamTEnv'.
   ```suggestion
           buildStreamTEnv(null);
       }
   
       protected StreamTableEnvironment buildStreamTEnv(@Nullable String 
savepointPath) {
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -176,6 +302,12 @@ protected long createFullTypeLogTable(TablePath tablePath, 
int bucketNum, boolea
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
                         .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
 
+        if (lakeEnabled) {
+            tableBuilder
+                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                    .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+        }

Review Comment:
   The datalake properties are being set twice when lakeEnabled is true. Lines 
302-303 already set these properties unconditionally, making the conditional 
block redundant.
   ```suggestion
           // Removed redundant datalake property settings; already set 
unconditionally above.
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -129,6 +141,114 @@ void testReadLogTableFullType(boolean isPartitioned) 
throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" 
: "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // now, start to read the log table, which will read iceberg
+        // may read fluss or not, depends on the log offset of iceberg snapshot
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + tableName).collect();
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+
+        // cancel the tiering job
+        jobClient.cancel().get();
+
+        // write some log data again
+        writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+        // query the log table again and check the data
+        // it should read both iceberg snapshot and fluss log
+        actual =
+                streamTEnv
+                        .executeSql(
+                                "select * from "
+                                        + tableName
+                                        + " /*+ 
OPTIONS('scan.partition.discovery.interval'='100ms') */")
+                        .collect();
+        if (isPartitioned) {
+            // we write to a new partition to verify partition discovery
+            writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
+        }
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception 
{
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName1 =
+                "restore_logTable_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+        String resultTableName =
+                "result_table" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+
+        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, 
isPartitioned, writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, 
isPartitioned);
+
+        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+        // now, start to read the log table to write to a fluss result table
+        // may read fluss or not, depends on the log offset of iceberg snapshot
+        createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, 
false);
+        TableResult insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + 
tableName1);
+
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + 
resultTableName).collect();
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, writtenRows, false);
+        } else {
+            assertResultsExactOrder(actual, writtenRows, false);
+        }
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+
+        // re buildSteamTEnv
+        streamTEnv = buildSteamTEnv(savepointPath);

Review Comment:
   Method call has a typo: 'buildSteamTEnv' should be 'buildStreamTEnv'.
   ```suggestion
           // re buildStreamTEnv
           streamTEnv = buildStreamTEnv(savepointPath);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to