wuchong commented on a change in pull request #12880:
URL: https://github.com/apache/flink/pull/12880#discussion_r460697474



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
##########
@@ -51,4 +52,24 @@ private TableConfigOptions() {}
                        .withDescription("The SQL dialect defines how to parse 
a SQL query. " +
                                        "A different SQL dialect may support 
different SQL grammar. " +
                                        "Currently supported dialects are: 
default and hive");
+
+       @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+       public static final ConfigOption<String> LOCAL_TIME_ZONE = 
key("table.local-time-zone")
+                       .stringType()
+                       // special value to decide whether to use 
ZoneId.systemDefault() in TableConfig.getLocalTimeZone()
+                       .defaultValue("System")

Review comment:
       What about to use `(system-default)` here? It will be more like a system 
default value like `(none)`.  
   Besides, please store the "(system-default)" in a static final variable. 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time 
maxTime) {
                }
                minIdleStateRetentionTime = minTime.toMilliseconds();
                maxIdleStateRetentionTime = maxTime.toMilliseconds();

Review comment:
       The member variables `minIdleStateRetentionTime`, 
`maxIdleStateRetentionTime` can be removed? 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time 
maxTime) {
                }
                minIdleStateRetentionTime = minTime.toMilliseconds();
                maxIdleStateRetentionTime = maxTime.toMilliseconds();
+               
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+       }
+
+       /**
+        * Specifies a retention time interval for how long idle state, i.e., 
state which
+        * was not updated, will be retained.
+        * State will never be cleared until it was idle for less than the 
retention time and will never
+        * be kept if it was idle for more than the 1.5 * retention time.
+        *
+        * <p>When new data arrives for previously cleaned-up state, the new 
data will be handled as if it
+        * was the first data. This can result in previous results being 
overwritten.
+        *
+        * <p>Set to 0 (zero) to never clean-up the state.
+        *
+        * <p>NOTE: Cleaning up state requires additional bookkeeping which 
becomes less expensive for
+        * larger differences of minTime and maxTime. The difference between 
minTime and maxTime must be
+        * at least 5 minutes.
+        *
+        * @param duration The retention time interval for which idle state is 
retained. Set to 0 (zero) to
+        *                never clean-up the state.
+        */
+       public void setIdleStateRetentionDuration(Duration duration){
+               configuration.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, 
duration);
        }
 
        /**
         * @return The minimum time until state which was not updated will be 
retained.
         */
        public long getMinIdleStateRetentionTime() {

Review comment:
       Add a new method `getIdleStateRetentionTime` and deprecate 
`getMinIdleStateRetentionTime` and `getMaxIdleStateRetentionTime`.

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TableConfig}.
+ */
+public class TableConfigTest {
+       @Test
+       public void testGetAndSetTableConfigOptions() throws Exception{
+               Class<?> configClass = TableConfig.class;
+               TableConfig config = new TableConfig();
+               for (TestSpec<?> spec: testSpecList){
+                       configClass.getMethod("set" + spec.fieldName, 
spec.inputClass).invoke(config, spec.inputValue);
+                       assertEquals(spec.expectedValue, 
configClass.getMethod("get" + spec.fieldName).invoke(config));
+               }
+       }
+
+       @Test
+       public void testGetAndSetIdleStateRetentionDuration(){
+               TableConfig config = new TableConfig();
+               config.setIdleStateRetentionDuration(Duration.ofHours(1));
+               assertEquals(Duration.ofHours(1).toMillis(), 
config.getMinIdleStateRetentionTime());
+               assertEquals(Duration.ofHours(1).toMillis() * 3 / 2, 
config.getMaxIdleStateRetentionTime());

Review comment:
       Please also test the string option. 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -49,6 +51,15 @@
                                "tasks to advance their watermarks without the 
need to wait for " +
                                "watermarks from this source while it is 
idle.");
 
+       @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+       public static final ConfigOption<Duration> IDLE_STATE_RETENTION =
+               key("table.exec.state.ttl")
+                       .durationType()
+                       .defaultValue(Duration.ofMillis(0))
+                       .withDescription("A time-to-live (TTL) can be assigned 
to the keyed state of any type. " +
+                               "If a TTL is configured and a state value has 
expired, " +
+                               "the stored value will be cleaned up on a best 
effort basis.");
+

Review comment:
       Add description for default value, for example: `Default is 0 (zero) 
which will never clean-up state`. 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -274,9 +263,10 @@ public void setMaxGeneratedCodeLength(Integer 
maxGeneratedCodeLength) {
         *
         * @param minTime The minimum time interval for which idle state is 
retained. Set to 0 (zero) to
         *                never clean-up the state.
-        * @param maxTime The maximum time interval for which idle state is 
retained. Must be at least
-        *                5 minutes greater than minTime. Set to 0 (zero) to 
never clean-up the state.
+        * @param maxTime Currently maxTime will be ignored and it will 
automatically derived from minTime
+        *                as 1.5 x minTime.
         */
+       @Deprecated

Review comment:
       Add deprecate javadoc on this method, and suggest to use which method 
instead. 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -49,6 +51,15 @@
                                "tasks to advance their watermarks without the 
need to wait for " +
                                "watermarks from this source while it is 
idle.");
 
+       @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+       public static final ConfigOption<Duration> IDLE_STATE_RETENTION =
+               key("table.exec.state.ttl")
+                       .durationType()
+                       .defaultValue(Duration.ofMillis(0))
+                       .withDescription("A time-to-live (TTL) can be assigned 
to the keyed state of any type. " +
+                               "If a TTL is configured and a state value has 
expired, " +
+                               "the stored value will be cleaned up on a best 
effort basis.");
+

Review comment:
       Move this before `Source Options`? And add comment `State Options` on it?

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time 
maxTime) {
                }
                minIdleStateRetentionTime = minTime.toMilliseconds();
                maxIdleStateRetentionTime = maxTime.toMilliseconds();
+               
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+       }
+
+       /**
+        * Specifies a retention time interval for how long idle state, i.e., 
state which
+        * was not updated, will be retained.
+        * State will never be cleared until it was idle for less than the 
retention time and will never
+        * be kept if it was idle for more than the 1.5 * retention time.
+        *
+        * <p>When new data arrives for previously cleaned-up state, the new 
data will be handled as if it
+        * was the first data. This can result in previous results being 
overwritten.
+        *
+        * <p>Set to 0 (zero) to never clean-up the state.
+        *
+        * <p>NOTE: Cleaning up state requires additional bookkeeping which 
becomes less expensive for
+        * larger differences of minTime and maxTime. The difference between 
minTime and maxTime must be
+        * at least 5 minutes.
+        *
+        * @param duration The retention time interval for which idle state is 
retained. Set to 0 (zero) to
+        *                never clean-up the state.
+        */
+       public void setIdleStateRetentionDuration(Duration duration){

Review comment:
       I prefer to use the same method as before `setIdleStateRetentionTime` or 
without the `Time`/`Duration` suffix. 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time 
maxTime) {
                }
                minIdleStateRetentionTime = minTime.toMilliseconds();
                maxIdleStateRetentionTime = maxTime.toMilliseconds();
+               
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+       }
+
+       /**
+        * Specifies a retention time interval for how long idle state, i.e., 
state which
+        * was not updated, will be retained.
+        * State will never be cleared until it was idle for less than the 
retention time and will never
+        * be kept if it was idle for more than the 1.5 * retention time.
+        *
+        * <p>When new data arrives for previously cleaned-up state, the new 
data will be handled as if it
+        * was the first data. This can result in previous results being 
overwritten.
+        *
+        * <p>Set to 0 (zero) to never clean-up the state.
+        *
+        * <p>NOTE: Cleaning up state requires additional bookkeeping which 
becomes less expensive for
+        * larger differences of minTime and maxTime. The difference between 
minTime and maxTime must be
+        * at least 5 minutes.

Review comment:
       Remove the NOTE.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time 
maxTime) {
                }
                minIdleStateRetentionTime = minTime.toMilliseconds();
                maxIdleStateRetentionTime = maxTime.toMilliseconds();
+               
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+       }
+
+       /**
+        * Specifies a retention time interval for how long idle state, i.e., 
state which
+        * was not updated, will be retained.
+        * State will never be cleared until it was idle for less than the 
retention time and will never
+        * be kept if it was idle for more than the 1.5 * retention time.

Review comment:
       ```suggestion
         * State will never be cleared until it was idle for less than the 
retention time and will be cleared on a best effort basis after the retention 
time.
   ```

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -274,9 +263,10 @@ public void setMaxGeneratedCodeLength(Integer 
maxGeneratedCodeLength) {
         *
         * @param minTime The minimum time interval for which idle state is 
retained. Set to 0 (zero) to
         *                never clean-up the state.
-        * @param maxTime The maximum time interval for which idle state is 
retained. Must be at least
-        *                5 minutes greater than minTime. Set to 0 (zero) to 
never clean-up the state.
+        * @param maxTime Currently maxTime will be ignored and it will 
automatically derived from minTime

Review comment:
       I would suggest to keep the original description for `maxTime`, but add 
a NOTE to explain the `maxTime` is ignored now and is inferred from `minTime` 
with a 1.5 multiplier. 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time 
maxTime) {
                }
                minIdleStateRetentionTime = minTime.toMilliseconds();
                maxIdleStateRetentionTime = maxTime.toMilliseconds();
+               
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+       }
+
+       /**
+        * Specifies a retention time interval for how long idle state, i.e., 
state which
+        * was not updated, will be retained.
+        * State will never be cleared until it was idle for less than the 
retention time and will never
+        * be kept if it was idle for more than the 1.5 * retention time.
+        *
+        * <p>When new data arrives for previously cleaned-up state, the new 
data will be handled as if it
+        * was the first data. This can result in previous results being 
overwritten.
+        *
+        * <p>Set to 0 (zero) to never clean-up the state.
+        *
+        * <p>NOTE: Cleaning up state requires additional bookkeeping which 
becomes less expensive for
+        * larger differences of minTime and maxTime. The difference between 
minTime and maxTime must be
+        * at least 5 minutes.
+        *
+        * @param duration The retention time interval for which idle state is 
retained. Set to 0 (zero) to
+        *                never clean-up the state.

Review comment:
       Add a `@see` comment:
   
   ```
   * @see org.apache.flink.api.common.state.StateTtlConfig
   ```
   
   

##########
File path: 
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
##########
@@ -63,7 +63,7 @@ public void testAppendStreamDoesNotOverwriteTableConfig() {
                        equalTo(minRetention.toMilliseconds()));
                assertThat(
                        tEnv.getConfig().getMaxIdleStateRetentionTime(),
-                       equalTo(maxRetention.toMilliseconds()));
+                       equalTo(minRetention.toMilliseconds() * 3 / 2));

Review comment:
       Please use the new method `setIdleStateRetentionTime` to set idle state 
in tests. 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
##########
@@ -51,4 +52,24 @@ private TableConfigOptions() {}
                        .withDescription("The SQL dialect defines how to parse 
a SQL query. " +
                                        "A different SQL dialect may support 
different SQL grammar. " +
                                        "Currently supported dialects are: 
default and hive");
+
+       @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+       public static final ConfigOption<String> LOCAL_TIME_ZONE = 
key("table.local-time-zone")
+                       .stringType()
+                       // special value to decide whether to use 
ZoneId.systemDefault() in TableConfig.getLocalTimeZone()
+                       .defaultValue("System")
+                       .withDescription("The local time zone defines current 
session time zone id. It is used when converting to/from " +
+                               "TIMESTAMP_WITH_LOCAL_TIME_ZONE. Internally, 
timestamps with local time zone are always represented in the UTC time zone. " +

Review comment:
       ```suggestion
                                "<code>TIMESTAMP WITH LOCAL TIME ZONE</code>. 
Internally, timestamps with local time zone are always represented in the UTC 
time zone. " +
   ```

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TableConfig}.
+ */
+public class TableConfigTest {
+       @Test
+       public void testGetAndSetTableConfigOptions() throws Exception{
+               Class<?> configClass = TableConfig.class;
+               TableConfig config = new TableConfig();
+               for (TestSpec<?> spec: testSpecList){
+                       configClass.getMethod("set" + spec.fieldName, 
spec.inputClass).invoke(config, spec.inputValue);
+                       assertEquals(spec.expectedValue, 
configClass.getMethod("get" + spec.fieldName).invoke(config));

Review comment:
       I think we should avoid use the Java reflection here, otherwise, it's 
hard to maintain when TableConfig is refactored again in the future. 
   
   I think we don't have much tests here, we can have a test for each 
method/option. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to