slinkydeveloper commented on a change in pull request #17658:
URL: https://github.com/apache/flink/pull/17658#discussion_r743568048



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final ArrayToStringCastRule INSTANCE = new 
ArrayToStringCastRule();
+
+    private ArrayToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ARRAY)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && CastRuleProvider.exists(
+                                                        ((ArrayType) 
input).getElementType(),
+                                                        target))
+                        .build());
+    }
+
+    /* Example generated code for ARRAY<INT>:
+
+    isNull$0 = _myInputIsNull;

Review comment:
       Why? This is not a javadoc

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BinaryToStringCastRule.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.nio.charset.StandardCharsets;
+
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.accessStaticField;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+
+/**
+ * {@link LogicalTypeFamily#BINARY_STRING} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule.
+ */
+public class BinaryToStringCastRule extends 
AbstractCharacterFamilyTargetRule<byte[]> {

Review comment:
       They can't be package-private because `CastRuleProvider` is in the rules 
package.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BinaryToStringCastRule.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.nio.charset.StandardCharsets;
+
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.accessStaticField;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+
+/**
+ * {@link LogicalTypeFamily#BINARY_STRING} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule.
+ */
+public class BinaryToStringCastRule extends 
AbstractCharacterFamilyTargetRule<byte[]> {
+
+    public static final BinaryToStringCastRule INSTANCE = new 
BinaryToStringCastRule();
+
+    private BinaryToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.BINARY_STRING)
+                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .build());
+    }
+
+    @Override
+    public String generateStringExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return constructorCall(

Review comment:
       This logic was copied over from `ScalarOperatorGens`; perhaps we should 
open an issue in https://issues.apache.org/jira/browse/FLINK-24403 about that?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING} 
cast rule. */
+public class RowToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final RowToStringCastRule INSTANCE = new 
RowToStringCastRule();
+
+    private RowToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ROW)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && ((RowType) input)
+                                                        .getFields().stream()
+                                                                .allMatch(
+                                                                        field 
->
+                                                                               
 CastRuleProvider
+                                                                               
         .exists(
+                                                                               
                 field
+                                                                               
                         .getType(),
+                                                                               
                 target)))
+                        .build());
+    }
+
+    /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.delete(0, builder$1.length());
+        builder$1.append("(");
+        int f0Value$2 = -1;
+        boolean f0IsNull$3 = _myInput.isNullAt(0);
+        if (!f0IsNull$3) {
+            f0Value$2 = _myInput.getInt(0);
+            result$2 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + f0Value$2);
+            builder$1.append(result$2);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(",");
+        org.apache.flink.table.data.binary.BinaryStringData f1Value$4 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+        boolean f1IsNull$5 = _myInput.isNullAt(1);
+        if (!f1IsNull$5) {
+            f1Value$4 = ((org.apache.flink.table.data.binary.BinaryStringData) 
_myInput.getString(1));
+            builder$1.append(f1Value$4);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(")");
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+    */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final List<LogicalType> fields =
+                ((RowType) inputLogicalType)
+                        .getFields().stream()
+                                .map(RowType.RowField::getType)
+                                .collect(Collectors.toList());
+
+        final String builderTerm = newName("builder");
+        context.declareClassField(
+                className(StringBuilder.class), builderTerm, 
constructorCall(StringBuilder.class));
+
+        final CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .stmt(
+                                methodCall(
+                                        builderTerm,
+                                        "delete",
+                                        0,
+                                        methodCall(builderTerm, "length")))
+                        .stmt(methodCall(builderTerm, "append", 
strLiteral("(")));
+
+        for (int i = 0; i < fields.size(); i++) {
+            final int fieldIndex = i;
+            final LogicalType fieldType = fields.get(fieldIndex);
+
+            final String fieldTerm = newName("f" + fieldIndex + "Value");
+            final String fieldIsNullTerm = newName("f" + fieldIndex + 
"IsNull");
+
+            final CastCodeBlock codeBlock =
+                    CastRuleProvider.generateCodeBlock(
+                            context,
+                            fieldTerm,
+                            fieldIsNullTerm,
+                            // Null check is done at the row access level
+                            fieldType.copy(false),
+                            targetLogicalType);
+
+            // Write the comma
+            if (fieldIndex != 0) {
+                writer.stmt(methodCall(builderTerm, "append", 
strLiteral(",")));

Review comment:
       As for https://github.com/apache/flink/pull/17658#discussion_r743574564, 
choosing which representation to use is out of scope of this PR. We have an 
issue open for that already: https://issues.apache.org/jira/browse/FLINK-17321

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING} 
cast rule. */
+public class RowToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final RowToStringCastRule INSTANCE = new 
RowToStringCastRule();
+
+    private RowToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ROW)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && ((RowType) input)
+                                                        .getFields().stream()
+                                                                .allMatch(
+                                                                        field 
->
+                                                                               
 CastRuleProvider
+                                                                               
         .exists(
+                                                                               
                 field
+                                                                               
                         .getType(),
+                                                                               
                 target)))
+                        .build());
+    }
+
+    /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.delete(0, builder$1.length());
+        builder$1.append("(");
+        int f0Value$2 = -1;
+        boolean f0IsNull$3 = _myInput.isNullAt(0);
+        if (!f0IsNull$3) {
+            f0Value$2 = _myInput.getInt(0);
+            result$2 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + f0Value$2);
+            builder$1.append(result$2);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(",");
+        org.apache.flink.table.data.binary.BinaryStringData f1Value$4 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+        boolean f1IsNull$5 = _myInput.isNullAt(1);
+        if (!f1IsNull$5) {
+            f1Value$4 = ((org.apache.flink.table.data.binary.BinaryStringData) 
_myInput.getString(1));
+            builder$1.append(f1Value$4);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(")");
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+    */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final List<LogicalType> fields =
+                ((RowType) inputLogicalType)
+                        .getFields().stream()
+                                .map(RowType.RowField::getType)
+                                .collect(Collectors.toList());
+
+        final String builderTerm = newName("builder");
+        context.declareClassField(
+                className(StringBuilder.class), builderTerm, 
constructorCall(StringBuilder.class));
+
+        final CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .stmt(
+                                methodCall(
+                                        builderTerm,
+                                        "delete",
+                                        0,
+                                        methodCall(builderTerm, "length")))
+                        .stmt(methodCall(builderTerm, "append", 
strLiteral("(")));

Review comment:
       Same as above

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimestampToStringCastRule.java
##########
@@ -53,11 +53,11 @@ public String generateStringExpression(
             String inputTerm,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        String zoneId =
+        final String zoneId =
                 
(inputLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE))
                         ? context.getSessionTimeZoneTerm()
-                        : className(DateTimeUtils.class) + ".UTC_ZONE";

Review comment:
       The old method was invoking the same method, defaulting the zone to 
`UTC_ZONE`. Note that we don't declare any variable for `UTC_ZONE`; we just 
access it statically. The variable is only declared for timestamp_ltz

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java
##########
@@ -68,63 +70,61 @@ protected String generateCodeBlockInternal(
             String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        LogicalType innerInputType = ((ArrayType) 
inputLogicalType).getElementType();
-        LogicalType innerTargetType = ((ArrayType) 
targetLogicalType).getElementType();
-
-        String innerTargetTypeTerm = arrayElementType(innerTargetType);
-        String arraySize = inputTerm + ".size()";
-        String objArrayTerm = newName("objArray");
-
-        StringBuilder result = new StringBuilder();
-        result.append(
-                innerTargetTypeTerm
-                        + "[] "
-                        + objArrayTerm
-                        + " = new "
-                        + innerTargetTypeTerm
-                        + "["
-                        + arraySize
-                        + "];\n");
-
-        result.append("for (int i = 0; i < " + arraySize + "; i++) {\n");
-        CastCodeBlock codeBlock =
-                CastRuleProvider.generateCodeBlock(
-                        context,
-                        rowFieldReadAccess("i", inputTerm, innerInputType),
-                        functionCall(inputTerm + ".isNullAt", "i"),
-                        innerInputType.copy(false), // Null check is done at 
the array access level
-                        innerTargetType);
-
-        String innerElementCode =
-                codeBlock.getCode()
-                        + "\n"
-                        + objArrayTerm
-                        + "[i] = "
-                        + codeBlock.getReturnTerm()
-                        + ";\n";
+        final LogicalType innerInputType = ((ArrayType) 
inputLogicalType).getElementType();
+        final LogicalType innerTargetType =
+                sanitizeTargetType((ArrayType) inputLogicalType, (ArrayType) 
targetLogicalType);
+
+        final String innerTargetTypeTerm = arrayElementType(innerTargetType);
+        final String arraySize = methodCall(inputTerm, "size");
+        final String objArrayTerm = newName("objArray");
+
+        return new CastRuleUtils.CodeWriter()
+                .declStmt(
+                        innerTargetTypeTerm + "[]",
+                        objArrayTerm,
+                        newArray(innerTargetTypeTerm, arraySize))
+                .forStmt(
+                        arraySize,
+                        (index, loopWriter) -> {
+                            CastCodeBlock codeBlock =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            rowFieldReadAccess(index, 
inputTerm, innerInputType),
+                                            "false",
+                                            // Null check is done at the array 
access level
+                                            innerInputType.copy(false),
+                                            innerTargetType);
+
+                            if (innerTargetType.isNullable()) {
+                                loopWriter.ifStmt(
+                                        "!" + functionCall(inputTerm + 
".isNullAt", index),
+                                        thenWriter ->
+                                                thenWriter
+                                                        .append(codeBlock)
+                                                        .assignArrayStmt(
+                                                                objArrayTerm,
+                                                                index,
+                                                                
codeBlock.getReturnTerm()));
+                            } else {
+                                loopWriter
+                                        .append(codeBlock)
+                                        .assignArrayStmt(
+                                                objArrayTerm, index, 
codeBlock.getReturnTerm());
+                            }
+                        })
+                .assignStmt(returnVariable, 
constructorCall(GenericArrayData.class, objArrayTerm))
+                .toString();
+    }
 
-        // Add null check if inner type is nullable
-        if (innerInputType.isNullable()) {
-            result.append("if (" + inputTerm + ".isNullAt(i)) {\n")
-                    .append(objArrayTerm + "[i] = null;\n")
-                    .append("} else {\n")
-                    .append(innerElementCode)
-                    .append("}\n");
-        } else {
-            result.append(innerElementCode);
+    private static LogicalType sanitizeTargetType(
+            ArrayType inputArrayType, ArrayType targetArrayType) {
+        LogicalType innerTargetType = targetArrayType.getElementType();
+        // TODO this seems rather a bug of the planner that 
generates/allows/doesn't sanitize

Review comment:
       Gotcha, I've modified the comment to explain why we need that logic.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final ArrayToStringCastRule INSTANCE = new 
ArrayToStringCastRule();
+
+    private ArrayToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ARRAY)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && CastRuleProvider.exists(
+                                                        ((ArrayType) 
input).getElementType(),
+                                                        target))
+                        .build());
+    }
+
+    /* Example generated code for ARRAY<INT>:
+
+    isNull$0 = _myInputIsNull;

Review comment:
       Why? This is not a javadoc

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BinaryToStringCastRule.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.nio.charset.StandardCharsets;
+
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.accessStaticField;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+
+/**
+ * {@link LogicalTypeFamily#BINARY_STRING} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule.
+ */
+public class BinaryToStringCastRule extends 
AbstractCharacterFamilyTargetRule<byte[]> {

Review comment:
       They can't be package-private because `CastRuleProvider` is in the rules 
package.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BinaryToStringCastRule.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.nio.charset.StandardCharsets;
+
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.accessStaticField;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+
+/**
+ * {@link LogicalTypeFamily#BINARY_STRING} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule.
+ */
+public class BinaryToStringCastRule extends 
AbstractCharacterFamilyTargetRule<byte[]> {
+
+    public static final BinaryToStringCastRule INSTANCE = new 
BinaryToStringCastRule();
+
+    private BinaryToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.BINARY_STRING)
+                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .build());
+    }
+
+    @Override
+    public String generateStringExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return constructorCall(

Review comment:
       This logic was copied over from `ScalarOperatorGens`; perhaps we should 
open an issue in https://issues.apache.org/jira/browse/FLINK-24403 about that?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING} 
cast rule. */
+public class RowToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final RowToStringCastRule INSTANCE = new 
RowToStringCastRule();
+
+    private RowToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ROW)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && ((RowType) input)
+                                                        .getFields().stream()
+                                                                .allMatch(
+                                                                        field 
->
+                                                                               
 CastRuleProvider
+                                                                               
         .exists(
+                                                                               
                 field
+                                                                               
                         .getType(),
+                                                                               
                 target)))
+                        .build());
+    }
+
+    /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.delete(0, builder$1.length());
+        builder$1.append("(");
+        int f0Value$2 = -1;
+        boolean f0IsNull$3 = _myInput.isNullAt(0);
+        if (!f0IsNull$3) {
+            f0Value$2 = _myInput.getInt(0);
+            result$2 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + f0Value$2);
+            builder$1.append(result$2);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(",");
+        org.apache.flink.table.data.binary.BinaryStringData f1Value$4 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+        boolean f1IsNull$5 = _myInput.isNullAt(1);
+        if (!f1IsNull$5) {
+            f1Value$4 = ((org.apache.flink.table.data.binary.BinaryStringData) 
_myInput.getString(1));
+            builder$1.append(f1Value$4);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(")");
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+    */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final List<LogicalType> fields =
+                ((RowType) inputLogicalType)
+                        .getFields().stream()
+                                .map(RowType.RowField::getType)
+                                .collect(Collectors.toList());
+
+        final String builderTerm = newName("builder");
+        context.declareClassField(
+                className(StringBuilder.class), builderTerm, 
constructorCall(StringBuilder.class));
+
+        final CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .stmt(
+                                methodCall(
+                                        builderTerm,
+                                        "delete",
+                                        0,
+                                        methodCall(builderTerm, "length")))
+                        .stmt(methodCall(builderTerm, "append", 
strLiteral("(")));
+
+        for (int i = 0; i < fields.size(); i++) {
+            final int fieldIndex = i;
+            final LogicalType fieldType = fields.get(fieldIndex);
+
+            final String fieldTerm = newName("f" + fieldIndex + "Value");
+            final String fieldIsNullTerm = newName("f" + fieldIndex + 
"IsNull");
+
+            final CastCodeBlock codeBlock =
+                    CastRuleProvider.generateCodeBlock(
+                            context,
+                            fieldTerm,
+                            fieldIsNullTerm,
+                            // Null check is done at the row access level
+                            fieldType.copy(false),
+                            targetLogicalType);
+
+            // Write the comma
+            if (fieldIndex != 0) {
+                writer.stmt(methodCall(builderTerm, "append", 
strLiteral(",")));

Review comment:
       As for https://github.com/apache/flink/pull/17658#discussion_r743574564, 
choosing which representation to use is out of scope of this PR. We have an 
issue open for that already: https://issues.apache.org/jira/browse/FLINK-17321

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING} 
cast rule. */
+public class RowToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+    public static final RowToStringCastRule INSTANCE = new 
RowToStringCastRule();
+
+    private RowToStringCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (input, target) ->
+                                        input.is(LogicalTypeRoot.ROW)
+                                                && 
target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && ((RowType) input)
+                                                        .getFields().stream()
+                                                                .allMatch(
+                                                                        field 
->
+                                                                               
 CastRuleProvider
+                                                                               
         .exists(
+                                                                               
                 field
+                                                                               
                         .getType(),
+                                                                               
                 target)))
+                        .build());
+    }
+
+    /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.delete(0, builder$1.length());
+        builder$1.append("(");
+        int f0Value$2 = -1;
+        boolean f0IsNull$3 = _myInput.isNullAt(0);
+        if (!f0IsNull$3) {
+            f0Value$2 = _myInput.getInt(0);
+            result$2 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + f0Value$2);
+            builder$1.append(result$2);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(",");
+        org.apache.flink.table.data.binary.BinaryStringData f1Value$4 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+        boolean f1IsNull$5 = _myInput.isNullAt(1);
+        if (!f1IsNull$5) {
+            f1Value$4 = ((org.apache.flink.table.data.binary.BinaryStringData) 
_myInput.getString(1));
+            builder$1.append(f1Value$4);
+        } else {
+            builder$1.append("NULL");
+        }
+        builder$1.append(")");
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+    */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final List<LogicalType> fields =
+                ((RowType) inputLogicalType)
+                        .getFields().stream()
+                                .map(RowType.RowField::getType)
+                                .collect(Collectors.toList());
+
+        final String builderTerm = newName("builder");
+        context.declareClassField(
+                className(StringBuilder.class), builderTerm, 
constructorCall(StringBuilder.class));
+
+        final CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .stmt(
+                                methodCall(
+                                        builderTerm,
+                                        "delete",
+                                        0,
+                                        methodCall(builderTerm, "length")))
+                        .stmt(methodCall(builderTerm, "append", 
strLiteral("(")));

Review comment:
       Same as above

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimestampToStringCastRule.java
##########
@@ -53,11 +53,11 @@ public String generateStringExpression(
             String inputTerm,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        String zoneId =
+        final String zoneId =
                 
(inputLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE))
                         ? context.getSessionTimeZoneTerm()
-                        : className(DateTimeUtils.class) + ".UTC_ZONE";

Review comment:
       The old method was invoking the same method, defaulting the zone to 
`UTC_ZONE`. Note that we don't declare any variable for `UTC_ZONE`; we just 
access it statically. The variable is only declared for timestamp_ltz.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java
##########
@@ -68,63 +70,61 @@ protected String generateCodeBlockInternal(
             String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        LogicalType innerInputType = ((ArrayType) 
inputLogicalType).getElementType();
-        LogicalType innerTargetType = ((ArrayType) 
targetLogicalType).getElementType();
-
-        String innerTargetTypeTerm = arrayElementType(innerTargetType);
-        String arraySize = inputTerm + ".size()";
-        String objArrayTerm = newName("objArray");
-
-        StringBuilder result = new StringBuilder();
-        result.append(
-                innerTargetTypeTerm
-                        + "[] "
-                        + objArrayTerm
-                        + " = new "
-                        + innerTargetTypeTerm
-                        + "["
-                        + arraySize
-                        + "];\n");
-
-        result.append("for (int i = 0; i < " + arraySize + "; i++) {\n");
-        CastCodeBlock codeBlock =
-                CastRuleProvider.generateCodeBlock(
-                        context,
-                        rowFieldReadAccess("i", inputTerm, innerInputType),
-                        functionCall(inputTerm + ".isNullAt", "i"),
-                        innerInputType.copy(false), // Null check is done at 
the array access level
-                        innerTargetType);
-
-        String innerElementCode =
-                codeBlock.getCode()
-                        + "\n"
-                        + objArrayTerm
-                        + "[i] = "
-                        + codeBlock.getReturnTerm()
-                        + ";\n";
+        final LogicalType innerInputType = ((ArrayType) 
inputLogicalType).getElementType();
+        final LogicalType innerTargetType =
+                sanitizeTargetType((ArrayType) inputLogicalType, (ArrayType) 
targetLogicalType);
+
+        final String innerTargetTypeTerm = arrayElementType(innerTargetType);
+        final String arraySize = methodCall(inputTerm, "size");
+        final String objArrayTerm = newName("objArray");
+
+        return new CastRuleUtils.CodeWriter()
+                .declStmt(
+                        innerTargetTypeTerm + "[]",
+                        objArrayTerm,
+                        newArray(innerTargetTypeTerm, arraySize))
+                .forStmt(
+                        arraySize,
+                        (index, loopWriter) -> {
+                            CastCodeBlock codeBlock =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            rowFieldReadAccess(index, 
inputTerm, innerInputType),
+                                            "false",
+                                            // Null check is done at the array 
access level
+                                            innerInputType.copy(false),
+                                            innerTargetType);
+
+                            if (innerTargetType.isNullable()) {
+                                loopWriter.ifStmt(
+                                        "!" + functionCall(inputTerm + 
".isNullAt", index),
+                                        thenWriter ->
+                                                thenWriter
+                                                        .append(codeBlock)
+                                                        .assignArrayStmt(
+                                                                objArrayTerm,
+                                                                index,
+                                                                
codeBlock.getReturnTerm()));
+                            } else {
+                                loopWriter
+                                        .append(codeBlock)
+                                        .assignArrayStmt(
+                                                objArrayTerm, index, 
codeBlock.getReturnTerm());
+                            }
+                        })
+                .assignStmt(returnVariable, 
constructorCall(GenericArrayData.class, objArrayTerm))
+                .toString();
+    }
 
-        // Add null check if inner type is nullable
-        if (innerInputType.isNullable()) {
-            result.append("if (" + inputTerm + ".isNullAt(i)) {\n")
-                    .append(objArrayTerm + "[i] = null;\n")
-                    .append("} else {\n")
-                    .append(innerElementCode)
-                    .append("}\n");
-        } else {
-            result.append(innerElementCode);
+    private static LogicalType sanitizeTargetType(
+            ArrayType inputArrayType, ArrayType targetArrayType) {
+        LogicalType innerTargetType = targetArrayType.getElementType();
+        // TODO this seems rather a bug of the planner that 
generates/allows/doesn't sanitize

Review comment:
       Gotcha, I've modified the comment to explain why we need that logic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to