dawidwys commented on a change in pull request #13340:
URL: https://github.com/apache/flink/pull/13340#discussion_r484394635



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ConstantFunctionContext.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.api.TableException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link FunctionContext} for constant expression reduction. It is used 
when a function is called
+ * with constant expressions or constant expressions can be derived from the 
given statement.
+ *
+ * <p>Since constant expression reduction happens during planning, methods 
that reference Flink's runtime
+ * context are not available.
+ *
+ * @see FunctionDefinition#isDeterministic()
+ */
+@Internal
+public final class ConstantFunctionContext extends FunctionContext {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ConstantFunctionContext.class);
+
+       private static UnregisteredMetricsGroup metricsGroup = new 
UnregisteredMetricsGroup();

Review comment:
       nit: `final`?

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ConstantFunctionContext.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.api.TableException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link FunctionContext} for constant expression reduction. It is used 
when a function is called
+ * with constant expressions or constant expressions can be derived from the 
given statement.
+ *
+ * <p>Since constant expression reduction happens during planning, methods 
that reference Flink's runtime
+ * context are not available.
+ *
+ * @see FunctionDefinition#isDeterministic()
+ */
+@Internal
+public final class ConstantFunctionContext extends FunctionContext {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ConstantFunctionContext.class);
+
+       private static UnregisteredMetricsGroup metricsGroup = new 
UnregisteredMetricsGroup();
+
+       private final Map<String, String> jobParameters;
+
+       public ConstantFunctionContext(Configuration configuration) {
+               super(null);
+               this.jobParameters = 
configuration.getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
+                       .map(HashMap::new)
+                       .orElseGet(HashMap::new);
+       }
+
+       @Override
+       public MetricGroup getMetricGroup() {
+               LOG.warn(
+                       "Calls to FunctionContext.getMetricGroup will have no 
effect during constant expression reduction.");
+               return metricsGroup;

Review comment:
       I am undecided myself, just mentioning it... Not sure if that's the best 
choice to in the end ignore the metrics registration. It will be rather hard to 
track why the metrics are not visible. Maybe we should throw a helpful 
exception as well?

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
##########
@@ -77,11 +77,13 @@ public File getCachedFile(String name) {
         */
        public String getJobParameter(String key, String defaultValue) {
                final GlobalJobParameters conf = 
context.getExecutionConfig().getGlobalJobParameters();
-               if (conf != null && conf.toMap().containsKey(key)) {
-                       return conf.toMap().get(key);
-               } else {
-                       return defaultValue;
+               if (conf != null) {
+                       final String value = conf.toMap().get(key);

Review comment:
       nit: `return conf.toMap().getOrDefault(key, defaultValue);`

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ConstantFunctionContext.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.api.TableException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link FunctionContext} for constant expression reduction. It is used 
when a function is called
+ * with constant expressions or constant expressions can be derived from the 
given statement.
+ *
+ * <p>Since constant expression reduction happens during planning, methods 
that reference Flink's runtime
+ * context are not available.
+ *
+ * @see FunctionDefinition#isDeterministic()
+ */
+@Internal
+public final class ConstantFunctionContext extends FunctionContext {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ConstantFunctionContext.class);
+
+       private static UnregisteredMetricsGroup metricsGroup = new 
UnregisteredMetricsGroup();
+
+       private final Map<String, String> jobParameters;
+
+       public ConstantFunctionContext(Configuration configuration) {
+               super(null);
+               this.jobParameters = 
configuration.getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
+                       .map(HashMap::new)
+                       .orElseGet(HashMap::new);
+       }
+
+       @Override
+       public MetricGroup getMetricGroup() {
+               LOG.warn(
+                       "Calls to FunctionContext.getMetricGroup will have no 
effect during constant expression reduction.");
+               return metricsGroup;
+       }
+
+       @Override
+       public File getCachedFile(String name) {
+               throw new TableException(
+                       "Calls to FunctionContext.getCachedFile are not 
available during constant expression reduction.");

Review comment:
       How about we add a hint what to do if users do need that method? Shall 
we redirect to `FunctionDefinition#isDeterministic`? 
   
   Something like: `If your function requires the ... to work, you should tell 
Flink the function is not deterministic via FunctionDefinition#isDeterministic`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to