matriv commented on a change in pull request #18221:
URL: https://github.com/apache/flink/pull/18221#discussion_r783125705



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
##########
@@ -20,35 +20,137 @@
 
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import java.nio.charset.StandardCharsets;
 
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
 import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
+import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.arrayElement;
+import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.arrayLength;
 import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
-import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
+import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.strLiteral;
+import static 
org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldTrim;
+import static 
org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.stringExceedsLength;
 
 /**
  * {@link LogicalTypeFamily#BINARY_STRING} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule.
  */
-class BinaryToStringCastRule extends AbstractCharacterFamilyTargetRule<byte[]> 
{
+class BinaryToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<byte[], String> {
 
     static final BinaryToStringCastRule INSTANCE = new 
BinaryToStringCastRule();
 
     private BinaryToStringCastRule() {
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeFamily.BINARY_STRING)
-                        .target(STRING_TYPE)
+                        .target(LogicalTypeFamily.CHARACTER_STRING)
                         .build());
     }
 
+    /* Example generated code
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        builder$1.setLength(0);
+        builder$1.append("[");
+        for (int i$2 = 0; i$2 < _myInput.length; i$2++) {
+            if (i$2 != 0) {
+                builder$1.append(", ");
+            }
+            builder$1.append(java.lang.String.format("%02X", _myInput[i$2]));
+        }
+        builder$1.append("]");
+        java.lang.String resultString$152;
+        resultString$152 = builder$153.toString();
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$152);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+     */
+
     @Override
-    public String generateStringExpression(
+    protected String generateCodeBlockInternal(
             CodeGeneratorCastRule.Context context,
             String inputTerm,
+            String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        return constructorCall(
-                String.class, inputTerm, 
accessStaticField(StandardCharsets.class, "UTF_8"));
+        final String resultStringTerm = newName("resultString");
+        CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();
+        if (context.legacyBehaviour()) {
+            writer.declStmt(String.class, resultStringTerm)
+                    .assignStmt(
+                            resultStringTerm,
+                            constructorCall(
+                                    String.class,
+                                    inputTerm,
+                                    accessStaticField(StandardCharsets.class, 
"UTF_8")));
+        } else {
+            final int length = LogicalTypeChecks.getLength(targetLogicalType);
+
+            final String builderTerm = newName("builder");
+            context.declareClassField(
+                    className(StringBuilder.class),
+                    builderTerm,
+                    constructorCall(StringBuilder.class));
+
+            writer.stmt(methodCall(builderTerm, "setLength", 0))
+                    .stmt(methodCall(builderTerm, "append", strLiteral("[")))
+                    .forStmt(
+                            arrayLength(inputTerm),
+                            (indexTerm, loopBodyWriter) -> {
+                                if (!context.legacyBehaviour() && 
couldTrim(length)) {

Review comment:
       true, thx!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to