wuchong commented on a change in pull request #12880:
URL: https://github.com/apache/flink/pull/12880#discussion_r460697474
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
##########
@@ -51,4 +52,24 @@ private TableConfigOptions() {}
.withDescription("The SQL dialect defines how to parse
a SQL query. " +
"A different SQL dialect may support
different SQL grammar. " +
"Currently supported dialects are:
default and hive");
+
+ @Documentation.TableOption(execMode =
Documentation.ExecMode.BATCH_STREAMING)
+ public static final ConfigOption<String> LOCAL_TIME_ZONE =
key("table.local-time-zone")
+ .stringType()
+ // special value to decide whether to use
ZoneId.systemDefault() in TableConfig.getLocalTimeZone()
+ .defaultValue("System")
Review comment:
What about using `(system-default)` here? It would read more like a system
default value, similar to `(none)`.
Besides, please store the "(system-default)" string in a static final variable.
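A minimal sketch of what this could look like, using only the JDK. The class
name `LocalTimeZoneSketch` and the method `resolve` are hypothetical and not
part of the PR; only the placeholder string and the fallback to
`ZoneId.systemDefault()` come from the diff and the comment above.

```java
import java.time.ZoneId;

// Hypothetical helper, not Flink code: keeps the placeholder in one constant.
final class LocalTimeZoneSketch {
    // Placeholder default, analogous to the "(none)" convention mentioned above.
    static final String SYSTEM_DEFAULT = "(system-default)";

    private LocalTimeZoneSketch() {}

    // Resolves the configured string; the placeholder maps to the JVM default zone.
    static ZoneId resolve(String configured) {
        return SYSTEM_DEFAULT.equals(configured)
                ? ZoneId.systemDefault()
                : ZoneId.of(configured);
    }
}
```

With the constant in one place, the option's default value and the check in
the getter cannot drift apart.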
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time
maxTime) {
}
minIdleStateRetentionTime = minTime.toMilliseconds();
maxIdleStateRetentionTime = maxTime.toMilliseconds();
Review comment:
Can the member variables `minIdleStateRetentionTime` and
`maxIdleStateRetentionTime` be removed?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time
maxTime) {
}
minIdleStateRetentionTime = minTime.toMilliseconds();
maxIdleStateRetentionTime = maxTime.toMilliseconds();
+
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+ }
+
+ /**
+ * Specifies a retention time interval for how long idle state, i.e.,
state which
+ * was not updated, will be retained.
+ * State will never be cleared until it was idle for less than the
retention time and will never
+ * be kept if it was idle for more than the 1.5 * retention time.
+ *
+ * <p>When new data arrives for previously cleaned-up state, the new
data will be handled as if it
+ * was the first data. This can result in previous results being
overwritten.
+ *
+ * <p>Set to 0 (zero) to never clean-up the state.
+ *
+ * <p>NOTE: Cleaning up state requires additional bookkeeping which
becomes less expensive for
+ * larger differences of minTime and maxTime. The difference between
minTime and maxTime must be
+ * at least 5 minutes.
+ *
+ * @param duration The retention time interval for which idle state is
retained. Set to 0 (zero) to
+ * never clean-up the state.
+ */
+ public void setIdleStateRetentionDuration(Duration duration){
+ configuration.set(ExecutionConfigOptions.IDLE_STATE_RETENTION,
duration);
}
/**
* @return The minimum time until state which was not updated will be
retained.
*/
public long getMinIdleStateRetentionTime() {
Review comment:
Add a new method `getIdleStateRetentionTime` and deprecate
`getMinIdleStateRetentionTime` and `getMaxIdleStateRetentionTime`.
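A rough sketch of the shape this could take, assuming the new getter returns a
`Duration` (the return type is my assumption). `RetentionGettersSketch` is a
hypothetical stand-in for `TableConfig`, and the setter name follows the
suggestion in this review; the 1.5 multiplier matches the `* 3 / 2`
expectation in the PR's tests.

```java
import java.time.Duration;

// Hypothetical stand-in for TableConfig; only the retention accessors are modelled.
final class RetentionGettersSketch {
    private Duration idleStateRetention = Duration.ZERO;

    void setIdleStateRetentionTime(Duration duration) {
        this.idleStateRetention = duration;
    }

    /** Returns the single retention interval; zero means state is never cleaned up. */
    Duration getIdleStateRetentionTime() {
        return idleStateRetention;
    }

    /** @deprecated use {@link #getIdleStateRetentionTime()} instead. */
    @Deprecated
    long getMinIdleStateRetentionTime() {
        return idleStateRetention.toMillis();
    }

    /** @deprecated use {@link #getIdleStateRetentionTime()} instead. */
    @Deprecated
    long getMaxIdleStateRetentionTime() {
        // Derived as 1.5 x the minimum, as in the PR's test expectations.
        return idleStateRetention.toMillis() * 3 / 2;
    }
}
```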
##########
File path:
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TableConfig}.
+ */
+public class TableConfigTest {
+ @Test
+ public void testGetAndSetTableConfigOptions() throws Exception{
+ Class<?> configClass = TableConfig.class;
+ TableConfig config = new TableConfig();
+ for (TestSpec<?> spec: testSpecList){
+ configClass.getMethod("set" + spec.fieldName,
spec.inputClass).invoke(config, spec.inputValue);
+ assertEquals(spec.expectedValue,
configClass.getMethod("get" + spec.fieldName).invoke(config));
+ }
+ }
+
+ @Test
+ public void testGetAndSetIdleStateRetentionDuration(){
+ TableConfig config = new TableConfig();
+ config.setIdleStateRetentionDuration(Duration.ofHours(1));
+ assertEquals(Duration.ofHours(1).toMillis(),
config.getMinIdleStateRetentionTime());
+ assertEquals(Duration.ofHours(1).toMillis() * 3 / 2,
config.getMaxIdleStateRetentionTime());
Review comment:
Please also test the string option.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -49,6 +51,15 @@
"tasks to advance their watermarks without the
need to wait for " +
"watermarks from this source while it is
idle.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Duration> IDLE_STATE_RETENTION =
+ key("table.exec.state.ttl")
+ .durationType()
+ .defaultValue(Duration.ofMillis(0))
+ .withDescription("A time-to-live (TTL) can be assigned
to the keyed state of any type. " +
+ "If a TTL is configured and a state value has
expired, " +
+ "the stored value will be cleaned up on a best
effort basis.");
+
Review comment:
Add a description of the default value, for example: `Default is 0 (zero)
which will never clean-up state`.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -274,9 +263,10 @@ public void setMaxGeneratedCodeLength(Integer
maxGeneratedCodeLength) {
*
* @param minTime The minimum time interval for which idle state is
retained. Set to 0 (zero) to
* never clean-up the state.
- * @param maxTime The maximum time interval for which idle state is
retained. Must be at least
- * 5 minutes greater than minTime. Set to 0 (zero) to
never clean-up the state.
+ * @param maxTime Currently maxTime will be ignored and it will
automatically derived from minTime
+ * as 1.5 x minTime.
*/
+ @Deprecated
Review comment:
Add a `@deprecated` Javadoc tag to this method, and state which method
should be used instead.
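Something along these lines, as a sketch only. `RetentionSetterSketch` is a
hypothetical stand-in for `TableConfig`, `java.time.Duration` replaces Flink's
`Time` so the snippet compiles with the JDK alone, and the argument validation
of the real method is left out.

```java
import java.time.Duration;

// Hypothetical stand-in for TableConfig; Duration replaces Flink's Time here.
final class RetentionSetterSketch {
    private Duration idleStateRetention = Duration.ZERO;

    /**
     * Specifies the minimum and maximum idle state retention.
     *
     * @param minTime the minimum interval for which idle state is retained
     * @param maxTime ignored; the effective maximum is derived as 1.5 x minTime
     * @deprecated use {@link #setIdleStateRetention(Duration)} instead.
     */
    @Deprecated
    void setIdleStateRetentionTime(Duration minTime, Duration maxTime) {
        // Delegates to the replacement so both entry points share one code path.
        setIdleStateRetention(minTime);
    }

    void setIdleStateRetention(Duration duration) {
        this.idleStateRetention = duration;
    }

    Duration getIdleStateRetention() {
        return idleStateRetention;
    }
}
```

The `@deprecated` tag shows up in the generated Javadoc and in IDE tooltips,
while the `@Deprecated` annotation alone only triggers a compiler warning.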
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -49,6 +51,15 @@
"tasks to advance their watermarks without the
need to wait for " +
"watermarks from this source while it is
idle.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Duration> IDLE_STATE_RETENTION =
+ key("table.exec.state.ttl")
+ .durationType()
+ .defaultValue(Duration.ofMillis(0))
+ .withDescription("A time-to-live (TTL) can be assigned
to the keyed state of any type. " +
+ "If a TTL is configured and a state value has
expired, " +
+ "the stored value will be cleaned up on a best
effort basis.");
+
Review comment:
Can you move this before `Source Options` and add a `State Options` comment above it?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time
maxTime) {
}
minIdleStateRetentionTime = minTime.toMilliseconds();
maxIdleStateRetentionTime = maxTime.toMilliseconds();
+
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+ }
+
+ /**
+ * Specifies a retention time interval for how long idle state, i.e.,
state which
+ * was not updated, will be retained.
+ * State will never be cleared until it was idle for less than the
retention time and will never
+ * be kept if it was idle for more than the 1.5 * retention time.
+ *
+ * <p>When new data arrives for previously cleaned-up state, the new
data will be handled as if it
+ * was the first data. This can result in previous results being
overwritten.
+ *
+ * <p>Set to 0 (zero) to never clean-up the state.
+ *
+ * <p>NOTE: Cleaning up state requires additional bookkeeping which
becomes less expensive for
+ * larger differences of minTime and maxTime. The difference between
minTime and maxTime must be
+ * at least 5 minutes.
+ *
+ * @param duration The retention time interval for which idle state is
retained. Set to 0 (zero) to
+ * never clean-up the state.
+ */
+ public void setIdleStateRetentionDuration(Duration duration){
Review comment:
I would prefer to keep the same method name as before, `setIdleStateRetentionTime`,
or to drop the `Time`/`Duration` suffix altogether.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time
maxTime) {
}
minIdleStateRetentionTime = minTime.toMilliseconds();
maxIdleStateRetentionTime = maxTime.toMilliseconds();
+
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+ }
+
+ /**
+ * Specifies a retention time interval for how long idle state, i.e.,
state which
+ * was not updated, will be retained.
+ * State will never be cleared until it was idle for less than the
retention time and will never
+ * be kept if it was idle for more than the 1.5 * retention time.
+ *
+ * <p>When new data arrives for previously cleaned-up state, the new
data will be handled as if it
+ * was the first data. This can result in previous results being
overwritten.
+ *
+ * <p>Set to 0 (zero) to never clean-up the state.
+ *
+ * <p>NOTE: Cleaning up state requires additional bookkeeping which
becomes less expensive for
+ * larger differences of minTime and maxTime. The difference between
minTime and maxTime must be
+ * at least 5 minutes.
Review comment:
Remove the NOTE.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time
maxTime) {
}
minIdleStateRetentionTime = minTime.toMilliseconds();
maxIdleStateRetentionTime = maxTime.toMilliseconds();
+
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+ }
+
+ /**
+ * Specifies a retention time interval for how long idle state, i.e.,
state which
+ * was not updated, will be retained.
+ * State will never be cleared until it was idle for less than the
retention time and will never
+ * be kept if it was idle for more than the 1.5 * retention time.
Review comment:
```suggestion
* State will never be cleared until it was idle for less than the
retention time and will be cleared on a best effort basis after the retention
time.
```
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -274,9 +263,10 @@ public void setMaxGeneratedCodeLength(Integer
maxGeneratedCodeLength) {
*
* @param minTime The minimum time interval for which idle state is
retained. Set to 0 (zero) to
* never clean-up the state.
- * @param maxTime The maximum time interval for which idle state is
retained. Must be at least
- * 5 minutes greater than minTime. Set to 0 (zero) to
never clean-up the state.
+ * @param maxTime Currently maxTime will be ignored and it will
automatically derived from minTime
Review comment:
I would suggest keeping the original description for `maxTime`, but adding
a NOTE explaining that `maxTime` is now ignored and is inferred from `minTime`
with a 1.5 multiplier.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time
maxTime) {
}
minIdleStateRetentionTime = minTime.toMilliseconds();
maxIdleStateRetentionTime = maxTime.toMilliseconds();
+
setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+ }
+
+ /**
+ * Specifies a retention time interval for how long idle state, i.e.,
state which
+ * was not updated, will be retained.
+ * State will never be cleared until it was idle for less than the
retention time and will never
+ * be kept if it was idle for more than the 1.5 * retention time.
+ *
+ * <p>When new data arrives for previously cleaned-up state, the new
data will be handled as if it
+ * was the first data. This can result in previous results being
overwritten.
+ *
+ * <p>Set to 0 (zero) to never clean-up the state.
+ *
+ * <p>NOTE: Cleaning up state requires additional bookkeeping which
becomes less expensive for
+ * larger differences of minTime and maxTime. The difference between
minTime and maxTime must be
+ * at least 5 minutes.
+ *
+ * @param duration The retention time interval for which idle state is
retained. Set to 0 (zero) to
+ * never clean-up the state.
Review comment:
Add a `@see` comment:
```
* @see org.apache.flink.api.common.state.StateTtlConfig
```
##########
File path:
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
##########
@@ -63,7 +63,7 @@ public void testAppendStreamDoesNotOverwriteTableConfig() {
equalTo(minRetention.toMilliseconds()));
assertThat(
tEnv.getConfig().getMaxIdleStateRetentionTime(),
- equalTo(maxRetention.toMilliseconds()));
+ equalTo(minRetention.toMilliseconds() * 3 / 2));
Review comment:
Please use the new method `setIdleStateRetentionTime` to set the idle state
retention in tests.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
##########
@@ -51,4 +52,24 @@ private TableConfigOptions() {}
.withDescription("The SQL dialect defines how to parse
a SQL query. " +
"A different SQL dialect may support
different SQL grammar. " +
"Currently supported dialects are:
default and hive");
+
+ @Documentation.TableOption(execMode =
Documentation.ExecMode.BATCH_STREAMING)
+ public static final ConfigOption<String> LOCAL_TIME_ZONE =
key("table.local-time-zone")
+ .stringType()
+ // special value to decide whether to use
ZoneId.systemDefault() in TableConfig.getLocalTimeZone()
+ .defaultValue("System")
+ .withDescription("The local time zone defines current
session time zone id. It is used when converting to/from " +
+ "TIMESTAMP_WITH_LOCAL_TIME_ZONE. Internally,
timestamps with local time zone are always represented in the UTC time zone. " +
Review comment:
```suggestion
"<code>TIMESTAMP WITH LOCAL TIME ZONE</code>.
Internally, timestamps with local time zone are always represented in the UTC
time zone. " +
```
##########
File path:
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TableConfig}.
+ */
+public class TableConfigTest {
+ @Test
+ public void testGetAndSetTableConfigOptions() throws Exception{
+ Class<?> configClass = TableConfig.class;
+ TableConfig config = new TableConfig();
+ for (TestSpec<?> spec: testSpecList){
+ configClass.getMethod("set" + spec.fieldName,
spec.inputClass).invoke(config, spec.inputValue);
+ assertEquals(spec.expectedValue,
configClass.getMethod("get" + spec.fieldName).invoke(config));
Review comment:
I think we should avoid using Java reflection here; otherwise, the test will be
hard to maintain when TableConfig is refactored again in the future.
We don't have many tests here, so we can have a dedicated test for each
method/option.
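To illustrate the idea, here is a small sketch with one explicit check per
option. `StubTableConfig` and `StubTableConfigChecks` are hypothetical
stand-ins so that the snippet runs with the JDK alone; in the PR these would
be JUnit `@Test` methods calling the real `TableConfig`.

```java
import java.time.Duration;
import java.time.ZoneId;

// Hypothetical stand-in for TableConfig so the sketch runs with the JDK alone.
final class StubTableConfig {
    private ZoneId localTimeZone = ZoneId.systemDefault();
    private Duration idleStateRetention = Duration.ZERO;

    void setLocalTimeZone(ZoneId zone) { this.localTimeZone = zone; }
    ZoneId getLocalTimeZone() { return localTimeZone; }
    void setIdleStateRetention(Duration d) { this.idleStateRetention = d; }
    Duration getIdleStateRetention() { return idleStateRetention; }
}

// One explicit check per option, calling the accessors directly.
final class StubTableConfigChecks {
    static void checkLocalTimeZone() {
        StubTableConfig config = new StubTableConfig();
        config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        if (!ZoneId.of("Asia/Shanghai").equals(config.getLocalTimeZone())) {
            throw new AssertionError("local time zone was not stored");
        }
    }

    static void checkIdleStateRetention() {
        StubTableConfig config = new StubTableConfig();
        config.setIdleStateRetention(Duration.ofHours(1));
        if (!Duration.ofHours(1).equals(config.getIdleStateRetention())) {
            throw new AssertionError("idle state retention was not stored");
        }
    }

    public static void main(String[] args) {
        checkLocalTimeZone();
        checkIdleStateRetention();
    }
}
```

Because the accessors are called directly, renaming one of them turns into a
compile error instead of a `NoSuchMethodException` at test time.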