wuchong commented on code in PR #2477:
URL: https://github.com/apache/fluss/pull/2477#discussion_r2787907431


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java:
##########
@@ -378,6 +411,66 @@ public void testChangelogWithScanStartupMode() throws 
Exception {
                 .containsExactly(
                         "+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
                         "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+        // 3. Test scan.startup.mode='latest' - should only read new records 
after subscription

Review Comment:
   It would be clearer to extract this test into a separate test method. 



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java:
##########
@@ -378,6 +411,66 @@ public void testChangelogWithScanStartupMode() throws 
Exception {
                 .containsExactly(
                         "+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
                         "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+        // 3. Test scan.startup.mode='latest' - should only read new records 
after subscription
+        // Create a new source table with 1 bucket for consistent log_offset 
numbers
+        tEnv.executeSql(
+                "CREATE TABLE startup_mode_test_mode_latest ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+        tablePath = TablePath.of(DEFAULT_DB, "startup_mode_test_mode_latest");
+        // Pre-populate the new source table with some existing records.
+        TableResult insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO "
+                                + tablePath.getTableName()
+                                + " SELECT * FROM startup_mode_test");
+        // Stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+        // Init env with savepoint Path
+        tEnv = initTableEnvironment(savepointPath);
+        // Recreate a table
+        tEnv.executeSql(
+                "CREATE TABLE startup_mode_test_mode_latest ("
+                        + " id INT NOT NULL,"
+                        + " name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+        // Write a new record into the source table
+        tablePath = TablePath.of(DEFAULT_DB, "startup_mode_test_mode_latest");
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, tablePath, Arrays.asList(row(6, "v6")), false);
+
+        // Recreate a table
+        tEnv.executeSql(
+                "CREATE TABLE "

Review Comment:
   we don't need to re-create the table
   



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java:
##########
@@ -378,6 +411,66 @@ public void testChangelogWithScanStartupMode() throws 
Exception {
                 .containsExactly(
                         "+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
                         "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+        // 3. Test scan.startup.mode='latest' - should only read new records 
after subscription
+        // Create a new source table with 1 bucket for consistent log_offset 
numbers
+        tEnv.executeSql(
+                "CREATE TABLE startup_mode_test_mode_latest ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+        tablePath = TablePath.of(DEFAULT_DB, "startup_mode_test_mode_latest");
+        // Pre-populate the new source table with some existing records.
+        TableResult insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO "
+                                + tablePath.getTableName()
+                                + " SELECT * FROM startup_mode_test");
+        // Stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+        // Init env with savepoint Path
+        tEnv = initTableEnvironment(savepointPath);
+        // Recreate a table
+        tEnv.executeSql(
+                "CREATE TABLE startup_mode_test_mode_latest ("
+                        + " id INT NOT NULL,"
+                        + " name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+        // Write a new record into the source table
+        tablePath = TablePath.of(DEFAULT_DB, "startup_mode_test_mode_latest");
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, tablePath, Arrays.asList(row(6, "v6")), false);
+
+        // Recreate a table
+        tEnv.executeSql(
+                "CREATE TABLE "
+                        + tablePath.getTableName()
+                        + " ("
+                        + " id INT NOT NULL,"
+                        + " name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        String optionsLatest = " /*+ OPTIONS('scan.startup.mode' = 'latest') 
*/";
+        String queryLatest =
+                "SELECT _change_type, id, name FROM "
+                        + tablePath.getTableName()
+                        + "$changelog"
+                        + optionsLatest;
+        CloseableIterator<Row> rowIterLatest = 
tEnv.executeSql(queryLatest).collect();

Review Comment:
   Rerun the query from `insertResult` here instead of starting a new one.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java:
##########
@@ -378,6 +411,66 @@ public void testChangelogWithScanStartupMode() throws 
Exception {
                 .containsExactly(
                         "+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
                         "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+        // 3. Test scan.startup.mode='latest' - should only read new records 
after subscription
+        // Create a new source table with 1 bucket for consistent log_offset 
numbers
+        tEnv.executeSql(
+                "CREATE TABLE startup_mode_test_mode_latest ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+        tablePath = TablePath.of(DEFAULT_DB, "startup_mode_test_mode_latest");
+        // Pre-populate the new source table with some existing records.
+        TableResult insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO "
+                                + tablePath.getTableName()
+                                + " SELECT * FROM startup_mode_test");

Review Comment:
   Run the query against `startup_mode_test$changelog` with the latest startup 
mode, so that we can materialize the latest offsets into state.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java:
##########
@@ -378,6 +411,66 @@ public void testChangelogWithScanStartupMode() throws 
Exception {
                 .containsExactly(
                         "+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
                         "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+        // 3. Test scan.startup.mode='latest' - should only read new records 
after subscription
+        // Create a new source table with 1 bucket for consistent log_offset 
numbers
+        tEnv.executeSql(
+                "CREATE TABLE startup_mode_test_mode_latest ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+        tablePath = TablePath.of(DEFAULT_DB, "startup_mode_test_mode_latest");
+        // Pre-populate the new source table with some existing records.
+        TableResult insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO "
+                                + tablePath.getTableName()
+                                + " SELECT * FROM startup_mode_test");
+        // Stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+        // Init env with savepoint Path
+        tEnv = initTableEnvironment(savepointPath);
+        // Recreate a table
+        tEnv.executeSql(
+                "CREATE TABLE startup_mode_test_mode_latest ("

Review Comment:
   We don't need to re-create the table — it already exists. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to