twalthr commented on a change in pull request #18624:
URL: https://github.com/apache/flink/pull/18624#discussion_r800521950
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -414,6 +415,36 @@
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Boolean>
+            TABLE_EXEC_DEDUPLICATE_INSERT_AND_UPDATE_AFTER_SENSITIVE_ENABLED =
+                    key("table.exec.deduplicate.insert-and-updateafter-sensitive.enabled")
Review comment:
Maybe try to shorten this a bit:
`table.exec.deduplicate.insert-update-after-sensitive`
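For illustration, the shortened definition could look roughly like this (just a sketch; the constant name is a placeholder and the description is elided, not final wording):

```java
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Boolean> TABLE_EXEC_DEDUPLICATE_INSERT_UPDATE_AFTER_SENSITIVE =
        key("table.exec.deduplicate.insert-update-after-sensitive")
                .booleanType()
                .defaultValue(true)
                .withDescription("...");
```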
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -414,6 +415,36 @@
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Boolean>
+            TABLE_EXEC_DEDUPLICATE_INSERT_AND_UPDATE_AFTER_SENSITIVE_ENABLED =
+                    key("table.exec.deduplicate.insert-and-updateafter-sensitive.enabled")
+                            .booleanType()
+                            .defaultValue(true)
+                            .withDescription(
+                                    "Set whether the job (especially the sinks) is sensitive to "
+                                            + "INSERT messages and UPDATE_AFTER messages. "
+                                            + "If false, Flink may, some times (e.g. deduplication "
+                                            + "for last row), send UPDATE_AFTER instead of INSERT "
+                                            + "for the first row. If true, Flink will guarantee to "
+                                            + "send INSERT for the first row, in that case there "
+                                            + "will be additional overhead. Default is true.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Boolean>
+            TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED =
+                    ConfigOptions.key("table.exec.deduplicate.mini-batch.compact-changes.enabled")
Review comment:
`table.exec.deduplicate.mini-batch.compact-changes`
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
##########
@@ -105,6 +105,15 @@
@ExecNodeMetadata(
name = "stream-exec-group-window-aggregate",
version = 1,
+ consumedOptions = {
+ "table.optimizer.simplify-operator-name-enabled",
+ "table.local-time-zone",
+ "table.exec.state.ttl",
+ "table.exec.mini-batch.enabled",
+ "table.exec.mini-batch.size",
+ "table.generated-code.max-length",
Review comment:
This we can also skip? Code gen should not affect the topology, so it should be safe to change this without side effects?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java
##########
@@ -50,4 +51,13 @@
                             + " as UTC+0 milliseconds since epoch for simplification, this config will be used by"
                             + " some temporal functions like LOCAL_TIMESTAMP in batch job to make sure these"
                             + " temporal functions has query-start semantics.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED =
+            key("table.exec.sort.non-temporal.enabled")
Review comment:
Which tests are using this? TBH this rather looks like a hidden feature?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
##########
@@ -42,6 +42,7 @@
@ExecNodeMetadata(
name = "stream-exec-calc",
version = 1,
+ consumedOptions = {"table.optimizer.simplify-operator-name-enabled"},
Review comment:
TBH we should rediscuss the key `table.optimizer.simplify-operator-name-enabled`: it has nothing to do with the optimizer. The optimization is already done at this point. I would vote to call it `table.exec.simplify-operator-name`.
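If renamed, the old key could be kept as a deprecated fallback so existing configurations keep working. A sketch, assuming the option stays a boolean with default `true` and using `ConfigOption#withDeprecatedKeys` (the description is elided):

```java
public static final ConfigOption<Boolean> TABLE_EXEC_SIMPLIFY_OPERATOR_NAME =
        ConfigOptions.key("table.exec.simplify-operator-name")
                .booleanType()
                .defaultValue(true)
                // old key still resolves, but is reported as deprecated
                .withDeprecatedKeys("table.optimizer.simplify-operator-name-enabled")
                .withDescription("...");
```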
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
##########
@@ -78,6 +78,13 @@
@ExecNodeMetadata(
name = "stream-exec-global-group-aggregate",
version = 1,
+ consumedOptions = {
+ "table.optimizer.simplify-operator-name-enabled",
+ "table.local-time-zone",
Review comment:
This we can skip; we can assume that every operator needs this information, especially when dealing with expressions. How do we use the time zone in this node?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -414,6 +415,36 @@
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Boolean>
+            TABLE_EXEC_DEDUPLICATE_INSERT_AND_UPDATE_AFTER_SENSITIVE_ENABLED =
+                    key("table.exec.deduplicate.insert-and-updateafter-sensitive.enabled")
+                            .booleanType()
+                            .defaultValue(true)
+                            .withDescription(
+                                    "Set whether the job (especially the sinks) is sensitive to "
+                                            + "INSERT messages and "
+                                            + "UPDATE_AFTER messages. "
+                                            + "If false, Flink may, some times (e.g. deduplication "
Review comment:
`some times` -> `sometimes`
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
##########
@@ -57,6 +57,13 @@
@ExecNodeMetadata(
name = "stream-exec-sink",
version = 1,
+ consumedOptions = {
+ "table.optimizer.simplify-operator-name-enabled",
+ "table.exec.rank.topn-cache-size",
Review comment:
These options are clearly wrong. Where is `table.exec.sink.not-null-enforcer` etc.?
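For illustration, the annotation would presumably need to list the sink-related options this node actually reads, something like the fragment below (the exact set would have to be verified against the node's translation code; the trailing comment marks the unverified part):

```java
consumedOptions = {
    "table.optimizer.simplify-operator-name-enabled",
    "table.exec.sink.not-null-enforcer",
    // ... plus whatever other sink options this node actually consumes
},
```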
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]