lincoln-lil commented on code in PR #25122:
URL: https://github.com/apache/flink/pull/25122#discussion_r1698592670


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java:
##########
@@ -964,6 +964,14 @@ private static BinaryStringData 
trimLeftSlow(BinaryStringData str) {
         }
     }
 
+    public static boolean isEmpty(BinaryStringData str) {
+        if (str.javaObject != null) {
+            return str.javaObject.isEmpty();
+        } else {
+            return str.binarySection == null || 
str.binarySection.getSizeInBytes() == 0;

Review Comment:
   We'd better add a comment here to explain why call 
`str.binarySection.getSizeInBytes()` directly instead of 
`BinaryStringData#getSizeInBytes`, and also add a test for this utility method.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/TranslateFunction.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#TRANSLATE}. */
+@Internal
+public class TranslateFunction extends BuiltInScalarFunction {
+
+    private static String lastFrom = "";
+    private static String lastTo = "";
+    private static Map<Integer, String> dict;

Review Comment:
   Use static variable as cache may encounter correctness issue caused by 
concurrent accessing(e.g., multi task threads deployed in a multi-slot TM) 
   We can use a `ThreadLocalCache` (similar to 
SqlFunctionUtils#REGEXP_PATTERN_CACHE) to achieve the accelerating  paying 
acceptable memory.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##########
@@ -878,6 +879,23 @@ public OutType substr(InType beginIndex) {
                 unresolvedCall(SUBSTR, toExpr(), 
objectToExpression(beginIndex)));
     }
 
+    /**
+     * Translate an {@code expr} where all characters in {@code fromStr} have 
been replaced with
+     * those in {@code toStr}. <br>
+     * If {@code toStr} has a shorter length than {@code fromStr}, unmatched 
characters are removed.

Review Comment:
   nit: -> '<p>NOTE: If ...'



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/TranslateFunction.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#TRANSLATE}. */
+@Internal
+public class TranslateFunction extends BuiltInScalarFunction {
+
+    private static String lastFrom = "";
+    private static String lastTo = "";
+    private static Map<Integer, String> dict;
+
+    public TranslateFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.TRANSLATE, context);
+    }
+
+    public @Nullable StringData eval(
+            @Nullable StringData expr, @Nullable StringData fromStr, @Nullable 
StringData toStr) {
+        if (expr == null
+                || fromStr == null
+                || BinaryStringDataUtil.isEmpty((BinaryStringData) expr)
+                || BinaryStringDataUtil.isEmpty((BinaryStringData) fromStr)) {
+            return expr;
+        }
+
+        final String source = expr.toString();
+        final String from = fromStr.toString();
+        final String to = toStr == null ? "" : toStr.toString();
+
+        if (!from.equals(lastFrom) || !to.equals(lastTo)) {
+            lastFrom = from;
+            lastTo = to;
+            dict = buildDict(from, to);
+        }
+
+        return BinaryStringData.fromString(translate(source));
+    }
+
+    private String translate(String expr) {
+        StringBuilder res = new StringBuilder();
+        for (int i = 0; i < expr.length(); ) {
+            int codePoint = expr.codePointAt(i);
+            i += Character.charCount(codePoint);
+            String ch = dict.get(codePoint);
+            if (ch == null) {
+                res.append(Character.toChars(codePoint));

Review Comment:
   Have you compared the performance of `Character.toChars(codePoint)` and 
`String.substring`?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -970,6 +970,26 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
                     .build();
 
+    // By default, Calcite parse TRANSLATE as TRANSLATE3, hence it is 
necessary to modify the
+    // name filed to ensure it is called correctly.

Review Comment:
   filed -> field? seems a typo 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to