matriv commented on a change in pull request #17522:
URL: https://github.com/apache/flink/pull/17522#discussion_r736455472



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/CastExecutor.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Interface to model a function that performs the casting of a value from one 
type to another.
+ *
+ * <p>This interface is serializable in order to be embedded in the runtime 
codegen.

Review comment:
       I guess this is not valid anymore?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.utils.CastExecutor;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRule;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.runtime.generated.CompileUtils;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for {@link CastRule} supporting code generation. This base class 
implements {@link
+ * #create(CastRule.Context, LogicalType, LogicalType)} compiling the 
generated code block into a
+ * {@link CastExecutor} implementation.
+ *
+ * <p>We suggest implementing {@link CodeGeneratorCastRule} starting from 
{@link
+ * AbstractNullAwareCodeGeneratorCastRule}, which provides nullability checks, 
or from {@link
+ * AbstractExpressionCodeGeneratorCastRule} to generate simple expression 
casts.
+ */
+public abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends 
AbstractCastRule<IN, OUT>
+        implements CodeGeneratorCastRule<IN, OUT> {
+
+    protected AbstractCodeGeneratorCastRule(CastRulePredicate predicate) {
+        super(predicate);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public CastExecutor<IN, OUT> create(
+            CastRule.Context castRuleContext,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final String inputTerm = "_myInput";
+        final String className = CodeGenUtils.newName("GeneratedCastExecutor");
+        final String inputTypeTerm = 
CodeGenUtils.boxedTypeTermForType(inputLogicalType);
+        final String targetTypeTerm = 
CodeGenUtils.boxedTypeTermForType(targetLogicalType);
+
+        final CastExecutorCodeGeneratorContext ctx =
+                new CastExecutorCodeGeneratorContext(castRuleContext);
+        final CastCodeBlock codeBlock =
+                generateCodeBlock(ctx, inputTerm, inputLogicalType, 
targetLogicalType);
+
+        // Class fields can contain type serializers
+        final String classFieldDecls =
+                ctx.getDeclaredTypeSerializers().stream()
+                        .map(name -> "private final TypeSerializer " + name + 
";\n")
+                        .collect(Collectors.joining());
+
+        final String constructorSignature =
+                "public "
+                        + className
+                        + "("
+                        + ctx.getDeclaredTypeSerializers().stream()
+                                .map(name -> "TypeSerializer " + name)
+                                .collect(Collectors.joining(", "))
+                        + ")";
+        final String constructorBody =
+                ctx.getDeclaredTypeSerializers().stream()
+                        .map(name -> "this." + name + " = " + name + ";\n")
+                        .collect(Collectors.joining());
+
+        // Because janino doesn't support generics, we need to manually cast 
the input variable of
+        // the cast method
+        final String functionSignature =
+                "@Override public Object cast(Object _myInputObj) throws "
+                        + CodeGenUtils.className(TableException.class);
+        final String inputVarDecl =
+                inputTypeTerm + " " + inputTerm + " = (" + inputTypeTerm + ") 
_myInputObj;\n";
+
+        final String returnStmt = "return " + codeBlock.getReturnTerm() + 
";\n";
+
+        final String classCode =

Review comment:
       Just a remark: imho this part is a good example of where to disable the 
code format checks and indent the parts of the string in a more readable way.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastCodeBlock.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+/**
+ * Generated cast code block result. This POJO contains the Java code of the 
block performing the
+ * cast, the output isNull term and the output variable containing the cast 
result. It is guaranteed
+ * that the result and isNull variables can be accessed within the outside 
scope of the code.

Review comment:
       Maybe: 
   ```suggestion
    * that the result and isNull variables can be accessed within the external 
scope of the code.
   ```
   reads better.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/CastExecutor.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Interface to model a function that performs the casting of a value from one 
type to another.
+ *
+ * <p>This interface is serializable in order to be embedded in the runtime 
codegen.
+ *
+ * @param <IN> Input internal type
+ * @param <OUT> Output internal type
+ */
+@Internal
+@FunctionalInterface
+public interface CastExecutor<IN, OUT> {
+    /**
+     * Cast the input value. The output is null only and only if the input is 
null. The method

Review comment:
       Imho we should add a TODO and link to 
https://issues.apache.org/jira/browse/FLINK-24385, since only then normal cast 
would throw errors.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.utils.CastExecutor;
+import org.apache.flink.table.planner.functions.casting.CastRule;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.runtime.generated.CompileUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import java.util.Collections;
+
+/**
+ * Base class for cast rules that supports code generation, requiring only an 
expression to perform
+ * the cast. If the casting logic requires to generate several statements, 
look at {@link
+ * AbstractNullAwareCodeGeneratorCastRule}.
+ */
+@Internal
+public abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
+        extends AbstractNullAwareCodeGeneratorCastRule<IN, OUT> {
+
+    protected AbstractExpressionCodeGeneratorCastRule(CastRulePredicate 
predicate) {
+        super(predicate);
+    }
+
+    abstract String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType);
+
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return returnVariable
+                + " = "
+                + generateExpression(context, inputTerm, inputLogicalType, 
targetLogicalType)
+                + ";";
+    }
+
+    @Override
+    public CastExecutor<IN, OUT> create(
+            CastRule.Context context, LogicalType inputLogicalType, 
LogicalType targetLogicalType) {
+        final String inputArgumentName = "inputValue";
+
+        final String expression =
+                generateExpression(
+                        createCodeGeneratorCastRuleContext(context),
+                        inputArgumentName,
+                        inputLogicalType,
+                        targetLogicalType);
+
+        return new CodeGeneratedExpressionCastExecutor<>(
+                CompileUtils.compileExpression(
+                        expression,
+                        Collections.singletonList(inputArgumentName),
+                        Collections.singletonList(
+                                
LogicalTypeUtils.toInternalConversionClass(inputLogicalType)),
+                        
LogicalTypeUtils.toInternalConversionClass(targetLogicalType)));
+    }
+
+    private static CodeGeneratorCastRule.Context 
createCodeGeneratorCastRuleContext(
+            CastRule.Context ctx) {
+        return new CodeGeneratorCastRule.Context() {
+            @Override
+            public String getSessionTimeZoneTerm() {
+                return "java.util.TimeZone.getTimeZone(\"" + 
ctx.getSessionZoneId().getId() + "\")";
+            }
+
+            @Override
+            public String declareVariable(String type, String variablePrefix) {
+                throw new IllegalStateException(

Review comment:
       Here and below, maybe `UnsupportedOperationException` is more suitable?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -875,6 +876,27 @@ protected Configuration configuration() {
                         // single type
                         // .fromCase(INT(), 10, new int[] {10})
                         .fromCase(ARRAY(INT()), new int[] {1, 2, 3}, new 
Integer[] {1, 2, 3})
+                        .build(),
+                CastTestSpecBuilder.testCastTo(ARRAY(STRING().nullable()))

Review comment:
       With this implementation, you can probably remove the comments above 
that refer to https://issues.apache.org/jira/browse/FLINK-17321?

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java
##########
@@ -71,7 +71,7 @@ public T newInstance(ClassLoader classLoader) {
                     // Because Constructor.newInstance(Object... initargs), we 
need to load
                     // references into a new Object[], otherwise it cannot be 
compiled.
                     .newInstance(new Object[] {references});
-        } catch (Exception e) {
+        } catch (Throwable e) {

Review comment:
       Why is this change needed?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+class CastRuleUtils {

Review comment:
       Wouldn't it be better for this class to be `final` and the methods 
package-private? Or is this class expected to be extended in the future?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to