fsk119 commented on a change in pull request #15332:
URL: https://github.com/apache/flink/pull/15332#discussion_r599200403



##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -153,6 +154,188 @@ public void testSetAndResetOption() throws Exception {
                         .getBoolean(OBJECT_REUSE));
     }
 
+    @Test

Review comment:
       Please also add a test about `SetWithConfigOptionAndResetWithYamlKey`.
   
   

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -153,6 +154,188 @@ public void testSetAndResetOption() throws Exception {
                         .getBoolean(OBJECT_REUSE));
     }
 
+    @Test
+    public void testSetAndResetKeyInYamlKey() throws Exception {
+        SessionContext sessionContext = createSessionContext();
+        sessionContext.set("execution.max-table-result-rows", "200000");
+        sessionContext.set("execution.result-mode", "table");
+
+        Assert.assertEquals(
+                "200000",
+                sessionContext
+                        .getExecutionContext()
+                        .getTableEnvironment()
+                        .getConfig()
+                        .getConfiguration()
+                        .toMap()
+                        .get("execution.max-table-result-rows"));

Review comment:
       Add a helper method like
   ```
       private Map<String, String> getMap(SessionContext context) {
           return context
                   .getExecutionContext()
                   .getTableEnvironment()
                   .getConfig()
                   .getConfiguration()
                   .toMap();
       }
   
   ```
   
   I think it will deduplicate a lot of codes.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -412,14 +413,44 @@ private void callClear() {
         clearTerminal();
     }
 
-    private void callReset() {
+    private void callReset(SqlCommandCall cmdCall) {
         try {
-            executor.resetSessionProperties(sessionId);
+            // reset all session properties
+            if (cmdCall.operands.length == 0) {
+                executor.resetSessionProperties(sessionId);
+                printInfo(CliStrings.MESSAGE_RESET);
+            }
+            // reset a session property
+            else {
+                String key = cmdCall.operands[0].trim();
+                executor.resetSessionProperty(sessionId, key);
+                if (YamlConfigUtils.isRemovedKey(key)) {
+                    terminal.writer()
+                            
.println(CliStrings.messageWarning(MESSAGE_SET_REMOVED_KEY).toAnsi());
+                } else {
+                    if (YamlConfigUtils.isDeprecatedKey(key)) {
+                        terminal.writer()
+                                .println(
+                                        CliStrings.messageWarning(
+                                                        String.format(
+                                                                
MESSAGE_SET_DEPRECATED_KEY,
+                                                                key,
+                                                                YamlConfigUtils
+                                                                        
.getOptionNameWithDeprecatedKey(
+                                                                               
 key)))
+                                                .toAnsi());
+                    }
+                    terminal.writer()
+                            .println(
+                                    
CliStrings.messageInfo(String.format(MESSAGE_RESET_KEY, key))
+                                            .toAnsi());
+                }
+                terminal.flush();
+            }

Review comment:
       It seems the logic is similar to the `set` part? Could you add a method 
to reuse these codes?
   
   

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/YamlConfigUtils.java
##########
@@ -170,6 +170,14 @@ public static String getOptionNameWithDeprecatedKey(String 
key) {
         return ENTRY_TO_OPTION.get(key);
     }
 
+    public static boolean isOptionKey(String key) {
+        return OPTION_TO_ENTRY.containsKey(key);
+    }

Review comment:
       We just record part of the options in the Map. The map records the 
mapping between option to entry. For example, `PipelineOptions.Name` is also an 
Option but it is not in the map.

##########
File path: flink-table/flink-sql-client/src/test/resources/sql/set.q
##########
@@ -104,3 +104,70 @@ Was expecting one of:
     "," ...
 
 !error
+
+# test reset remove key
+reset execution.max-idle-state-retention;
+[WARNING] The specified key is not supported anymore.
+!warning
+
+set execution.max-table-result-rows=100;
+[WARNING] The specified key 'execution.max-table-result-rows' is deprecated. 
Please use 'sql-client.execution.max-table-result.rows' instead.
+[INFO] Session property has been set.
+!warning
+
+# test reset the deprecated key
+reset execution.max-table-result-rows;

Review comment:
       use `reset sql-client.execution.max-table-result.rows` instead. 

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
##########
@@ -173,6 +173,9 @@ public void testCommands() throws Exception {
                                 .cannotParseComment(),
                         // reset
                         TestItem.validSql("reset;", 
SqlCommand.RESET).cannotParseComment(),
+                        // reset xx
+                        TestItem.validSql("reset xx.yy;", SqlCommand.RESET, 
"xx.yy")

Review comment:
       Add a case `reset   xx-yy   ;`

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -412,14 +413,44 @@ private void callClear() {
         clearTerminal();
     }
 
-    private void callReset() {
+    private void callReset(SqlCommandCall cmdCall) {
         try {
-            executor.resetSessionProperties(sessionId);
+            // reset all session properties
+            if (cmdCall.operands.length == 0) {
+                executor.resetSessionProperties(sessionId);
+                printInfo(CliStrings.MESSAGE_RESET);
+            }
+            // reset a session property
+            else {
+                String key = cmdCall.operands[0].trim();
+                executor.resetSessionProperty(sessionId, key);
+                if (YamlConfigUtils.isRemovedKey(key)) {
+                    terminal.writer()
+                            
.println(CliStrings.messageWarning(MESSAGE_SET_REMOVED_KEY).toAnsi());

Review comment:
       `MESSAGE_RESET_REMOVED_KEY`?

##########
File path: flink-table/flink-sql-client/src/test/resources/sql/set.q
##########
@@ -104,3 +104,70 @@ Was expecting one of:
     "," ...
 
 !error
+
+# test reset remove key
+reset execution.max-idle-state-retention;
+[WARNING] The specified key is not supported anymore.
+!warning
+
+set execution.max-table-result-rows=100;
+[WARNING] The specified key 'execution.max-table-result-rows' is deprecated. 
Please use 'sql-client.execution.max-table-result.rows' instead.
+[INFO] Session property has been set.
+!warning
+
+# test reset the deprecated key
+reset execution.max-table-result-rows;
+[WARNING] The specified key 'execution.max-table-result-rows' is deprecated. 
Please use 'sql-client.execution.max-table-result.rows' instead.
+[INFO] Session property has been reset.
+!warning
+
+set;
+execution.attached=true
+execution.savepoint.ignore-unclaimed-state=false
+execution.shutdown-on-attached-exit=false
+execution.target=remote
+jobmanager.rpc.address=$VAR_JOBMANAGER_RPC_ADDRESS
+pipeline.classpaths=
+pipeline.jars=$VAR_PIPELINE_JARS
+rest.port=$VAR_REST_PORT
+!ok
+
+
+set parallelism.default=3;
+[INFO] Session property has been set.
+!info
+
+# test reset option key
+reset parallelism.default;

Review comment:
       reset `execution.parallelism`

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -412,14 +413,44 @@ private void callClear() {
         clearTerminal();
     }
 
-    private void callReset() {
+    private void callReset(SqlCommandCall cmdCall) {
         try {
-            executor.resetSessionProperties(sessionId);
+            // reset all session properties
+            if (cmdCall.operands.length == 0) {
+                executor.resetSessionProperties(sessionId);
+                printInfo(CliStrings.MESSAGE_RESET);
+            }
+            // reset a session property
+            else {
+                String key = cmdCall.operands[0].trim();
+                executor.resetSessionProperty(sessionId, key);
+                if (YamlConfigUtils.isRemovedKey(key)) {
+                    terminal.writer()
+                            
.println(CliStrings.messageWarning(MESSAGE_SET_REMOVED_KEY).toAnsi());
+                } else {
+                    if (YamlConfigUtils.isDeprecatedKey(key)) {
+                        terminal.writer()
+                                .println(
+                                        CliStrings.messageWarning(
+                                                        String.format(
+                                                                
MESSAGE_SET_DEPRECATED_KEY,

Review comment:
       ditto

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -114,6 +114,35 @@ public void reset() {
         this.executionContext = new ExecutionContext(executionContext);
     }
 
+    /**
+     * Reset key's property to default. It will rebuild a new {@link 
ExecutionContext}.
+     *
+     * <p>If key is not defined in default config file, remove it from session 
configuration.
+     */
+    public void reset(String key) {
+        Configuration configuration = defaultContext.getFlinkConfig();
+        // If the key exist in default yaml , reset to default
+        if (configuration.containsKey(key)) {
+            String defaultValue =
+                    
configuration.get(ConfigOptions.key(key).stringType().noDefaultValue());
+            this.set(key, defaultValue);
+        } else {
+            ConfigOption<String> keyToDelete = 
ConfigOptions.key(key).stringType().noDefaultValue();
+            sessionConfiguration.removeConfig(keyToDelete);
+            // need to remove compatible key
+            if (YamlConfigUtils.isDeprecatedKey(key)) {
+                String optionKey = 
YamlConfigUtils.getOptionNameWithDeprecatedKey(key);
+                sessionConfiguration.removeConfig(
+                        
ConfigOptions.key(optionKey).stringType().noDefaultValue());
+            } else if (YamlConfigUtils.isOptionKey(key)) {
+                String deprecatedKey = 
YamlConfigUtils.getDeprecatedNameWithOptionKey(key);
+                sessionConfiguration.removeConfig(
+                        
ConfigOptions.key(deprecatedKey).stringType().noDefaultValue());
+            }
+            this.executionContext = new ExecutionContext(executionContext);

Review comment:
       add a comment:
   
   ```
   // It's safe to build ExecutionContext directly because origin configuration 
is legal.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to