This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f26c8f5 [Improve] add label replace when label not meet the 
specifications  (#498)
2f26c8f5 is described below

commit 2f26c8f54b1824d8ff504d3a222c63dcba50f5dd
Author: wudi <[email protected]>
AuthorDate: Wed Dec 4 11:55:16 2024 +0800

    [Improve] add label replace when label not meet the specifications  (#498)
---
 .../doris/flink/sink/writer/LabelGenerator.java    | 30 ++++++++-
 .../flink/sink/writer/TestLabelGenerator.java      | 73 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 2 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 4f7e8074..d80315f5 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -20,9 +20,13 @@ package org.apache.doris.flink.sink.writer;
 import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
+import java.util.regex.Pattern;
 
 /** Generator label for stream load. */
 public class LabelGenerator {
+    // doris default label regex
+    private static final String LABEL_REGEX = "^[-_A-Za-z0-9:]{1,128}$";
+    private static final Pattern LABEL_PATTERN = Pattern.compile(LABEL_REGEX);
     private String labelPrefix;
     private boolean enable2PC;
     private String tableIdentifier;
@@ -50,11 +54,33 @@ public class LabelGenerator {
     public String generateTableLabel(long chkId) {
         Preconditions.checkState(tableIdentifier != null);
         String label = String.format("%s_%s_%s_%s", labelPrefix, 
tableIdentifier, subtaskId, chkId);
-        return enable2PC ? label : label + "_" + UUID.randomUUID();
+
+        if (!enable2PC) {
+            label = label + "_" + UUID.randomUUID();
+        }
+
+        if (LABEL_PATTERN.matcher(label).matches()) {
+            // The label already satisfies LABEL_REGEX (allowed chars only, at most 128), use it as is
+            return label;
+        }
+
+        if (enable2PC) {
+            // In 2pc, replace the table name with a uuid. This will cause some 
txns to fail to be
+            // aborted when aborting.
+            // Later, the label needs to be stored in the state and aborted 
through label
+            return String.format("%s_%s_%s_%s", labelPrefix, 
UUID.randomUUID(), subtaskId, chkId);
+        } else {
+            return String.format("%s_%s_%s_%s", labelPrefix, subtaskId, chkId, 
UUID.randomUUID());
+        }
     }
 
     public String generateBatchLabel(String table) {
-        return String.format("%s_%s_%s", labelPrefix, table, 
UUID.randomUUID());
+        String uuid = UUID.randomUUID().toString();
+        String label = String.format("%s_%s_%s", labelPrefix, table, uuid);
+        if (!LABEL_PATTERN.matcher(label).matches()) {
+            return labelPrefix + "_" + uuid; // label failed LABEL_REGEX: fall back to prefix + uuid, omitting the table name
+        }
+        return label;
     }
 
     public String generateCopyBatchLabel(String table, long chkId, int 
fileNum) {
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java
new file mode 100644
index 00000000..c06b1966
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestLabelGenerator {
+
+    private static String UUID_REGEX_WITHOUT_LINE =
+            "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
+
+    @Test
+    public void generateTableLabelTest() {
+        LabelGenerator labelGenerator = new LabelGenerator("test001", true, 
"db.table", 0);
+        String label = labelGenerator.generateTableLabel(1);
+        Assert.assertEquals("test001_db_table_0_1", label);
+
+        // label longer than 128 characters (no 2pc): the table name is left out of the label
+        labelGenerator =
+                new LabelGenerator(
+                        "test001",
+                        false,
+                        
"db.tabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletable",
+                        0);
+        label = labelGenerator.generateTableLabel(1);
+        Assert.assertTrue(label.matches("test001_0_1_" + 
UUID_REGEX_WITHOUT_LINE));
+
+        // Chinese (non-ASCII) table name (no 2pc): the table name is left out of the label
+        labelGenerator = new LabelGenerator("test001", false, "数据库.数据表", 0);
+        label = labelGenerator.generateTableLabel(1);
+        Assert.assertTrue(label.matches("test001_0_1_" + 
UUID_REGEX_WITHOUT_LINE));
+
+        // Chinese (non-ASCII) table name with 2pc: a uuid takes the place of the table name
+        labelGenerator = new LabelGenerator("test001", true, "数据库.数据表", 0);
+        label = labelGenerator.generateTableLabel(1);
+        Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE + 
"_0_1"));
+    }
+
+    @Test
+    public void generateBatchLabelTest() {
+        LabelGenerator labelGenerator = new LabelGenerator("test001", false);
+        String label = labelGenerator.generateBatchLabel("table");
+        Assert.assertTrue(label.matches("test001_table_" + 
UUID_REGEX_WITHOUT_LINE));
+
+        // label longer than 128 characters: expect the fallback form prefix + "_" + uuid
+        labelGenerator = new LabelGenerator("test001", false);
+        label =
+                labelGenerator.generateBatchLabel(
+                        
"tabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletable");
+        Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE));
+
+        // Chinese (non-ASCII) table name: expect the fallback form prefix + "_" + uuid
+        labelGenerator = new LabelGenerator("test001", false);
+        label = labelGenerator.generateBatchLabel("数据库.数据表");
+        Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to