lsyldliu commented on code in PR #24953:
URL: https://github.com/apache/flink/pull/24953#discussion_r1650020249


##########
flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java:
##########
@@ -126,6 +134,299 @@ public void testSqlClientExecuteStatement() throws 
Exception {
                         InetAddress.getByName("localhost").getHostAddress(), 
restPort.getPort()));
     }
 
+    @Test
+    public void testMaterializedTableInContinuousMode() throws Exception {
+        Duration continuousWaitTime = Duration.ofMinutes(5);
+        Duration continuousWaitPause = Duration.ofSeconds(10);
+
+        try (GatewayController gateway = flinkResource.startSqlGateway();
+                ClusterController ignore = flinkResource.startCluster(2)) {
+
+            FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
+                    initSessionWithCatalogStore(Collections.emptyMap());
+
+            gatewayRestClient.executeStatementWithResult(
+                    "CREATE TABLE streaming_source (\n"
+                            + "    `timestamp` TIMESTAMP(3),\n"
+                            + "    `user` VARCHAR,\n"
+                            + "    `type` VARCHAR\n"
+                            + " ) with ("
+                            + "   'format' = 'json',"
+                            + "   'source.monitor-interval' = '10s'"
+                            + ")");
+
+            gatewayRestClient.executeStatementWithResult(
+                    "insert into streaming_source select 
TO_TIMESTAMP('2024-06-20 00:00:00'), 'Alice', 'INFO'");
+            gatewayRestClient.executeStatementWithResult(
+                    "insert into streaming_source select 
TO_TIMESTAMP('2024-06-20 00:00:00'), 'Bob', 'ERROR'");
+
+            gatewayRestClient.executeStatementWithResult(
+                    " CREATE MATERIALIZED TABLE 
my_materialized_table_in_continuous_mode\n"
+                            + " PARTITIONED BY (ds)\n"
+                            + " with (\n"
+                            + "   'format' = 'json',\n"
+                            + "   'sink.rolling-policy.rollover-interval' = 
'10s',\n"
+                            + "   'sink.rolling-policy.check-interval' = 
'10s'\n"
+                            + "  )\n"
+                            + " FRESHNESS = INTERVAL '10' SECOND\n"
+                            + " REFRESH_MODE = CONTINUOUS\n"
+                            + " AS SELECT\n"
+                            + " DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS 
ds,\n"
+                            + " user,\n"
+                            + " type\n"
+                            + " FROM streaming_source");
+
+            // set current session mode to batch for verify the materialized 
table
+            gatewayRestClient.executeStatementWithResult("SET 
'execution.runtime-mode' = 'batch'");
+
+            // verify the result
+            CommonTestUtils.waitUtil(
+                    () -> {
+                        List<RowData> result =
+                                gatewayRestClient.executeStatementWithResult(
+                                        "select * from 
my_materialized_table_in_continuous_mode order by ds, user");
+                        return result.toString()
+                                .equals("[+I(2024-06-20,Alice,INFO), 
+I(2024-06-20,Bob,ERROR)]");
+                    },
+                    continuousWaitTime,
+                    continuousWaitPause,
+                    "Failed to wait for the result");
+
+            File savepointFolder = FOLDER.newFolder("savepoint");
+            // configure savepoint path
+            gatewayRestClient.executeStatementWithResult(
+                    String.format(
+                            "set 
'execution.checkpointing.savepoint-dir'='file://%s'",
+                            savepointFolder.getAbsolutePath()));
+
+            // suspend the materialized table
+            gatewayRestClient.executeStatementWithResult(
+                    "ALTER MATERIALIZED TABLE 
my_materialized_table_in_continuous_mode SUSPEND");
+
+            // send more data to the source
+            gatewayRestClient.executeStatementWithResult(
+                    "insert into streaming_source select 
TO_TIMESTAMP('2024-06-20 00:00:00'), 'Charlie', 'WARN'");
+
+            // resume the materialized table
+            gatewayRestClient.executeStatementWithResult(
+                    "ALTER MATERIALIZED TABLE 
my_materialized_table_in_continuous_mode RESUME");
+
+            // verify the result
+            CommonTestUtils.waitUtil(
+                    () -> {
+                        List<RowData> result =
+                                gatewayRestClient.executeStatementWithResult(
+                                        "select * from 
my_materialized_table_in_continuous_mode order by ds, user");
+                        return result.toString()
+                                .equals(
+                                        "[+I(2024-06-20,Alice,INFO), 
+I(2024-06-20,Bob,ERROR), +I(2024-06-20,Charlie,WARN)]");
+                    },
+                    continuousWaitTime,
+                    continuousWaitPause,
+                    "Failed to wait for the result");
+        }
+    }
+
+    @Test
+    public void testMaterializedTableInFullMode() throws Exception {
+        Duration fullModeWaitTime = Duration.ofMinutes(5);
+        Duration fullModeWaitPause = Duration.ofSeconds(10);
+
+        // init session
+        try (GatewayController gateway = flinkResource.startSqlGateway();
+                ClusterController ignore = flinkResource.startCluster(2)) {
+
+            Map<String, String> sessionProperties = new HashMap<>();
+            sessionProperties.put("workflow-scheduler.type", "embedded");
+            FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
+                    initSessionWithCatalogStore(sessionProperties);
+
+            gatewayRestClient.executeStatementWithResult(
+                    "CREATE TABLE batch_source (\n"
+                            + "    `timestamp` TIMESTAMP(3),\n"
+                            + "    `user` VARCHAR,\n"
+                            + "    `type` VARCHAR\n"
+                            + " ) with ("
+                            + "   'format' = 'json'"
+                            + ")");
+
+            gatewayRestClient.executeStatementWithResult(
+                    " CREATE MATERIALIZED TABLE 
my_materialized_table_in_full_mode\n"
+                            + " PARTITIONED BY (ds)\n"
+                            + " WITH (\n"
+                            + "   'partition.fields.ds.date-formatter' = 
'yyyy-MM-dd',\n"
+                            + "   'format' = 'json'\n"
+                            + " )\n"
+                            + " FRESHNESS = INTERVAL '30' SECOND\n"
+                            + " REFRESH_MODE = FULL\n"
+                            + " AS SELECT\n"
+                            + " ds,\n"
+                            + " count(*) as cnt\n"
+                            + " FROM (\n"
+                            + "   SELECT\n"
+                            + "   DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS 
ds,\n"
+                            + "   user,\n"
+                            + " type\n"
+                            + " FROM batch_source\n"
+                            + " ) GROUP BY ds");
+
+            long systemTime = System.currentTimeMillis();
+            String todayTimestamp =
+                    formatTimestampMillis(systemTime, "yyyy-MM-dd HH:mm:ss", 
TimeZone.getDefault());
+            String yesterdayTimestamp =
+                    formatTimestampMillis(
+                            systemTime - 24 * 60 * 60 * 1000,
+                            "yyyy-MM-dd HH:mm:ss",
+                            TimeZone.getDefault());
+            String tomorrowTimestamp =
+                    formatTimestampMillis(
+                            systemTime + 24 * 60 * 60 * 1000,
+                            "yyyy-MM-dd HH:mm:ss",
+                            TimeZone.getDefault());
+            String todayDateStr = todayTimestamp.substring(0, 10);
+            String yesterdayDateStr = yesterdayTimestamp.substring(0, 10);
+            String tomorrowDateStr = tomorrowTimestamp.substring(0, 10);
+
+            // Both send date to current date, yesterday and tomorrow
+            gatewayRestClient.executeStatementWithResult(
+                    String.format(
+                            "INSERT INTO batch_source select 
TO_TIMESTAMP('%s'), 'Alice', 'INFO'",

Review Comment:
   Could we insert these three records in a single query using a `VALUES` clause instead of three separate INSERT statements?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to