matriv commented on a change in pull request #17522: URL: https://github.com/apache/flink/pull/17522#discussion_r736455472
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/CastExecutor.java ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; + +import javax.annotation.Nullable; + +/** + * Interface to model a function that performs the casting of a value from one type to another. + * + * <p>This interface is serializable in order to be embedded in the runtime codegen. Review comment: I guess this is not valid anymore? ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.utils.CastExecutor; +import org.apache.flink.table.planner.codegen.CodeGenUtils; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRule; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.runtime.generated.CompileUtils; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.FlinkRuntimeException; + +import java.lang.reflect.InvocationTargetException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Base class for {@link CastRule} supporting code generation. This base class implements {@link + * #create(CastRule.Context, LogicalType, LogicalType)} compiling the generated code block into a + * {@link CastExecutor} implementation. + * + * <p>We suggest implementing {@link CodeGeneratorCastRule} starting from {@link + * AbstractNullAwareCodeGeneratorCastRule}, which provides nullability checks, or from {@link + * AbstractExpressionCodeGeneratorCastRule} to generate simple expression casts. + */ +public abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<IN, OUT> + implements CodeGeneratorCastRule<IN, OUT> { + + protected AbstractCodeGeneratorCastRule(CastRulePredicate predicate) { + super(predicate); + } + + @SuppressWarnings("unchecked") + @Override + public CastExecutor<IN, OUT> create( + CastRule.Context castRuleContext, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + final String inputTerm = "_myInput"; + final String className = CodeGenUtils.newName("GeneratedCastExecutor"); + final String inputTypeTerm = CodeGenUtils.boxedTypeTermForType(inputLogicalType); + final String targetTypeTerm = CodeGenUtils.boxedTypeTermForType(targetLogicalType); + + final CastExecutorCodeGeneratorContext ctx = + new CastExecutorCodeGeneratorContext(castRuleContext); + final CastCodeBlock codeBlock = + generateCodeBlock(ctx, inputTerm, inputLogicalType, targetLogicalType); + + // Class fields can contain type serializers + final String classFieldDecls = + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "private final TypeSerializer " + name + ";\n") + .collect(Collectors.joining()); + + final String constructorSignature = + "public " + + className + + "(" + + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "TypeSerializer " + name) + .collect(Collectors.joining(", ")) + + ")"; + final String constructorBody = + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "this." + name + " = " + name + ";\n") + .collect(Collectors.joining()); + + // Because janino doesn't support generics, we need to manually cast the input variable of + // the cast method + final String functionSignature = + "@Override public Object cast(Object _myInputObj) throws " + + CodeGenUtils.className(TableException.class); + final String inputVarDecl = + inputTypeTerm + " " + inputTerm + " = (" + inputTypeTerm + ") _myInputObj;\n"; + + final String returnStmt = "return " + codeBlock.getReturnTerm() + ";\n"; + + final String classCode = Review comment: Just a remark: this part is a good example imho, to disable the code format checks, and ident the parts of the string in a more readable way. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastCodeBlock.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.annotation.Internal; + +import java.util.Objects; + +/** + * Generated cast code block result. This POJO contains the Java code of the block performing the + * cast, the output isNull term and the output variable containing the cast result. It is guaranteed + * that the result and isNull variables can be accessed within the outside scope of the code. Review comment: Maybe: ```suggestion * that the result and isNull variables can be accessed within the external scope of the code. ``` reads better. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/CastExecutor.java ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; + +import javax.annotation.Nullable; + +/** + * Interface to model a function that performs the casting of a value from one type to another. + * + * <p>This interface is serializable in order to be embedded in the runtime codegen. + * + * @param <IN> Input internal type + * @param <OUT> Output internal type + */ +@Internal +@FunctionalInterface +public interface CastExecutor<IN, OUT> { + /** + * Cast the input value. The output is null only and only if the input is null. The method Review comment: Imho we should add a TODO and link to https://issues.apache.org/jira/browse/FLINK-24385, since only then normal cast would throw errors. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.utils.CastExecutor; +import org.apache.flink.table.planner.functions.casting.CastRule; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.runtime.generated.CompileUtils; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; + +import java.util.Collections; + +/** + * Base class for cast rules that supports code generation, requiring only an expression to perform + * the cast. If the casting logic requires to generate several statements, look at {@link + * AbstractNullAwareCodeGeneratorCastRule}. + */ +@Internal +public abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT> + extends AbstractNullAwareCodeGeneratorCastRule<IN, OUT> { + + protected AbstractExpressionCodeGeneratorCastRule(CastRulePredicate predicate) { + super(predicate); + } + + abstract String generateExpression( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType); + + @Override + protected String generateCodeBlockInternal( + CodeGeneratorCastRule.Context context, + String inputTerm, + String returnVariable, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + return returnVariable + + " = " + + generateExpression(context, inputTerm, inputLogicalType, targetLogicalType) + + ";"; + } + + @Override + public CastExecutor<IN, OUT> create( + CastRule.Context context, LogicalType inputLogicalType, LogicalType targetLogicalType) { + final String inputArgumentName = "inputValue"; + + final String expression = + generateExpression( + createCodeGeneratorCastRuleContext(context), + inputArgumentName, + inputLogicalType, + targetLogicalType); + + return new CodeGeneratedExpressionCastExecutor<>( + CompileUtils.compileExpression( + expression, + Collections.singletonList(inputArgumentName), + Collections.singletonList( + LogicalTypeUtils.toInternalConversionClass(inputLogicalType)), + LogicalTypeUtils.toInternalConversionClass(targetLogicalType))); + } + + private static CodeGeneratorCastRule.Context createCodeGeneratorCastRuleContext( + CastRule.Context ctx) { + return new CodeGeneratorCastRule.Context() { + @Override + public String getSessionTimeZoneTerm() { + return "java.util.TimeZone.getTimeZone(\"" + ctx.getSessionZoneId().getId() + "\")"; + } + + @Override + public String declareVariable(String type, String variablePrefix) { + throw new IllegalStateException( Review comment: Here and below, maybe `UnsupportedOperationException` is more suitable? ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java ########## @@ -875,6 +876,27 @@ protected Configuration configuration() { // single type // .fromCase(INT(), 10, new int[] {10}) .fromCase(ARRAY(INT()), new int[] {1, 2, 3}, new Integer[] {1, 2, 3}) + .build(), + CastTestSpecBuilder.testCastTo(ARRAY(STRING().nullable())) Review comment: With this implementation probably you can remove above the comments referring to: https://issues.apache.org/jira/browse/FLINK-17321? ########## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java ########## @@ -71,7 +71,7 @@ public T newInstance(ClassLoader classLoader) { // Because Constructor.newInstance(Object... initargs), we need to load // references into a new Object[], otherwise it cannot be compiled. .newInstance(new Object[] {references}); - } catch (Exception e) { + } catch (Throwable e) { Review comment: Why is this change needed? ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.table.planner.codegen.CodeGenUtils; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.stream.Collectors; + +class CastRuleUtils { Review comment: Wouldn't be better for this class to be `final`, and the methods package private, is it expected to extend this class in the future? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
