This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 56c5034  [hotfix][table] Expose table.exec.rank.topn-cache-size config 
option
56c5034 is described below

commit 56c5034fdf0f0b5a26c9a29e5bfb29386875b834
Author: Marios Trivyzas <[email protected]>
AuthorDate: Thu Feb 3 12:09:44 2022 +0200

    [hotfix][table] Expose table.exec.rank.topn-cache-size config option
    
    `table.exec.topn.cache-size` config option was previously defined inside
    `StreamExecRank` and still marked as experimental. Move it to
    `ExecutionConfigOptions` and generate the corresponding docs.
    
    This closes #18540.
---
 .../generated/execution_config_configuration.html        |  6 ++++++
 .../52c5c6a1-204f-462d-9efa-4ffcb100fb4d                 |  1 -
 .../flink/table/api/config/ExecutionConfigOptions.java   | 12 ++++++++++++
 .../planner/plan/nodes/exec/stream/StreamExecRank.java   | 16 ++--------------
 4 files changed, 20 insertions(+), 15 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/execution_config_configuration.html 
b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 3d3163d..bbf913e 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -47,6 +47,12 @@ By default no operator is disabled.</td>
             <td>The maximum number of input records can be buffered for 
MiniBatch. MiniBatch is an optimization to buffer input records to reduce state 
access. MiniBatch is triggered with the allowed latency interval and when the 
maximum number of buffered records reached. NOTE: MiniBatch only works for 
non-windowed aggregations currently. If table.exec.mini-batch.enabled is set 
true, its value must be positive.</td>
         </tr>
         <tr>
+            <td><h5>table.exec.rank.topn-cache-size</h5><br> <span 
class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">10000</td>
+            <td>Long</td>
+            <td>Rank operators have a cache which caches partial state 
contents to reduce state access. Cache size is the number of records in each 
ranking task.</td>
+        </tr>
+        <tr>
             <td><h5>table.exec.resource.default-parallelism</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">-1</td>
             <td>Integer</td>
diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/52c5c6a1-204f-462d-9efa-4ffcb100fb4d
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/52c5c6a1-204f-462d-9efa-4ffcb100fb4d
index 56dcfaa..75de48d 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/52c5c6a1-204f-462d-9efa-4ffcb100fb4d
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/52c5c6a1-204f-462d-9efa-4ffcb100fb4d
@@ -1,4 +1,3 @@
 Field 
<org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES>
 is not declared in classes that have simple name ending with 'Options' in 
(StreamExecDeduplicate.java:0) and Field 
<org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES>
 is not declared in org.apache.flink.table.factories.FactoryUtil in 
(StreamExecDeduplicate.java:0)
 Field 
<org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate.TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE>
 is not declared in classes that have simple name ending with 'Options' in 
(StreamExecDeduplicate.java:0) and Field 
<org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate.TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE>
 is not declared in org.apache.flink.table.factories.FactoryUtil in 
(StreamExecDeduplicate.java:0)
-Field 
<org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank.TABLE_EXEC_TOPN_CACHE_SIZE>
 is not declared in classes that have simple name ending with 'Options' in 
(StreamExecRank.java:0) and Field 
<org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank.TABLE_EXEC_TOPN_CACHE_SIZE>
 is not declared in org.apache.flink.table.factories.FactoryUtil in 
(StreamExecRank.java:0)
 Field 
<org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSort.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED>
 is not declared in classes that have simple name ending with 'Options' in 
(StreamExecSort.java:0) and Field 
<org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSort.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED>
 is not declared in org.apache.flink.table.factories.FactoryUtil in 
(StreamExecSort.java:0)
\ No newline at end of file
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 43a9b46..22f39cc 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -414,6 +415,17 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following 
the legacy behaviour "
                                     + "or the new one that introduces various 
fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long> TABLE_EXEC_RANK_TOPN_CACHE_SIZE =
+            ConfigOptions.key("table.exec.rank.topn-cache-size")
+                    .longType()
+                    .defaultValue(10000L)
+                    .withDeprecatedKeys("table.exec.topn-cache-size")
+                    .withDescription(
+                            "Rank operators have a cache which caches partial 
state contents "
+                                    + "to reduce state access. Cache size is 
the number of records "
+                                    + "in each ranking task.");
+
     // 
------------------------------------------------------------------------------------------
     // Enum option types
     // 
------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
index d702d4d..86efb09 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
@@ -19,11 +19,8 @@
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.flink.FlinkVersion;
-import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.api.TableConfig;
@@ -70,6 +67,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RANK_TOPN_CACHE_SIZE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -82,16 +80,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class StreamExecRank extends ExecNodeBase<RowData>
         implements StreamExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
 
-    // It is a experimental config, will may be removed later.
-    @Experimental
-    public static final ConfigOption<Long> TABLE_EXEC_TOPN_CACHE_SIZE =
-            ConfigOptions.key("table.exec.topn.cache-size")
-                    .longType()
-                    .defaultValue(10000L)
-                    .withDescription(
-                            "TopN operator has a cache which caches partial 
state contents to reduce"
-                                    + " state access. Cache size is the number 
of records in each TopN task.");
-
     public static final String FIELD_NAME_RANK_TYPE = "rankType";
     public static final String FIELD_NAME_PARTITION_SPEC = "partition";
     public static final String FIELD_NAME_SORT_SPEC = "orderBy";
@@ -216,7 +204,7 @@ public class StreamExecRank extends ExecNodeBase<RowData>
                         "StreamExecSortComparator",
                         RowType.of(sortSpec.getFieldTypes(inputType)),
                         sortSpecInSortKey);
-        long cacheSize = 
tableConfig.getConfiguration().getLong(TABLE_EXEC_TOPN_CACHE_SIZE);
+        long cacheSize = 
tableConfig.getConfiguration().getLong(TABLE_EXEC_RANK_TOPN_CACHE_SIZE);
         StateTtlConfig ttlConfig =
                 
StateConfigUtil.createTtlConfig(tableConfig.getIdleStateRetention().toMillis());
 

Reply via email to