twalthr commented on a change in pull request #17658:
URL: https://github.com/apache/flink/pull/17658#discussion_r742622302



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
##########
@@ -44,10 +54,25 @@
     private static final CastRuleProvider INSTANCE = new CastRuleProvider();
 
     static {
-        INSTANCE.addRule(TimestampToStringCastRule.INSTANCE)
-                .addRule(IdentityCastRule.INSTANCE)
+        INSTANCE
+                // Numeric rules
                 .addRule(UpcastToBigIntCastRule.INSTANCE)
-                .addRule(ArrayToArrayCastRule.INSTANCE);
+                // To string rules
+                .addRule(NumberToStringCastRule.INSTANCE)

Review comment:
       nit: can we use our type root and type family terminology if possible? 
`NumericToVarchar` or at least `NumericToString`?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java
##########
@@ -36,4 +49,141 @@ static String functionCall(String functionName, Object... 
args) {
     static String functionCall(Method staticMethod, Object... args) {
         return functionCall(CodeGenUtils.qualifyMethod(staticMethod), args);
     }
+
+    static String constructorCall(Class<?> clazz, Object... args) {
+        return functionCall("new " + className(clazz), args);
+    }
+
+    static String methodCall(String instanceTerm, String methodName, Object... 
args) {
+        return functionCall(instanceTerm + "." + methodName, args);
+    }
+
+    static String newArray(String innerType, String arraySize) {
+        return "new " + innerType + "[" + arraySize + "]";
+    }
+
+    static String stringConcat(Object... args) {
+        return 
Arrays.stream(args).map(Object::toString).collect(Collectors.joining(" + "));
+    }
+
+    static String accessStaticField(Class<?> clazz, String fieldName) {
+        return className(clazz) + "." + fieldName;
+    }
+
+    static String ternaryOperator(String condition, String ifTrue, String 
ifFalse) {
+        return "((" + condition + ") ? (" + ifTrue + ") : (" + ifFalse + "))";
+    }
+
+    static String strLiteral() {
+        return "\"\"";
+    }
+
+    static String strLiteral(String str) {
+        return "\"" + StringEscapeUtils.escapeJava(str) + "\"";
+    }
+
+    static final class CodeWriter {

Review comment:
       very nice!

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,39 +112,142 @@
                         .fromCase(SMALLINT(), (short) 10, 10L)
                         .fromCase(TINYINT(), (byte) 10, 10L),
                 CastTestSpecBuilder.testCastTo(STRING())
+                        .fromCase(STRING(), null, null)
+                        .fromCase(
+                                CHAR(3), StringData.fromString("foo"), 
StringData.fromString("foo"))
+                        .fromCase(
+                                VARCHAR(5),
+                                StringData.fromString("Flink"),
+                                StringData.fromString("Flink"))
+                        .fromCase(
+                                VARCHAR(10),
+                                StringData.fromString("Flink"),
+                                StringData.fromString("Flink"))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("Apache Flink"),
+                                StringData.fromString("Apache Flink"))
+                        .fromCase(BOOLEAN(), true, 
StringData.fromString("true"))
+                        .fromCase(BOOLEAN(), false, 
StringData.fromString("false"))
+                        .fromCase(
+                                BINARY(2), new byte[] {0, 1}, 
StringData.fromString("\u0000\u0001"))
+                        .fromCase(
+                                VARBINARY(3),
+                                new byte[] {0, 1, 2},
+                                StringData.fromString("\u0000\u0001\u0002"))
+                        .fromCase(
+                                VARBINARY(5),
+                                new byte[] {0, 1, 2},
+                                StringData.fromString("\u0000\u0001\u0002"))
+                        .fromCase(
+                                BYTES(),
+                                new byte[] {0, 1, 2, 3, 4},
+                                
StringData.fromString("\u0000\u0001\u0002\u0003\u0004"))
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                StringData.fromString("9.870"))
+                        .fromCase(
+                                DECIMAL(5, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 5, 3),
+                                StringData.fromString("9.870"))

Review comment:
       This should have leading zeros, no?
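
For reference, a small sketch of how plain `java.math.BigDecimal` renders this value once it is rescaled to the declared scale of 3. It only assumes that the cast prints the decimal at its declared scale. It does not go through `DecimalData` or the generated cast code, and `DecimalScaleSketch` is a made-up class name.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class DecimalScaleSketch {

    // Rescales the value to the given scale and prints it without an exponent.
    static String render(String value, int scale) {
        return new BigDecimal(value).setScale(scale, RoundingMode.HALF_UP).toPlainString();
    }

    public static void main(String[] args) {
        // Scale 3 pads the fraction with a trailing zero.
        System.out.println(render("9.87", 3));
    }
}
```

Plain `BigDecimal` pads only the fraction, so any padding of the integer part for `DECIMAL(5, 3)` would have to be added by the cast rule itself.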

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java
##########
@@ -68,63 +70,61 @@ protected String generateCodeBlockInternal(
             String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        LogicalType innerInputType = ((ArrayType) 
inputLogicalType).getElementType();
-        LogicalType innerTargetType = ((ArrayType) 
targetLogicalType).getElementType();
-
-        String innerTargetTypeTerm = arrayElementType(innerTargetType);
-        String arraySize = inputTerm + ".size()";
-        String objArrayTerm = newName("objArray");
-
-        StringBuilder result = new StringBuilder();
-        result.append(
-                innerTargetTypeTerm
-                        + "[] "
-                        + objArrayTerm
-                        + " = new "
-                        + innerTargetTypeTerm
-                        + "["
-                        + arraySize
-                        + "];\n");
-
-        result.append("for (int i = 0; i < " + arraySize + "; i++) {\n");
-        CastCodeBlock codeBlock =
-                CastRuleProvider.generateCodeBlock(
-                        context,
-                        rowFieldReadAccess("i", inputTerm, innerInputType),
-                        functionCall(inputTerm + ".isNullAt", "i"),
-                        innerInputType.copy(false), // Null check is done at 
the array access level
-                        innerTargetType);
-
-        String innerElementCode =
-                codeBlock.getCode()
-                        + "\n"
-                        + objArrayTerm
-                        + "[i] = "
-                        + codeBlock.getReturnTerm()
-                        + ";\n";
+        final LogicalType innerInputType = ((ArrayType) 
inputLogicalType).getElementType();
+        final LogicalType innerTargetType =
+                sanitizeTargetType((ArrayType) inputLogicalType, (ArrayType) 
targetLogicalType);
+
+        final String innerTargetTypeTerm = arrayElementType(innerTargetType);
+        final String arraySize = methodCall(inputTerm, "size");
+        final String objArrayTerm = newName("objArray");
+
+        return new CastRuleUtils.CodeWriter()
+                .declStmt(
+                        innerTargetTypeTerm + "[]",
+                        objArrayTerm,
+                        newArray(innerTargetTypeTerm, arraySize))
+                .forStmt(
+                        arraySize,
+                        (index, loopWriter) -> {
+                            CastCodeBlock codeBlock =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            rowFieldReadAccess(index, 
inputTerm, innerInputType),
+                                            "false",
+                                            // Null check is done at the array 
access level
+                                            innerInputType.copy(false),
+                                            innerTargetType);
+
+                            if (innerTargetType.isNullable()) {
+                                loopWriter.ifStmt(
+                                        "!" + functionCall(inputTerm + ".isNullAt", index),

Review comment:
       I find the name `functionCall` vs. `methodCall` confusing. How about 
`staticCall` for `functionCall`?
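
To make the suggestion concrete, a minimal sketch of the two helpers under the proposed names. The body of `staticCall` is an assumption (the existing `functionCall(String, Object...)` body is not shown in this diff), and `CallNamingSketch` is a made-up holder class.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class CallNamingSketch {

    // Assumed body: renders name(arg1, arg2, ...).
    static String staticCall(String functionName, Object... args) {
        return functionName
                + "("
                + Arrays.stream(args).map(Object::toString).collect(Collectors.joining(", "))
                + ")";
    }

    // Same delegation as CastRuleUtils.methodCall in this PR, only with the proposed name.
    static String methodCall(String instanceTerm, String methodName, Object... args) {
        return staticCall(instanceTerm + "." + methodName, args);
    }

    public static void main(String[] args) {
        System.out.println(staticCall("Math.max", 1, 2));
        System.out.println(methodCall("_myInput", "isNullAt", "i"));
    }
}
```

With that split, the call on this line could probably read `methodCall(inputTerm, "isNullAt", index)` instead of concatenating `".isNullAt"` by hand.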

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING} 
cast rule. */
+public class RowToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final RowToStringCastRule INSTANCE = new 
RowToStringCastRule();
+
+    private RowToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ROW)
+                                                && target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && ((RowType) input)
+                                                        .getFields().stream()
+                                                                .allMatch(
+                                                                        field ->
+                                                                                CastRuleProvider
+                                                                                        .exists(
+                                                                                                field
+                                                                                                        .getType(),
+                                                                                                target)))
+                        .build());
+    }
+
+    /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.delete(0, builder$1.length());
+        builder$1.append("(");
+        int f0Value$2 = -1;
+        boolean f0IsNull$3 = _myInput.isNullAt(0);
+        if (!f0IsNull$3) {
+            f0Value$2 = _myInput.getInt(0);
+            result$2 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + f0Value$2);
+            builder$1.append(result$2);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(",");
+        org.apache.flink.table.data.binary.BinaryStringData f1Value$4 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+        boolean f1IsNull$5 = _myInput.isNullAt(1);
+        if (!f1IsNull$5) {
+            f1Value$4 = ((org.apache.flink.table.data.binary.BinaryStringData) 
_myInput.getString(1));
+            builder$1.append(f1Value$4);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(")");
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+    */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final List<LogicalType> fields =
+                ((RowType) inputLogicalType)
+                        .getFields().stream()
+                                .map(RowType.RowField::getType)
+                                .collect(Collectors.toList());
+
+        final String builderTerm = newName("builder");
+        context.declareClassField(
+                className(StringBuilder.class), builderTerm, 
constructorCall(StringBuilder.class));
+
+        final CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .stmt(
+                                methodCall(
+                                        builderTerm,
+                                        "delete",
+                                        0,
+                                        methodCall(builderTerm, "length")))
+                        .stmt(methodCall(builderTerm, "append", strLiteral("(")));

Review comment:
       Maybe document why we chose this representation. I was wondering whether
we should align it with `Row.toString`, but `( )` is probably fine: it
distinguishes rows from arrays, and from rows printed as `+I[]`, where we also
have a change flag.
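
A plain-Java sketch of the two representations, going only by the example generated code in the comments of `RowToStringCastRule` and `ArrayToStringCastRule`. `RowStringSketch` and its methods are hypothetical and do not touch `RowData` or `ArrayData`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class RowStringSketch {

    // Null fields are printed as NULL, as in the generated code.
    private static String render(Object field) {
        return field == null ? "NULL" : String.valueOf(field);
    }

    // Row style: "(" ... ")" with "," between fields.
    static String rowToString(List<?> fields) {
        return fields.stream()
                .map(RowStringSketch::render)
                .collect(Collectors.joining(",", "(", ")"));
    }

    // Array style: "[" ... "]" with ", " between elements.
    static String arrayToString(List<?> elements) {
        return elements.stream()
                .map(RowStringSketch::render)
                .collect(Collectors.joining(", ", "[", "]"));
    }

    public static void main(String[] args) {
        System.out.println(rowToString(Arrays.asList(1, null)));
        System.out.println(arrayToString(Arrays.asList(1, null, 3)));
    }
}
```

A short Javadoc note along these lines on the rule would probably be enough.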

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BinaryToStringCastRule.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.nio.charset.StandardCharsets;
+
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.accessStaticField;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+
+/**
+ * {@link LogicalTypeFamily#BINARY_STRING} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule.
+ */
+public class BinaryToStringCastRule extends AbstractCharacterFamilyTargetRule<byte[]> {

Review comment:
       Don't forget to mark all rules `@Internal` or, even better, make them package-private.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java
##########
@@ -23,17 +23,32 @@
 import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
 import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
-import java.util.Objects;
-
-/** Identity casting rule. */
+/**
+ * Identity cast rule. For more details on when the rule is applied, check 
{@link
+ * #isIdentityCast(LogicalType, LogicalType)}
+ */
 @Internal
 public class IdentityCastRule extends AbstractCodeGeneratorCastRule<Object, 
Object> {
 
     public static final IdentityCastRule INSTANCE = new IdentityCastRule();
 
     private IdentityCastRule() {
-        super(CastRulePredicate.builder().predicate(Objects::equals).build());
+        
super(CastRulePredicate.builder().predicate(IdentityCastRule::isIdentityCast).build());
+    }
+
+    private static boolean isIdentityCast(
+            LogicalType inputLogicalType, LogicalType targetLogicalType) {
+        // TODO string to string casting now behaves like string casting.
+        //  the discussion in FLINK-24413 will address it
+        if (inputLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)
+                && targetLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
+            return true;
+        }
+
+        // Identity cast applies if the two types are equals, except nullability
+        return inputLogicalType.copy(true).equals(targetLogicalType.copy(true));

Review comment:
       Use `org.apache.flink.table.types.logical.utils.LogicalTypeCasts#supportsAvoidingCast`
here. Nullability is not the only property that can be skipped.
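
A toy sketch of the difference, assuming that length is one such property, i.e. that a shorter bounded type can be read as a longer one without a cast. `Bounded` is a hypothetical stand-in and not Flink's `LogicalType`; the actual rules are whatever `supportsAvoidingCast` implements.

```java
class AvoidCastSketch {

    // Hypothetical stand-in for a length-bounded type such as VARBINARY(n).
    static final class Bounded {
        final int length;
        final boolean nullable;

        Bounded(int length, boolean nullable) {
            this.length = length;
            this.nullable = nullable;
        }
    }

    // The check in this hunk: types are equal once nullability is ignored.
    static boolean equalExceptNullability(Bounded input, Bounded target) {
        return input.length == target.length;
    }

    // Assumed relaxed check: the input fits into the target, and a nullable
    // input is never read as a NOT NULL target.
    static boolean canAvoidCast(Bounded input, Bounded target) {
        return input.length <= target.length && (target.nullable || !input.nullable);
    }

    public static void main(String[] args) {
        Bounded input = new Bounded(2, false);
        Bounded target = new Bounded(3, true);
        System.out.println(equalExceptNullability(input, target));
        System.out.println(canAvoidCast(input, target));
    }
}
```

Under that assumption the first check rejects a pair that needs no conversion at all, which is the case the utility is meant to cover.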

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java
##########
@@ -36,4 +49,141 @@ static String functionCall(String functionName, Object... 
args) {
     static String functionCall(Method staticMethod, Object... args) {
         return functionCall(CodeGenUtils.qualifyMethod(staticMethod), args);
     }
+
+    static String constructorCall(Class<?> clazz, Object... args) {
+        return functionCall("new " + className(clazz), args);
+    }
+
+    static String methodCall(String instanceTerm, String methodName, Object... 
args) {
+        return functionCall(instanceTerm + "." + methodName, args);
+    }
+
+    static String newArray(String innerType, String arraySize) {
+        return "new " + innerType + "[" + arraySize + "]";
+    }
+
+    static String stringConcat(Object... args) {
+        return 
Arrays.stream(args).map(Object::toString).collect(Collectors.joining(" + "));
+    }
+
+    static String accessStaticField(Class<?> clazz, String fieldName) {
+        return className(clazz) + "." + fieldName;
+    }
+
+    static String ternaryOperator(String condition, String ifTrue, String 
ifFalse) {
+        return "((" + condition + ") ? (" + ifTrue + ") : (" + ifFalse + "))";
+    }
+
+    static String strLiteral() {
+        return "\"\"";
+    }
+
+    static String strLiteral(String str) {
+        return "\"" + StringEscapeUtils.escapeJava(str) + "\"";
+    }
+
+    static final class CodeWriter {
+        StringBuilder builder = new StringBuilder();
+
+        public CodeWriter declStmt(String varType, String varName, String 
value) {
+            return stmt(varType + " " + varName + " = " + value);
+        }
+
+        public CodeWriter declStmt(Class<?> clazz, String varName, String 
value) {
+            return declStmt(className(clazz), varName, value);
+        }
+
+        public CodeWriter declPrimitiveStmt(LogicalType logicalType, String 
varName, String value) {
+            return declStmt(primitiveTypeTermForType(logicalType), varName, 
value);
+        }
+
+        public CodeWriter declPrimitiveStmt(LogicalType logicalType, String 
varName) {
+            return declStmt(
+                    primitiveTypeTermForType(logicalType),
+                    varName,
+                    primitiveDefaultValue(logicalType));
+        }
+
+        public CodeWriter declStmt(String varType, String varName) {
+            return stmt(varType + " " + varName);
+        }
+
+        public CodeWriter declStmt(Class<?> clazz, String varName) {
+            return declStmt(className(clazz), varName);
+        }
+
+        public CodeWriter assignStmt(String varName, String value) {
+            return stmt(varName + " = " + value);
+        }
+
+        public CodeWriter assignArrayStmt(String varName, String index, String 
value) {
+            return stmt(varName + "[" + index + "] = " + value);
+        }
+
+        public CodeWriter stmt(String stmt) {
+            builder.append(stmt).append(';').append('\n');
+            return this;
+        }
+
+        public CodeWriter forStmt(
+                String upperBound, BiConsumer<String, CodeWriter> 
bodyWriterConsumer) {
+            final String indexTerm = newName("i");
+            final CodeWriter innerWriter = new CodeWriter();
+
+            builder.append("for (int ")
+                    .append(indexTerm)
+                    .append(" = 0; ")
+                    .append(indexTerm)
+                    .append(" < ")
+                    .append(upperBound)
+                    .append("; ")
+                    .append(indexTerm)
+                    .append("++) {\n");
+            bodyWriterConsumer.accept(indexTerm, innerWriter);
+            builder.append(innerWriter).append("}\n");
+
+            return this;
+        }
+
+        public CodeWriter ifStmt(String condition, Consumer<CodeWriter> 
bodyWriterConsumer) {
+            final CodeWriter innerWriter = new CodeWriter();
+
+            builder.append("if (").append(condition).append(") {\n");
+            bodyWriterConsumer.accept(innerWriter);
+            builder.append(innerWriter).append("}\n");
+
+            return this;
+        }
+
+        public CodeWriter ifStmt(
+                String condition,
+                Consumer<CodeWriter> thenWriterConsumer,
+                Consumer<CodeWriter> elseWriterConsumer) {
+            final CodeWriter thenWriter = new CodeWriter();
+            final CodeWriter elseWriter = new CodeWriter();
+
+            builder.append("if (").append(condition).append(") {\n");
+            thenWriterConsumer.accept(thenWriter);
+            builder.append(thenWriter).append("} else {\n");
+            elseWriterConsumer.accept(elseWriter);
+            builder.append(elseWriter).append("}\n");
+
+            return this;
+        }
+
+        public CodeWriter append(CastCodeBlock codeBlock) {
+            builder.append(codeBlock.getCode());
+            return this;
+        }
+
+        public CodeWriter append(String codeBlock) {

Review comment:
       `appendBlock`?
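
A trimmed, standalone sketch of why the name matters. `stmt` and `ifStmt` are copied from this hunk; the body of `appendBlock` is an assumption (the `append(String)` body is cut off in this diff), and `CodeWriterSketch` is a made-up class.

```java
import java.util.function.Consumer;

class CodeWriterSketch {

    private final StringBuilder builder = new StringBuilder();

    // Appends a single statement and terminates it with ';'.
    CodeWriterSketch stmt(String stmt) {
        builder.append(stmt).append(';').append('\n');
        return this;
    }

    CodeWriterSketch ifStmt(String condition, Consumer<CodeWriterSketch> bodyWriterConsumer) {
        final CodeWriterSketch innerWriter = new CodeWriterSketch();
        builder.append("if (").append(condition).append(") {\n");
        bodyWriterConsumer.accept(innerWriter);
        builder.append(innerWriter).append("}\n");
        return this;
    }

    // Proposed name. Assumed behavior: appends an already rendered block verbatim.
    CodeWriterSketch appendBlock(String codeBlock) {
        builder.append(codeBlock);
        return this;
    }

    @Override
    public String toString() {
        return builder.toString();
    }

    static String demo() {
        return new CodeWriterSketch()
                .stmt("int x = 0")
                .ifStmt("!isNull", w -> w.appendBlock("x = compute();\n"))
                .toString();
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

Next to `stmt`, a bare `append` can be read as "append a statement"; `appendBlock` makes it clear that no `;` is added.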

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BooleanToStringCastRule.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.stringConcat;
+
+/** {@link LogicalTypeRoot#BOOLEAN} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class BooleanToStringCastRule extends 
AbstractCharacterFamilyTargetRule<Object> {
+
+    public static final BooleanToStringCastRule INSTANCE = new 
BooleanToStringCastRule();
+
+    private BooleanToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeRoot.BOOLEAN)
+                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .build());
+    }
+
+    @Override
+    public String generateStringExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return stringConcat(strLiteral(), inputTerm);

Review comment:
       maybe a constant `EMPTY_STRING` or `strLiteralEmpty()`?
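
Roughly what I have in mind, as a standalone sketch. `stringConcat` is the helper from this PR; the constant and the class name `EmptyStringSketch` are only illustrative.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class EmptyStringSketch {

    // Proposed constant: the Java source text of an empty string literal.
    static final String EMPTY_STRING = "\"\"";

    // Same as CastRuleUtils.stringConcat in this PR.
    static String stringConcat(Object... args) {
        return Arrays.stream(args).map(Object::toString).collect(Collectors.joining(" + "));
    }

    public static void main(String[] args) {
        // Renders the expression: "" + _myInput
        System.out.println(stringConcat(EMPTY_STRING, "_myInput"));
    }
}
```

The call site then states its intent, instead of relying on a no-arg `strLiteral()` overload.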

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java
##########
@@ -36,4 +49,141 @@ static String functionCall(String functionName, Object... 
args) {
     static String functionCall(Method staticMethod, Object... args) {
         return functionCall(CodeGenUtils.qualifyMethod(staticMethod), args);
     }
+
+    static String constructorCall(Class<?> clazz, Object... args) {
+        return functionCall("new " + className(clazz), args);
+    }
+
+    static String methodCall(String instanceTerm, String methodName, Object... 
args) {
+        return functionCall(instanceTerm + "." + methodName, args);
+    }
+
+    static String newArray(String innerType, String arraySize) {
+        return "new " + innerType + "[" + arraySize + "]";
+    }
+
+    static String stringConcat(Object... args) {
+        return 
Arrays.stream(args).map(Object::toString).collect(Collectors.joining(" + "));
+    }
+
+    static String accessStaticField(Class<?> clazz, String fieldName) {
+        return className(clazz) + "." + fieldName;
+    }
+
+    static String ternaryOperator(String condition, String ifTrue, String 
ifFalse) {
+        return "((" + condition + ") ? (" + ifTrue + ") : (" + ifFalse + "))";
+    }
+
+    static String strLiteral() {
+        return "\"\"";
+    }
+
+    static String strLiteral(String str) {
+        return "\"" + StringEscapeUtils.escapeJava(str) + "\"";

Review comment:
       We actually try to avoid too many external deps; you can use
`org.apache.flink.table.utils.EncodingUtils#escapeJava` instead.
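
For illustration only, a dependency-free sketch of what the helper has to guarantee. The `escape` method below is a hypothetical, deliberately incomplete stand-in for an `escapeJava` implementation, not the Flink utility.

```java
class StrLiteralSketch {

    // Hypothetical stand-in: escapes only quotes, backslashes and newlines.
    static String escape(String str) {
        StringBuilder out = new StringBuilder();
        for (char c : str.toCharArray()) {
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                default:
                    out.append(c);
            }
        }
        return out.toString();
    }

    // Same shape as CastRuleUtils.strLiteral(String) in this PR.
    static String strLiteral(String str) {
        return "\"" + escape(str) + "\"";
    }

    public static void main(String[] args) {
        System.out.println(strLiteral("("));
        System.out.println(strLiteral("a\"b"));
    }
}
```

Without the escaping step, a quote inside the input would terminate the generated literal early and break the generated code.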

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final ArrayToStringCastRule INSTANCE = new 
ArrayToStringCastRule();
+
+    private ArrayToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ARRAY)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && CastRuleProvider.exists(
+                                                        ((ArrayType) 
input).getElementType(),
+                                                        target))
+                        .build());
+    }
+
+    /* Example generated code for ARRAY<INT>:
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.delete(0, builder$1.length());
+        builder$1.append("[");
+        for (int i$2 = 0; i$2 < _myInput.size(); i$2++) {
+            if (i$2 != 0) {
+                builder$1.append(", ");
+            }
+            int element$3 = -1;
+            boolean elementIsNull$4 = _myInput.isNullAt(i$2);
+            if (!elementIsNull$4) {
+                element$3 = _myInput.getInt(i$2);
+                result$2 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + element$3);
+                builder$1.append(result$2);
+            } else {
+                builder$1.append("NULL");
+            }
+        }
+        builder$1.append("]");
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+    */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final LogicalType innerInputType = ((ArrayType) 
inputLogicalType).getElementType();
+
+        final String builderTerm = newName("builder");
+        context.declareClassField(
+                className(StringBuilder.class), builderTerm, 
constructorCall(StringBuilder.class));
+
+        return new CastRuleUtils.CodeWriter()
+                .stmt(methodCall(builderTerm, "delete", 0, 
methodCall(builderTerm, "length")))
+                .stmt(methodCall(builderTerm, "append", strLiteral("[")))
+                .forStmt(
+                        methodCall(inputTerm, "size"),
+                        (indexTerm, loopBodyWriter) -> {
+                            String elementTerm = newName("element");
+                            String elementIsNullTerm = 
newName("elementIsNull");
+
+                            CastCodeBlock codeBlock =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            elementTerm,
+                                            "false",
+                                            // Null check is done at the array 
access level
+                                            innerInputType.copy(false),
+                                            targetLogicalType);
+
+                            loopBodyWriter
+                                    // Write the comma
+                                    .ifStmt(
+                                            indexTerm + " != 0",
+                                            thenBodyWriter ->
+                                                    thenBodyWriter.stmt(
+                                                            methodCall(
+                                                                    
builderTerm,
+                                                                    "append",
+                                                                    
strLiteral(", "))))
+                                    // Extract element from array
+                                    .declPrimitiveStmt(innerInputType, 
elementTerm)
+                                    .declStmt(
+                                            boolean.class,
+                                            elementIsNullTerm,
+                                            methodCall(inputTerm, "isNullAt", 
indexTerm))
+                                    .ifStmt(
+                                            "!" + elementIsNullTerm,
+                                            thenBodyWriter ->
+                                                    thenBodyWriter
+                                                            // If element not 
null, extract it and
+                                                            // execute the cast
+                                                            .assignStmt(
+                                                                    
elementTerm,
+                                                                    
rowFieldReadAccess(
+                                                                            
indexTerm,
+                                                                            
inputTerm,
+                                                                            
innerInputType))
+                                                            .append(codeBlock)
+                                                            .stmt(
+                                                                    methodCall(
+                                                                            
builderTerm,
+                                                                            
"append",
+                                                                            
codeBlock
+                                                                               
     .getReturnTerm())),
+                                            elseBodyWriter ->
+                                                    // If element is null, 
just write NULL
+                                                    elseBodyWriter.stmt(
+                                                            methodCall(
+                                                                    
builderTerm,
+                                                                    "append",
+                                                                    
strLiteral("NULL"))));

Review comment:
       Could we introduce a global constant for this `"NULL"` literal, maybe in `CastRuleUtils`?
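
   A minimal sketch of the idea, not the actual `CastRuleUtils` code: the
   constant name `NULL_STR_LITERAL` is hypothetical, and `strLiteral` and
   `methodCall` below are simplified stand-ins so the snippet compiles on its own.

   ```java
   import java.util.Arrays;
   import java.util.stream.Collectors;

   final class NullLiteralSketch {
       // Simplified stand-in for CastRuleUtils.strLiteral: wraps the text in double quotes.
       static String strLiteral(String text) {
           return "\"" + text + "\"";
       }

       // Hypothetical shared constant, so every to-string rule emits the same NULL text.
       static final String NULL_STR_LITERAL = strLiteral("NULL");

       // Simplified stand-in for CastRuleUtils.methodCall: renders "instance.method(args)".
       static String methodCall(String instanceTerm, String methodName, Object... args) {
           return instanceTerm
                   + "."
                   + methodName
                   + "("
                   + Arrays.stream(args).map(Object::toString).collect(Collectors.joining(", "))
                   + ")";
       }
   }
   ```

   The array and row rules could then both call
   `methodCall(builderTerm, "append", NULL_STR_LITERAL)` instead of repeating the literal.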

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final ArrayToStringCastRule INSTANCE = new 
ArrayToStringCastRule();
+
+    private ArrayToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ARRAY)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && CastRuleProvider.exists(
+                                                        ((ArrayType) 
input).getElementType(),
+                                                        target))
+                        .build());
+    }
+
+    /* Example generated code for ARRAY<INT>:

Review comment:
       Very good idea. This makes the code understandable!

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final ArrayToStringCastRule INSTANCE = new 
ArrayToStringCastRule();
+
+    private ArrayToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ARRAY)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && CastRuleProvider.exists(
+                                                        ((ArrayType) 
input).getElementType(),
+                                                        target))
+                        .build());
+    }
+
+    /* Example generated code for ARRAY<INT>:
+
+    isNull$0 = _myInputIsNull;

Review comment:
       use `<pre>`?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimestampToStringCastRule.java
##########
@@ -53,11 +53,11 @@ public String generateStringExpression(
             String inputTerm,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        String zoneId =
+        final String zoneId =
                 
(inputLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE))
                         ? context.getSessionTimeZoneTerm()
-                        : className(DateTimeUtils.class) + ".UTC_ZONE";

Review comment:
       Maybe use the old method for performance? I don't see a reason to 
declare and access a time zone for a type without a time zone.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING} 
cast rule. */
+public class RowToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final RowToStringCastRule INSTANCE = new 
RowToStringCastRule();
+
+    private RowToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ROW)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && ((RowType) input)
+                                                        .getFields().stream()
+                                                                .allMatch(
+                                                                        field 
->
+                                                                               
 CastRuleProvider
+                                                                               
         .exists(
+                                                                               
                 field
+                                                                               
                         .getType(),
+                                                                               
                 target)))
+                        .build());
+    }
+
+    /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.delete(0, builder$1.length());
+        builder$1.append("(");
+        int f0Value$2 = -1;
+        boolean f0IsNull$3 = _myInput.isNullAt(0);
+        if (!f0IsNull$3) {
+            f0Value$2 = _myInput.getInt(0);
+            result$2 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + f0Value$2);
+            builder$1.append(result$2);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(",");
+        org.apache.flink.table.data.binary.BinaryStringData f1Value$4 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+        boolean f1IsNull$5 = _myInput.isNullAt(1);
+        if (!f1IsNull$5) {
+            f1Value$4 = ((org.apache.flink.table.data.binary.BinaryStringData) 
_myInput.getString(1));
+            builder$1.append(f1Value$4);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(")");
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+    */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final List<LogicalType> fields =
+                ((RowType) inputLogicalType)
+                        .getFields().stream()
+                                .map(RowType.RowField::getType)
+                                .collect(Collectors.toList());
+
+        final String builderTerm = newName("builder");
+        context.declareClassField(
+                className(StringBuilder.class), builderTerm, 
constructorCall(StringBuilder.class));
+
+        final CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .stmt(
+                                methodCall(
+                                        builderTerm,
+                                        "delete",
+                                        0,
+                                        methodCall(builderTerm, "length")))
+                        .stmt(methodCall(builderTerm, "append", 
strLiteral("(")));
+
+        for (int i = 0; i < fields.size(); i++) {
+            final int fieldIndex = i;
+            final LogicalType fieldType = fields.get(fieldIndex);
+
+            final String fieldTerm = newName("f" + fieldIndex + "Value");
+            final String fieldIsNullTerm = newName("f" + fieldIndex + 
"IsNull");
+
+            final CastCodeBlock codeBlock =
+                    CastRuleProvider.generateCodeBlock(
+                            context,
+                            fieldTerm,
+                            fieldIsNullTerm,
+                            // Null check is done at the row access level
+                            fieldType.copy(false),
+                            targetLogicalType);
+
+            // Write the comma
+            if (fieldIndex != 0) {
+                writer.stmt(methodCall(builderTerm, "append", 
strLiteral(",")));

Review comment:
       Use a space after the comma.
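
   To illustrate the expected output, here is a small runtime sketch of what
   the generated code would append. The class and method names are made up
   for this example, and it only mimics the `StringBuilder` logic, not the
   code generator itself.

   ```java
   final class RowSeparatorSketch {
       // Renders the fields as "(" + fields joined by ", " + ")", writing NULL for null fields.
       static String render(Object... fields) {
           StringBuilder builder = new StringBuilder("(");
           for (int i = 0; i < fields.length; i++) {
               if (i != 0) {
                   builder.append(", "); // comma followed by a space, as for arrays
               }
               builder.append(fields[i] == null ? "NULL" : fields[i].toString());
           }
           return builder.append(")").toString();
       }
   }
   ```

   With this separator a row prints as `(1, NULL)` rather than `(1,NULL)`,
   which is consistent with the `", "` used in `ArrayToStringCastRule`.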

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java
##########
@@ -68,63 +70,61 @@ protected String generateCodeBlockInternal(
             String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        LogicalType innerInputType = ((ArrayType) 
inputLogicalType).getElementType();
-        LogicalType innerTargetType = ((ArrayType) 
targetLogicalType).getElementType();
-
-        String innerTargetTypeTerm = arrayElementType(innerTargetType);
-        String arraySize = inputTerm + ".size()";
-        String objArrayTerm = newName("objArray");
-
-        StringBuilder result = new StringBuilder();
-        result.append(
-                innerTargetTypeTerm
-                        + "[] "
-                        + objArrayTerm
-                        + " = new "
-                        + innerTargetTypeTerm
-                        + "["
-                        + arraySize
-                        + "];\n");
-
-        result.append("for (int i = 0; i < " + arraySize + "; i++) {\n");
-        CastCodeBlock codeBlock =
-                CastRuleProvider.generateCodeBlock(
-                        context,
-                        rowFieldReadAccess("i", inputTerm, innerInputType),
-                        functionCall(inputTerm + ".isNullAt", "i"),
-                        innerInputType.copy(false), // Null check is done at 
the array access level
-                        innerTargetType);
-
-        String innerElementCode =
-                codeBlock.getCode()
-                        + "\n"
-                        + objArrayTerm
-                        + "[i] = "
-                        + codeBlock.getReturnTerm()
-                        + ";\n";
+        final LogicalType innerInputType = ((ArrayType) 
inputLogicalType).getElementType();
+        final LogicalType innerTargetType =
+                sanitizeTargetType((ArrayType) inputLogicalType, (ArrayType) 
targetLogicalType);
+
+        final String innerTargetTypeTerm = arrayElementType(innerTargetType);
+        final String arraySize = methodCall(inputTerm, "size");
+        final String objArrayTerm = newName("objArray");
+
+        return new CastRuleUtils.CodeWriter()
+                .declStmt(
+                        innerTargetTypeTerm + "[]",
+                        objArrayTerm,
+                        newArray(innerTargetTypeTerm, arraySize))
+                .forStmt(
+                        arraySize,
+                        (index, loopWriter) -> {
+                            CastCodeBlock codeBlock =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            rowFieldReadAccess(index, 
inputTerm, innerInputType),
+                                            "false",
+                                            // Null check is done at the array 
access level
+                                            innerInputType.copy(false),
+                                            innerTargetType);
+
+                            if (innerTargetType.isNullable()) {
+                                loopWriter.ifStmt(
+                                        "!" + functionCall(inputTerm + 
".isNullAt", index),
+                                        thenWriter ->
+                                                thenWriter
+                                                        .append(codeBlock)
+                                                        .assignArrayStmt(
+                                                                objArrayTerm,
+                                                                index,
+                                                                
codeBlock.getReturnTerm()));
+                            } else {
+                                loopWriter
+                                        .append(codeBlock)
+                                        .assignArrayStmt(
+                                                objArrayTerm, index, 
codeBlock.getReturnTerm());
+                            }
+                        })
+                .assignStmt(returnVariable, 
constructorCall(GenericArrayData.class, objArrayTerm))
+                .toString();
+    }
 
-        // Add null check if inner type is nullable
-        if (innerInputType.isNullable()) {
-            result.append("if (" + inputTerm + ".isNullAt(i)) {\n")
-                    .append(objArrayTerm + "[i] = null;\n")
-                    .append("} else {\n")
-                    .append(innerElementCode)
-                    .append("}\n");
-        } else {
-            result.append(innerElementCode);
+    private static LogicalType sanitizeTargetType(
+            ArrayType inputArrayType, ArrayType targetArrayType) {
+        LogicalType innerTargetType = targetArrayType.getElementType();
+        // TODO this seems rather a bug of the planner that 
generates/allows/doesn't sanitize

Review comment:
       See `LogicalTypeCasts`:
   
   ```
       private static boolean supportsCasting(
               LogicalType sourceType, LogicalType targetType, boolean 
allowExplicit) {
           // a NOT NULL type cannot store a NULL type
           // but it might be useful to cast explicitly with knowledge about 
the data
           if (sourceType.isNullable() && !targetType.isNullable() && 
!allowExplicit) {
               return false;
           }
           // ignore nullability during compare
           if (sourceType.copy(true).equals(targetType.copy(true))) {
               return true;
           }
   ```
   
   For explicit casts, this is totally fine if the user knows the data.
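
   Reduced to the nullability part only, the check above behaves roughly as
   in the following sketch. The class and method names are hypothetical, and
   all other rules of `supportsCasting` are left out.

   ```java
   final class ExplicitCastNullabilitySketch {
       // Mirrors only the first condition quoted above: a nullable source may
       // target a NOT NULL type only when the cast is explicit.
       static boolean passesNullabilityGate(
               boolean sourceNullable, boolean targetNullable, boolean allowExplicit) {
           return !(sourceNullable && !targetNullable && !allowExplicit);
       }
   }
   ```

   So a nullable-to-NOT-NULL cast is rejected when implicit but accepted
   when explicit, which is why the rule can see such a type pair.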

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING} 
cast rule. */
+public class RowToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final RowToStringCastRule INSTANCE = new 
RowToStringCastRule();
+
+    private RowToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->

Review comment:
       This is very hard to read. I know this is due to our code formatting, but 
you could introduce a small static helper method for readability in this case.
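
   Something along these lines, as a self-contained sketch: types are
   modelled as plain strings, `CAST_EXISTS` is a hypothetical stand-in for
   `CastRuleProvider.exists`, and treating `RAW` as unsupported is an
   arbitrary choice for the example only.

   ```java
   import java.util.List;
   import java.util.function.BiPredicate;

   final class RowPredicateSketch {
       // Hypothetical stand-in for CastRuleProvider.exists(inputType, targetType).
       static final BiPredicate<String, String> CAST_EXISTS =
               (input, target) -> !input.equals("RAW") && target.equals("STRING");

       // The extracted helper: true if every row field can be cast to the target type.
       static boolean allFieldsCastable(List<String> fieldTypes, String targetType) {
           return fieldTypes.stream().allMatch(field -> CAST_EXISTS.test(field, targetType));
       }
   }
   ```

   In the rule itself the helper would take the `LogicalType` arguments, so
   the constructor could pass a short lambda or method reference to
   `.predicate(...)` and the formatter would no longer push the stream
   chain to the right margin.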

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BinaryToStringCastRule.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.nio.charset.StandardCharsets;
+
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.accessStaticField;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+
+/**
+ * {@link LogicalTypeFamily#BINARY_STRING} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule.
+ */
+public class BinaryToStringCastRule extends 
AbstractCharacterFamilyTargetRule<byte[]> {
+
+    public static final BinaryToStringCastRule INSTANCE = new 
BinaryToStringCastRule();
+
+    private BinaryToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.BINARY_STRING)
+                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .build());
+    }
+
+    @Override
+    public String generateStringExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return constructorCall(

Review comment:
       This is not helpful for the user; we should print the byte array instead, 
similar to `ArrayToString`.
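
To illustrate the difference, here is a minimal, self-contained sketch in plain Java (no Flink classes). The class and method names are hypothetical and not part of the PR. The bracketed format simply mirrors what the `ARRAY(INT())` test cases in this PR expect (`[-123, 456]`); that the final rule should emit exactly this format is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical illustration only; not part of the PR. */
final class ByteArrayPrinting {

    /** Decodes the bytes as UTF-8 text; control bytes become unprintable characters. */
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Prints the bytes element by element, in the same style as an array of numbers. */
    static String print(byte[] bytes) {
        return Arrays.toString(bytes);
    }

    public static void main(String[] args) {
        byte[] bytes = {0, 1, 2};
        // Three control characters: nothing readable reaches the user.
        System.out.println(decode(bytes).length());
        // A readable, element-wise rendering of the same bytes.
        System.out.println(print(bytes));
    }
}
```

With the decoding approach, the `BINARY(2)` test case yields a string of two control characters, whereas the element-wise approach yields text a user can actually read.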

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,39 +112,142 @@
                         .fromCase(SMALLINT(), (short) 10, 10L)
                         .fromCase(TINYINT(), (byte) 10, 10L),
                 CastTestSpecBuilder.testCastTo(STRING())
+                        .fromCase(STRING(), null, null)
+                        .fromCase(
+                                CHAR(3), StringData.fromString("foo"), StringData.fromString("foo"))
+                        .fromCase(
+                                VARCHAR(5),
+                                StringData.fromString("Flink"),
+                                StringData.fromString("Flink"))
+                        .fromCase(
+                                VARCHAR(10),
+                                StringData.fromString("Flink"),
+                                StringData.fromString("Flink"))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("Apache Flink"),
+                                StringData.fromString("Apache Flink"))
+                        .fromCase(BOOLEAN(), true, StringData.fromString("true"))
+                        .fromCase(BOOLEAN(), false, StringData.fromString("false"))
+                        .fromCase(
+                                BINARY(2), new byte[] {0, 1}, StringData.fromString("\u0000\u0001"))
+                        .fromCase(
+                                VARBINARY(3),
+                                new byte[] {0, 1, 2},
+                                StringData.fromString("\u0000\u0001\u0002"))
+                        .fromCase(
+                                VARBINARY(5),
+                                new byte[] {0, 1, 2},
+                                StringData.fromString("\u0000\u0001\u0002"))
+                        .fromCase(
+                                BYTES(),
+                                new byte[] {0, 1, 2, 3, 4},
+                                StringData.fromString("\u0000\u0001\u0002\u0003\u0004"))
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                StringData.fromString("9.870"))
+                        .fromCase(
+                                DECIMAL(5, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 5, 3),
+                                StringData.fromString("9.870"))
+                        .fromCase(TINYINT(), (byte) -125, StringData.fromString("-125"))
+                        .fromCase(SMALLINT(), (short) 32767, StringData.fromString("32767"))
+                        .fromCase(INT(), -12345678, StringData.fromString("-12345678"))
+                        .fromCase(BIGINT(), 1234567891234L, StringData.fromString("1234567891234"))
+                        .fromCase(FLOAT(), -123.456f, StringData.fromString("-123.456"))
+                        .fromCase(DOUBLE(), 12345.678901d, StringData.fromString("12345.678901"))
+                        .fromCase(
+                                FLOAT(),
+                                Float.MAX_VALUE,
+                                StringData.fromString(String.valueOf(Float.MAX_VALUE)))
+                        .fromCase(
+                                DOUBLE(),
+                                Double.MAX_VALUE,
+                                StringData.fromString(String.valueOf(Double.MAX_VALUE)))
                         .fromCase(
                                 STRING(),
                                 StringData.fromString("Hello"),
                                 StringData.fromString("Hello"))
+                        .fromCase(TIMESTAMP(), TIMESTAMP, TIMESTAMP_STRING)
+                        .fromCase(TIMESTAMP_LTZ(), CET_CONTEXT, TIMESTAMP, TIMESTAMP_STRING_CET)
+                        .fromCase(DATE(), DATE, DATE_STRING)
+                        .fromCase(TIME(5), TIME, TIME_STRING)
+                        .fromCase(INTERVAL(YEAR()), 84, StringData.fromString("+7-00"))
+                        .fromCase(INTERVAL(MONTH()), 5, StringData.fromString("+0-05"))
+                        .fromCase(INTERVAL(MONTH()), 123, StringData.fromString("+10-03"))
+                        .fromCase(INTERVAL(MONTH()), 12334, StringData.fromString("+1027-10"))
+                        .fromCase(INTERVAL(DAY()), 10L, StringData.fromString("+0 00:00:00.010"))
+                        .fromCase(
+                                INTERVAL(DAY()),
+                                123456789L,
+                                StringData.fromString("+1 10:17:36.789"))
+                        .fromCase(
+                                INTERVAL(DAY()),
+                                Duration.ofHours(36).toMillis(),
+                                StringData.fromString("+1 12:00:00.000"))
+                        .fromCase(
+                                ARRAY(INTERVAL(MONTH())),
+                                new GenericArrayData(new int[] {-123, 123}),
+                                StringData.fromString("[-10-03, +10-03]"))
                         .fromCase(
-                                TIMESTAMP(),
-                                TimestampData.fromLocalDateTime(
-                                        LocalDateTime.parse("2021-09-24T12:34:56.123456")),
-                                StringData.fromString("2021-09-24 12:34:56.123456"))
-                        .fromCase(
-                                TIMESTAMP_LTZ(),
-                                CastRule.Context.create(
-                                        ZoneId.of("CET"),
-                                        Thread.currentThread().getContextClassLoader()),
-                                TimestampData.fromLocalDateTime(
-                                        LocalDateTime.parse("2021-09-24T12:34:56.123456")),
-                                StringData.fromString("2021-09-24 14:34:56.123456")),
+                                ARRAY(INT()),
+                                new GenericArrayData(new int[] {-123, 456}),
+                                StringData.fromString("[-123, 456]"))
+                        .fromCase(
+                                ARRAY(INT().nullable()),
+                                new GenericArrayData(new Integer[] {null, 456}),
+                                StringData.fromString("[NULL, 456]"))
+                        .fromCase(
+                                ARRAY(INT()),
+                                new GenericArrayData(new Integer[] {}),
+                                StringData.fromString("[]"))
+                        .fromCase(
+                                MAP(STRING(), INTERVAL(MONTH())),
+                                mapData(
+                                        entry(StringData.fromString("a"), -123),
+                                        entry(StringData.fromString("b"), 123)),
+                                StringData.fromString("{a=-10-03, b=+10-03}"))
+                        .fromCase(
+                                MAP(STRING().nullable(), INTERVAL(MONTH()).nullable()),
+                                mapData(entry(null, -123), entry(StringData.fromString("b"), null)),
+                                StringData.fromString("{NULL=-10-03, b=NULL}"))
+                        .fromCase(
+                                MAP(STRING().nullable(), INTERVAL(MONTH()).nullable()),
+                                mapData(entry(null, null)),
+                                StringData.fromString("{NULL=NULL}"))
+                        .fromCase(
+                                MAP(STRING(), INTERVAL(MONTH())),
+                                mapData(),
+                                StringData.fromString("{}"))
+                        .fromCase(
+                                ROW(FIELD("f0", INT()), FIELD("f1", STRING())),
+                                GenericRowData.of(123, StringData.fromString("abc")),
+                                StringData.fromString("(123,abc)"))
+                        .fromCase(
+                                ROW(FIELD("f0", INT().nullable()), FIELD("f1", STRING())),
+                                GenericRowData.of(null, StringData.fromString("abc")),
+                                StringData.fromString("(NULL,abc)"))
+                        .fromCase(ROW(), GenericRowData.of(), StringData.fromString("()"))
+                        .fromCase(
+                                RAW(LocalDateTime.class, new LocalDateTimeSerializer()),
+                                RawValueData.fromObject(
+                                        LocalDateTime.parse("2020-11-11T18:08:01.123")),
+                                StringData.fromString("2020-11-11T18:08:01.123")),

Review comment:
       Please also add tests for the `NULL` type, `MULTISET`, and structured 
types. Structured types are special in that they can be backed either by a 
POJO (in which case we call `toString` on them) or by a `Row`, in which case I 
would use the `RowData` representation.
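
The distinction for structured types can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `Object[]` stands in for a `Row`-backed value, the `User` POJO and the `StructuredPrinting` helper are hypothetical, and the `(f0,f1)` / `NULL` format is borrowed from the `ROW` test cases above rather than from any confirmed structured-type behavior.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical illustration only; not part of the PR and not using Flink classes. */
final class StructuredPrinting {

    /** Hypothetical POJO backing a structured type. */
    static final class User {
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "User{name=" + name + ", age=" + age + "}";
        }
    }

    /**
     * Row-backed values (Object[] is an assumed stand-in) print like rows;
     * POJO-backed values fall back to their own toString.
     */
    static String print(Object value) {
        if (value == null) {
            return "NULL";
        }
        if (value instanceof Object[]) {
            return Arrays.stream((Object[]) value)
                    .map(StructuredPrinting::print)
                    .collect(Collectors.joining(",", "(", ")"));
        }
        return value.toString();
    }

    public static void main(String[] args) {
        System.out.println(print(new User("Bob", 42)));
        System.out.println(print(new Object[] {null, "abc"}));
    }
}
```

A test for each backing would then assert the POJO's `toString` output in one case and the row-style output in the other.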




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

