fsk119 commented on code in PR #20000:
URL: https://github.com/apache/flink/pull/20000#discussion_r928861091


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java:
##########
@@ -82,6 +88,7 @@ public class HiveParser extends ParserImpl {
     private static final Method getCurrentTSMethod =
             HiveReflectionUtils.tryGetMethod(
                     SessionState.class, "getQueryCurrentTimestamp", new Class[0]);
+    private static final String HIVE_VARIABLE_PREFIX = "__hive.variable__";

Review Comment:
   It's better to move this to HiveInternalOptions and describe it in the option's description.
   
   I think it's better to use `__hive.variables__`? WDYT?
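
   As a minimal sketch of that suggestion (the class name comes from this comment; the constant name and Javadoc wording are assumptions):

   ```
   /** Internal constants/options used by the Hive dialect. */
   public class HiveInternalOptions {

       /**
        * Prefix used to store Hive variables (set via "SET hivevar:k=v") in Flink's
        * configuration, so they can be restored when the Hive parser is re-created.
        */
       public static final String HIVE_VARIABLES_PREFIX = "__hive.variables__";

       private HiveInternalOptions() {}
   }
   ```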



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java:
##########
@@ -207,6 +225,66 @@ public List<Operation> parse(String statement) {
         }
     }
 
+    private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
+        String[] commandTokens = statement.split("\\s+");
+        HiveCommand hiveCommand = HiveCommand.find(commandTokens);
+        if (hiveCommand != null) {
+            if (hiveCommand == HiveCommand.SET) {
+                return Optional.of(
+                        processSetCmd(
+                                hiveConf, statement.substring(commandTokens[0].length()).trim()));
+            } else {
+                throw new UnsupportedOperationException();
+            }
+        }
+        return Optional.empty();
+    }
+
+    private Operation processSetCmd(HiveConf hiveConf, String setCmdArgs) {
+        String nwcmd = setCmdArgs.trim();
+        // the set command may end with ";" as it won't be removed by Flink SQL CLI,
+        // so, we need to remove ";"
+        if (nwcmd.endsWith(";")) {
+            nwcmd = nwcmd.substring(0, nwcmd.length() - 1);
+        }
+
+        if (nwcmd.equals("")) {
+            String options =
+                    HiveSetProcessor.dumpOptions(
+                            hiveConf.getChangedProperties(), hiveConf, hiveVariables);
+            // todo show the options
+            throw new UnsupportedOperationException("Command 'set' isn't supported currently.");
+        }
+        if (nwcmd.equals("-v")) {
+            String options =
+                    HiveSetProcessor.dumpOptions(
+                            hiveConf.getAllProperties(), hiveConf, hiveVariables);
+            throw new UnsupportedOperationException("Command 'set -v' isn't supported currently.");
+        }
+
+        String[] part = new String[2];
+        int eqIndex = nwcmd.indexOf('=');
+        if (nwcmd.contains("=")) {
+            if (eqIndex == nwcmd.length() - 1) { // x=
+                part[0] = nwcmd.substring(0, nwcmd.length() - 1);
+                part[1] = "";
+            } else { // x=y
+                part[0] = nwcmd.substring(0, eqIndex).trim();
+                part[1] = nwcmd.substring(eqIndex + 1).trim();
+            }
+            if (part[0].equals("silent")) {
+                throw new UnsupportedOperationException("Unsupported command 'set silent'.");
+            }
+            HiveSetProcessor.setVariable(hiveConf, tableConfig, hiveVariables, part[0], part[1]);

Review Comment:
   It's better to let the SessionContext do the set.
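
   For illustration, a rough sketch of that direction (the helper class/method names are hypothetical; the idea is that the parser only returns an `Operation`, e.g. Flink's `SetOperation`, and leaves applying the option to the session layer):

   ```
   import org.apache.flink.table.operations.Operation;
   import org.apache.flink.table.operations.command.SetOperation;

   /** Hypothetical helper that turns "SET k=v" into an operation instead of applying it. */
   class HiveSetCommandTranslator {

       Operation toSetOperation(String key, String value) {
           // The caller (e.g. the SessionContext) decides how to apply the option,
           // so the parser does not need to mutate HiveConf/TableConfig itself.
           return new SetOperation(key, value);
       }
   }
   ```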



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java:
##########
@@ -207,6 +225,66 @@ public List<Operation> parse(String statement) {
         }
     }
 
+    private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
+        String[] commandTokens = statement.split("\\s+");
+        HiveCommand hiveCommand = HiveCommand.find(commandTokens);
+        if (hiveCommand != null) {
+            if (hiveCommand == HiveCommand.SET) {
+                return Optional.of(
+                        processSetCmd(
+                                hiveConf, statement.substring(commandTokens[0].length()).trim()));
+            } else {
+                throw new UnsupportedOperationException();

Review Comment:
   nit: It's better to tell users what is unsupported.
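
   For example, something along these lines (the exact wording is only a suggestion):

   ```
   throw new UnsupportedOperationException(
           String.format("The Hive command '%s' is not supported yet.", commandTokens[0]));
   ```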



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java:
##########
@@ -903,6 +904,36 @@ public void testMacro() throws Exception {
                         badMacroName);
     }
 
+    @Test
+    public void testSetCommand() throws Exception {
+        // test set system:
+        tableEnv.executeSql("set system:xxx=5");
+        assertThat(System.getProperty("xxx")).isEqualTo("5");
+        // test set hiveconf:
+        tableEnv.executeSql("set hiveconf:yyy=${system:xxx}");
+        assertThat(hiveCatalog.getHiveConf().get("yyy")).isEqualTo("5");
+        // test set hivevar:
+        tableEnv.executeSql("set hivevar:a=1");
+        tableEnv.executeSql("set hiveconf:zzz=${hivevar:a}");

Review Comment:
   I think we should renew the SQL parser to test whether the config memorizes the Hive variables.
   
   For example,
   
   ```
           tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
           tableEnv.executeSql("show tables");
           tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
   ```
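
   A possible shape for that check (the concrete assertion is an assumption; the point is that a freshly created Hive parser should still see the previously set hivevar):

   ```
           // set a hive variable with the Hive dialect
           tableEnv.executeSql("set hivevar:a=1");

           // force the Hive parser to be re-created by switching dialects back and forth
           tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
           tableEnv.executeSql("show tables");
           tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

           // the variable set before the switch should still be substitutable
           tableEnv.executeSql("set hiveconf:zzz=${hivevar:a}");
           assertThat(hiveCatalog.getHiveConf().get("zzz")).isEqualTo("1");
   ```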



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java:
##########
@@ -207,6 +225,66 @@ public List<Operation> parse(String statement) {
         }
     }
 
+    private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
+        String[] commandTokens = statement.split("\\s+");
+        HiveCommand hiveCommand = HiveCommand.find(commandTokens);
+        if (hiveCommand != null) {
+            if (hiveCommand == HiveCommand.SET) {
+                return Optional.of(
+                        processSetCmd(
+                                hiveConf, statement.substring(commandTokens[0].length()).trim()));
+            } else {
+                throw new UnsupportedOperationException();
+            }
+        }
+        return Optional.empty();
+    }
+
+    private Operation processSetCmd(HiveConf hiveConf, String setCmdArgs) {
+        String nwcmd = setCmdArgs.trim();
+        // the set command may end with ";" as it won't be removed by Flink SQL CLI,
+        // so, we need to remove ";"
+        if (nwcmd.endsWith(";")) {
+            nwcmd = nwcmd.substring(0, nwcmd.length() - 1);
+        }
+
+        if (nwcmd.equals("")) {
+            String options =
+                    HiveSetProcessor.dumpOptions(
+                            hiveConf.getChangedProperties(), hiveConf, hiveVariables);

Review Comment:
   Remove this if we don't support it.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive.copy;
+
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.table.api.TableConfig;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.VariableSubstitution;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX;
+import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX;
+import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX;
+import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX;
+import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;
+
+/** Counterpart of hive's {@link org.apache.hadoop.hive.ql.processors.SetProcessor}. */
+public class HiveSetProcessor {
+
+    private static final String[] PASSWORD_STRINGS = new String[] {"password", "paswd", "pswd"};
+
+    public static void setVariable(
+            HiveConf hiveConf,
+            TableConfig tableConfig,
+            Map<String, String> hiveVariables,
+            String varname,
+            String varvalue) {
+        if (varname.startsWith(ENV_PREFIX)) {
+            throw new UnsupportedOperationException("env:* variables can not be set.");
+        } else if (varname.startsWith(SYSTEM_PREFIX)) {
+            String propName = varname.substring(SYSTEM_PREFIX.length());
+            System.getProperties()
+                    .setProperty(
+                            propName,
+                            new VariableSubstitution(() -> hiveVariables)
+                                    .substitute(hiveConf, varvalue));
+        } else if (varname.startsWith(HIVECONF_PREFIX)) {
+            String propName = varname.substring(HIVECONF_PREFIX.length());
+            setConf(hiveConf, hiveVariables, varname, propName, varvalue);
+        } else if (varname.startsWith(HIVEVAR_PREFIX)) {
+            String propName = varname.substring(HIVEVAR_PREFIX.length());
+            hiveVariables.put(
+                    propName,
+                    new VariableSubstitution(() -> hiveVariables).substitute(hiveConf, varvalue));
+        } else if (varname.startsWith(METACONF_PREFIX)) {
+            String propName = varname.substring(METACONF_PREFIX.length());
+            try {
+                Hive hive = Hive.get(hiveConf);
+                hive.setMetaConf(
+                        propName,
+                        new VariableSubstitution(() -> hiveVariables)
+                                .substitute(hiveConf, varvalue));
+            } catch (HiveException e) {
+                throw new FlinkHiveException(
+                        String.format("'SET %s=%s' FAILED.", varname, varvalue), e);
+            }
+        } else {
+            setConf(hiveConf, hiveVariables, varname, varname, varvalue);
+            // we also try to set the value to Flink's table config. Otherwise, we have no way to
+            // change the table config of Flink when using Hive dialect.
+            String value =
+                    new VariableSubstitution(() -> hiveVariables).substitute(hiveConf, varvalue);
+            tableConfig.set(varname, value);
+        }
+    }
+
+    private static void setConf(
+            HiveConf hiveConf,
+            Map<String, String> hiveVariables,
+            String varname,
+            String key,
+            String varvalue) {
+        String value = new VariableSubstitution(() -> hiveVariables).substitute(hiveConf, varvalue);
+        if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVECONFVALIDATION)) {
+            HiveConf.ConfVars confVars = HiveConf.getConfVars(key);
+            if (confVars != null) {
+                if (!confVars.isType(value)) {
+                    String message =
+                            String.format(
+                                    "'SET %s=%s' FAILED because %s expects %s type value.",
+                                    varname, varvalue, key, confVars.typeString());
+                    throw new IllegalArgumentException(message);
+                }
+                String fail = confVars.validate(value);
+                if (fail != null) {
+                    String message =
+                            String.format(
+                                    "'SET %s=%s' FAILED in validation : %s.",
+                                    varname, varvalue, fail);
+                    throw new IllegalArgumentException(message);
+                }
+            }
+        }
+        hiveConf.verifyAndSet(key, value);
+    }
+
+    public static String getVariable(

Review Comment:
   Do you want to support showing variables using `SET`? If not, I think we can add this when needed.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java:
##########
@@ -207,6 +225,66 @@ public List<Operation> parse(String statement) {
         }
     }
 
+    private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
+        String[] commandTokens = statement.split("\\s+");
+        HiveCommand hiveCommand = HiveCommand.find(commandTokens);
+        if (hiveCommand != null) {
+            if (hiveCommand == HiveCommand.SET) {
+                return Optional.of(
+                        processSetCmd(
+                                hiveConf, statement.substring(commandTokens[0].length()).trim()));
+            } else {
+                throw new UnsupportedOperationException();
+            }
+        }
+        return Optional.empty();
+    }
+
+    private Operation processSetCmd(HiveConf hiveConf, String setCmdArgs) {
+        String nwcmd = setCmdArgs.trim();
+        // the set command may end with ";" as it won't be removed by Flink SQL CLI,
+        // so, we need to remove ";"
+        if (nwcmd.endsWith(";")) {
+            nwcmd = nwcmd.substring(0, nwcmd.length() - 1);
+        }
+
+        if (nwcmd.equals("")) {
+            String options =
+                    HiveSetProcessor.dumpOptions(
+                            hiveConf.getChangedProperties(), hiveConf, hiveVariables);
+            // todo show the options
+            throw new UnsupportedOperationException("Command 'set' isn't supported currently.");
+        }
+        if (nwcmd.equals("-v")) {
+            String options =
+                    HiveSetProcessor.dumpOptions(
+                            hiveConf.getAllProperties(), hiveConf, hiveVariables);
+            throw new UnsupportedOperationException("Command 'set -v' isn't supported currently.");
+        }
+
+        String[] part = new String[2];
+        int eqIndex = nwcmd.indexOf('=');
+        if (nwcmd.contains("=")) {
+            if (eqIndex == nwcmd.length() - 1) { // x=
+                part[0] = nwcmd.substring(0, nwcmd.length() - 1);
+                part[1] = "";
+            } else { // x=y
+                part[0] = nwcmd.substring(0, eqIndex).trim();
+                part[1] = nwcmd.substring(eqIndex + 1).trim();
+            }
+            if (part[0].equals("silent")) {
+                throw new UnsupportedOperationException("Unsupported command 'set silent'.");
+            }
+            HiveSetProcessor.setVariable(hiveConf, tableConfig, hiveVariables, part[0], part[1]);
+            return new NopOperation();
+        }
+
+        // todo show the option
+        String option = HiveSetProcessor.getVariable(hiveConf, hiveVariables, nwcmd);
+        // for the variable
+        throw new UnsupportedOperationException("Unsupported SET command which misses '='.");

Review Comment:
   Do we need this? I think we can remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
