godfreyhe commented on code in PR #21641:
URL: https://github.com/apache/flink/pull/21641#discussion_r1095618248


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java:
##########
@@ -39,220 +41,281 @@
  */
 @Internal
 public class DirectConvertRule implements CallExpressionConvertRule {
+    private static Map<Boolean, DirectConvertRule> cachedInstances = new 
HashMap<>();
 
-    private static final Map<FunctionDefinition, SqlOperator> 
DEFINITION_OPERATOR_MAP =
+    public static synchronized DirectConvertRule instance(boolean isBatchMode) 
{
+        DirectConvertRule instance = cachedInstances.get(isBatchMode);
+        if (instance == null) {
+            instance = new DirectConvertRule();
+            instance.init();
+            instance.initDynamicFunctions(isBatchMode);
+            cachedInstances.put(isBatchMode, instance);
+        }
+        return instance;
+    }
+
+    private final Map<FunctionDefinition, SqlOperator> 
definitionSqlOperatorHashMap =
             new HashMap<>();
 
-    static {
+    void initDynamicFunctions(boolean isBatchMode) {
+        FlinkSqlOperatorTable.dynamicFunctions(isBatchMode)
+                .forEach(
+                        func -> {
+                            if (func.getName()
+                                    
.equalsIgnoreCase(SqlStdOperatorTable.CURRENT_DATE.getName())) {
+                                definitionSqlOperatorHashMap.put(
+                                        
BuiltInFunctionDefinitions.CURRENT_DATE, func);
+                            } else if (func.getName()
+                                    
.equalsIgnoreCase(SqlStdOperatorTable.CURRENT_TIME.getName())) {
+                                definitionSqlOperatorHashMap.put(
+                                        
BuiltInFunctionDefinitions.CURRENT_TIME, func);
+                            } else if (func.getName()
+                                    
.equalsIgnoreCase(SqlStdOperatorTable.LOCALTIME.getName())) {
+                                definitionSqlOperatorHashMap.put(
+                                        BuiltInFunctionDefinitions.LOCAL_TIME, 
func);
+                            } else {
+                                throw new TableException(
+                                        String.format(
+                                                "Unsupported mapping for 
dynamic function: %s",
+                                                func.getName()));
+                            }
+                        });
+    }
+
+    void init() {

Review Comment:
   Rename this method to `initNonDynamicFunctions`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java:
##########
@@ -39,220 +41,281 @@
  */
 @Internal
 public class DirectConvertRule implements CallExpressionConvertRule {
+    private static Map<Boolean, DirectConvertRule> cachedInstances = new 
HashMap<>();
 
-    private static final Map<FunctionDefinition, SqlOperator> 
DEFINITION_OPERATOR_MAP =
+    public static synchronized DirectConvertRule instance(boolean isBatchMode) 
{
+        DirectConvertRule instance = cachedInstances.get(isBatchMode);
+        if (instance == null) {
+            instance = new DirectConvertRule();
+            instance.init();
+            instance.initDynamicFunctions(isBatchMode);
+            cachedInstances.put(isBatchMode, instance);
+        }
+        return instance;
+    }
+
+    private final Map<FunctionDefinition, SqlOperator> 
definitionSqlOperatorHashMap =
             new HashMap<>();
 
-    static {
+    void initDynamicFunctions(boolean isBatchMode) {
+        FlinkSqlOperatorTable.dynamicFunctions(isBatchMode)
+                .forEach(
+                        func -> {
+                            if (func.getName()
+                                    
.equalsIgnoreCase(SqlStdOperatorTable.CURRENT_DATE.getName())) {
+                                definitionSqlOperatorHashMap.put(
+                                        
BuiltInFunctionDefinitions.CURRENT_DATE, func);
+                            } else if (func.getName()
+                                    
.equalsIgnoreCase(SqlStdOperatorTable.CURRENT_TIME.getName())) {
+                                definitionSqlOperatorHashMap.put(
+                                        
BuiltInFunctionDefinitions.CURRENT_TIME, func);
+                            } else if (func.getName()
+                                    
.equalsIgnoreCase(SqlStdOperatorTable.LOCALTIME.getName())) {
+                                definitionSqlOperatorHashMap.put(
+                                        BuiltInFunctionDefinitions.LOCAL_TIME, 
func);

Review Comment:
   It would be better to define all dynamic functions here, including
CURRENT_TIMESTAMP, LOCALTIMESTAMP, and NOW.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java:
##########
@@ -60,20 +63,52 @@
 public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 
     /** The table of contains Flink-specific operators. */
-    private static FlinkSqlOperatorTable instance;
+    private static final Map<Boolean, FlinkSqlOperatorTable> cachedInstances = 
new HashMap<>();
 
     /** Returns the Flink operator table, creating it if necessary. */
-    public static synchronized FlinkSqlOperatorTable instance() {
+    public static synchronized FlinkSqlOperatorTable instance(boolean 
isBatchMode) {
+        FlinkSqlOperatorTable instance = cachedInstances.get(isBatchMode);
         if (instance == null) {
             // Creates and initializes the standard operator table.
             // Uses two-phase construction, because we can't initialize the
             // table until the constructor of the sub-class has completed.
             instance = new FlinkSqlOperatorTable();
             instance.init();
+
+            // ensure no dynamic function declares directly
+            validateNoDynamicFunction(instance);
+
+            // register functions based on batch or streaming mode
+            final FlinkSqlOperatorTable finalInstance = instance;
+            dynamicFunctions(isBatchMode).forEach(f -> 
finalInstance.register(f));
+            cachedInstances.put(isBatchMode, finalInstance);
         }
         return instance;
     }
 
+    public static List<SqlFunction> dynamicFunctions(boolean isBatchMode) {
+        return Arrays.asList(
+                new FlinkTimestampDynamicFunction(
+                        SqlStdOperatorTable.LOCALTIME.getName(), 
SqlTypeName.TIME, isBatchMode),
+                new FlinkTimestampDynamicFunction(
+                        SqlStdOperatorTable.CURRENT_TIME.getName(), 
SqlTypeName.TIME, isBatchMode),
+                new FlinkCurrentDateFunction(isBatchMode));

Review Comment:
   ditto



##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -647,15 +647,15 @@ The following system functions are always 
non-deterministic(evaluated per record
 - RAND_INTEGER
 - CURRENT_DATABASE
 - UNIX_TIMESTAMP
+- CURRENT_TIMESTAMP
 - CURRENT_ROW_TIMESTAMP
+- LOCALTIMESTAMP
+- NOW

Review Comment:
   These functions are dynamic functions; see CurrentTimePointCallGen.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/DynamicFunctionPlanTestBase.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common;
+
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Plan test for queries contain dynamic functions. */
+public abstract class DynamicFunctionPlanTestBase extends TableTestBase {
+
+    private TableTestUtil util;
+
+    protected abstract boolean isBatchMode();
+
+    protected abstract TableTestUtil getTableTestUtil();
+
+    @Before
+    public void setup() {
+        util = getTableTestUtil();
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE src (\n"
+                                + " a INTEGER,\n"
+                                + " b VARCHAR,\n"
+                                + " cat VARCHAR,\n"
+                                + " gmt_date DATE,\n"
+                                + " cnt BIGINT,\n"
+                                + " ts TIME,\n"
+                                + " PRIMARY KEY (cat) NOT ENFORCED\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + " ,'bounded' = '"
+                                + isBatchMode()
+                                + "'\n"
+                                + ")");
+    }
+
+    @Test
+    public void testAggregateReduceConstants() {
+        util.verifyExecPlan(
+                "SELECT\n"
+                        + "     cat, gmt_date, SUM(cnt), count(*)\n"
+                        + "FROM src\n"
+                        + "WHERE gmt_date = current_date\n"
+                        + "GROUP BY cat, gmt_date");
+    }
+
+    @Test
+    public void testAggregateReduceConstants2() {
+        // current RelMdPredicates only look at columns that are projected 
without any function
+        // applied, so 'SUBSTR(CAST(LOCALTIME AS VARCHAR), 1, 2)' will never 
be inferred as constant
+        util.verifyExecPlan(
+                "SELECT\n"
+                        + "cat, hh, SUM(cnt), COUNT(*)\n"
+                        + "FROM (SELECT *, SUBSTR(CAST(LOCALTIME AS VARCHAR), 
1, 2) hh FROM src)\n"
+                        + "WHERE SUBSTR(CAST(ts AS VARCHAR), 1, 2) = hh\n"
+                        + "GROUP BY cat, hh");
+    }
+
+    @Test
+    public void testAggregateReduceConstants3() {
+        util.verifyExecPlan(
+                "SELECT\n"
+                        + "     gmt_date, ts, cat, SUBSTR(CAST(ts AS VARCHAR), 
1, 2), SUM(cnt)\n"
+                        + "FROM src\n"
+                        + "WHERE gmt_date = CURRENT_DATE\n"
+                        + "  AND cat = 'fruit' AND ts = CURRENT_TIME\n"
+                        + "GROUP BY gmt_date, ts, cat");
+    }
+
+    @Test
+    public void testCalcMerge() {
+        // TODO after FLINK-30841 fixed, this plan should be updated

Review Comment:
   FLINK-30841 has been accepted.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala:
##########
@@ -544,6 +541,23 @@ class FunctionGenerator private (tableConfig: 
ReadableConfig) {
     Seq(VARCHAR),
     new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_SCALAR, 
argsNullable = true)))
 
+  FlinkSqlOperatorTable
+    .dynamicFunctions(!isStreamingMode)
+    .forEach(
+      func => {
+        if (func.getName == SqlStdOperatorTable.LOCALTIME.getName) {
+          addSqlFunction(func, Seq(), new CurrentTimePointCallGen(true, 
isStreamingMode))
+        } else if (
+          func.getName == SqlStdOperatorTable.CURRENT_DATE.getName
+          || func.getName == SqlStdOperatorTable.CURRENT_TIME.getName
+        ) {

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to