wuchong commented on a change in pull request #15709:
URL: https://github.com/apache/flink/pull/15709#discussion_r619666563



##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -223,6 +225,130 @@ public void testStreamingAppend() throws Exception {
                 });
     }
 
+    @Test(timeout = 120000)
+    public void testStreamingSinkOnTimestampLtzWatermrk() throws Exception {
+        testStreamingWriteWithTimestampLtzWatermark(this::checkSuccessFiles);
+    }
+
+    private void testStreamingWriteWithTimestampLtzWatermark(Consumer<String> 
pathConsumer)
+            throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        try {
+            tEnv.executeSql("create database db1");
+            tEnv.useDatabase("db1");
+            tEnv.executeSql(
+                    "create external table sink_table ("
+                            + " a int,"
+                            + " b string,"
+                            + " c string)"
+                            + " partitioned by (d string,e string)"
+                            + " stored as parquet TBLPROPERTIES ("
+                            + " 
'partition.time-extractor.timestamp-pattern'='$d $e:00:00',"
+                            + " 
'sink.partition-commit.trigger'='partition-time',"
+                            + " 'sink.partition-commit.delay'='1h',"
+                            + " 
'sink.partition-commit.watermark-time-zone'='Asia/Shanghai',"
+                            + " 
'sink.partition-commit.policy.kind'='metastore,success-file',"
+                            + " 
'sink.partition-commit.success-file.name'='_MY_SUCCESS')");
+
+            // hive dialect only works with hive tables at the moment, switch 
to default dialect
+            tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+            tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+            // prepare source
+            // epoch mills 1588460400000L <=>  local timestamp 2020-05-03 
07:00:00 in Shanghai
+            // epoch mills 1588464000000L <=>  local timestamp 2020-05-03 
08:00:00 in Shanghai
+            // epoch mills 1588467600000L <=>  local timestamp 2020-05-03 
09:00:00 in Shanghai
+            // epoch mills 1588471200000L <=>  local timestamp 2020-05-03 
10:00:00 in Shanghai
+            // epoch mills 1588474800000L <=>  local timestamp 2020-05-03 
11:00:00 in Shanghai
+            List<Row> data =
+                    Arrays.asList(
+                            Row.of(1, "a", "b", "2020-05-03", "7", 
1588460400000L),
+                            Row.of(1, "a", "b", "2020-05-03", "7", 
1588460400000L),
+                            Row.of(2, "p", "q", "2020-05-03", "8", 
1588464000000L),
+                            Row.of(2, "p", "q", "2020-05-03", "8", 
1588464000000L),
+                            Row.of(3, "x", "y", "2020-05-03", "9", 
1588467600000L),
+                            Row.of(3, "x", "y", "2020-05-03", "9", 
1588467600000L),
+                            Row.of(4, "x", "y", "2020-05-03", "10", 
1588471200000L),
+                            Row.of(4, "x", "y", "2020-05-03", "10", 
1588471200000L),
+                            Row.of(5, "x", "y", "2020-05-03", "11", 
1588474800000L),
+                            Row.of(5, "x", "y", "2020-05-03", "11", 
1588474800000L));
+
+            String dataId = TestValuesTableFactory.registerData(data);
+            String sourceTableDDL =
+                    String.format(
+                            "create table my_table("
+                                    + " a INT,"
+                                    + " b STRING,"
+                                    + " c STRING,"
+                                    + " d STRING,"
+                                    + " e STRING,"
+                                    + " ts BIGINT,"
+                                    + " ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),"
+                                    + " WATERMARK FOR ts_ltz as ts_ltz"
+                                    + ") with ("
+                                    + " 'connector' = 'values',"
+                                    + " 'data-id' = '%s',"
+                                    + " 'failing-source' = 'true')",
+                            dataId);
+            tEnv.executeSql(sourceTableDDL);
+            tEnv.executeSql("insert into sink_table select a, b, c, d, e from 
my_table").await();
+
+            assertBatch(
+                    "db1.sink_table",
+                    Arrays.asList(
+                            "+I[1, a, b, 2020-05-03, 7]",
+                            "+I[1, a, b, 2020-05-03, 7]",
+                            "+I[2, p, q, 2020-05-03, 8]",
+                            "+I[2, p, q, 2020-05-03, 8]",
+                            "+I[3, x, y, 2020-05-03, 9]",
+                            "+I[3, x, y, 2020-05-03, 9]",
+                            "+I[4, x, y, 2020-05-03, 10]",
+                            "+I[4, x, y, 2020-05-03, 10]",
+                            "+I[5, x, y, 2020-05-03, 11]",
+                            "+I[5, x, y, 2020-05-03, 11]"));
+
+            // using batch table env to query.

Review comment:
       It seems this duplicates the `assertBatch` call above?

##########
File path: docs/content/docs/connectors/table/hive/hive_read_write.md
##########
@@ -382,6 +382,38 @@ SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
 
 ```
 
+```sql

Review comment:
       Please add a short description (what its purpose is) for the following 
example. The **Full Example** is not just for your newly introduced config 
option; most users don't have the context. 

##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -438,4 +444,43 @@ FROM kafka_table;
 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
 ```
 
+```sql

Review comment:
       Please add a short description (what its purpose is) for the following 
example. The **Full Example** is not just for your newly introduced config 
option; most users don't have the context. 

##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition 
commit trigger:
         <td>Duration</td>
         <td>The partition will not commit until the delay time. If it is a 
daily partition, should be '1 d', if it is a hourly partition, should be '1 
h'.</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.watermark-time-zone</h5></td>

Review comment:
       Should we call it `time-zone-of-partition-time`? I find it difficult 
to understand why a watermark has time-zone semantics and how to infer it. If we 
call it the time zone of the partition time, then the inference can simply 
forward the rowtime's time zone, e.g. if the rowtime is NTZ, then it should be 
UTC; if the rowtime is LTZ, then it should be the session time zone. 
   
   Besides, we should also remind users that:
    1. this option only takes effect when `sink.partition-commit.trigger` is 
set to `partition-time`.
    2. if this option is not configured correctly, e.g. the source rowtime is 
LTZ but this config is not set, then users may see the partition committed only 
after a few hours.  

##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -223,6 +225,130 @@ public void testStreamingAppend() throws Exception {
                 });
     }
 
+    @Test(timeout = 120000)
+    public void testStreamingSinkOnTimestampLtzWatermrk() throws Exception {
+        testStreamingWriteWithTimestampLtzWatermark(this::checkSuccessFiles);
+    }
+
+    private void testStreamingWriteWithTimestampLtzWatermark(Consumer<String> 
pathConsumer)

Review comment:
       This is only used by one method; is it necessary to extract it into a 
separate method? 

##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -223,6 +225,130 @@ public void testStreamingAppend() throws Exception {
                 });
     }
 
+    @Test(timeout = 120000)
+    public void testStreamingSinkOnTimestampLtzWatermrk() throws Exception {
+        testStreamingWriteWithTimestampLtzWatermark(this::checkSuccessFiles);
+    }
+
+    private void testStreamingWriteWithTimestampLtzWatermark(Consumer<String> 
pathConsumer)
+            throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        try {
+            tEnv.executeSql("create database db1");
+            tEnv.useDatabase("db1");
+            tEnv.executeSql(
+                    "create external table sink_table ("
+                            + " a int,"
+                            + " b string,"
+                            + " c string)"
+                            + " partitioned by (d string,e string)"
+                            + " stored as parquet TBLPROPERTIES ("
+                            + " 
'partition.time-extractor.timestamp-pattern'='$d $e:00:00',"
+                            + " 
'sink.partition-commit.trigger'='partition-time',"
+                            + " 'sink.partition-commit.delay'='1h',"
+                            + " 
'sink.partition-commit.watermark-time-zone'='Asia/Shanghai',"

Review comment:
       Unfortunately, this test case can't verify that we have fixed the problem, 
because if I remove the `watermark-time-zone` config option, the test case 
still passes. 
   
   I think we need a streaming source and should verify the sink result partition 
by partition, rather than verifying the result after the job has finished (the 
final result will be influenced by the MAX watermark).
   
   

##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition 
commit trigger:
         <td>Duration</td>
         <td>The partition will not commit until the delay time. If it is a 
daily partition, should be '1 d', if it is a hourly partition, should be '1 
h'.</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.watermark-time-zone</h5></td>
+        <td style="word-wrap: break-word;">0 s</td>

Review comment:
       UTC




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to