This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 2f26c8f5 [Improve] add label replace when label not meet the
specifications (#498)
2f26c8f5 is described below
commit 2f26c8f54b1824d8ff504d3a222c63dcba50f5dd
Author: wudi <[email protected]>
AuthorDate: Wed Dec 4 11:55:16 2024 +0800
[Improve] add label replace when label not meet the specifications (#498)
---
.../doris/flink/sink/writer/LabelGenerator.java | 30 ++++++++-
.../flink/sink/writer/TestLabelGenerator.java | 73 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 2 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 4f7e8074..d80315f5 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -20,9 +20,13 @@ package org.apache.doris.flink.sink.writer;
import org.apache.flink.util.Preconditions;
import java.util.UUID;
+import java.util.regex.Pattern;
/** Generator label for stream load. */
public class LabelGenerator {
+ // doris default label regex
+ private static final String LABEL_REGEX = "^[-_A-Za-z0-9:]{1,128}$";
+ private static final Pattern LABEL_PATTERN = Pattern.compile(LABEL_REGEX);
private String labelPrefix;
private boolean enable2PC;
private String tableIdentifier;
@@ -50,11 +54,33 @@ public class LabelGenerator {
public String generateTableLabel(long chkId) {
Preconditions.checkState(tableIdentifier != null);
String label = String.format("%s_%s_%s_%s", labelPrefix,
tableIdentifier, subtaskId, chkId);
- return enable2PC ? label : label + "_" + UUID.randomUUID();
+
+ if (!enable2PC) {
+ label = label + "_" + UUID.randomUUID();
+ }
+
+ if (LABEL_PATTERN.matcher(label).matches()) {
+ // The label is valid; otherwise the table name contains unicode or the length exceeds the limit
+ return label;
+ }
+
+ if (enable2PC) {
+ // In 2pc, replace the table name with a uuid. This will cause some txns to fail to be
+ // aborted when aborting.
+ // Later, the label needs to be stored in the state and aborted through the label
+ return String.format("%s_%s_%s_%s", labelPrefix,
UUID.randomUUID(), subtaskId, chkId);
+ } else {
+ return String.format("%s_%s_%s_%s", labelPrefix, subtaskId, chkId,
UUID.randomUUID());
+ }
}
public String generateBatchLabel(String table) {
- return String.format("%s_%s_%s", labelPrefix, table,
UUID.randomUUID());
+ String uuid = UUID.randomUUID().toString();
+ String label = String.format("%s_%s_%s", labelPrefix, table, uuid);
+ if (!LABEL_PATTERN.matcher(label).matches()) {
+ return labelPrefix + "_" + uuid;
+ }
+ return label;
}
public String generateCopyBatchLabel(String table, long chkId, int
fileNum) {
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java
new file mode 100644
index 00000000..c06b1966
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestLabelGenerator {
+
+ private static String UUID_REGEX_WITHOUT_LINE =
+ "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
+
+ @Test
+ public void generateTableLabelTest() {
+ LabelGenerator labelGenerator = new LabelGenerator("test001", true,
"db.table", 0);
+ String label = labelGenerator.generateTableLabel(1);
+ Assert.assertEquals("test001_db_table_0_1", label);
+
+ // mock label length more than 128
+ labelGenerator =
+ new LabelGenerator(
+ "test001",
+ false,
+
"db.tabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletable",
+ 0);
+ label = labelGenerator.generateTableLabel(1);
+ Assert.assertTrue(label.matches("test001_0_1_" +
UUID_REGEX_WITHOUT_LINE));
+
+ // mock table name chinese
+ labelGenerator = new LabelGenerator("test001", false, "数据库.数据表", 0);
+ label = labelGenerator.generateTableLabel(1);
+ Assert.assertTrue(label.matches("test001_0_1_" +
UUID_REGEX_WITHOUT_LINE));
+
+ // mock table name chinese and 2pc
+ labelGenerator = new LabelGenerator("test001", true, "数据库.数据表", 0);
+ label = labelGenerator.generateTableLabel(1);
+ Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE +
"_0_1"));
+ }
+
+ @Test
+ public void generateBatchLabelTest() {
+ LabelGenerator labelGenerator = new LabelGenerator("test001", false);
+ String label = labelGenerator.generateBatchLabel("table");
+ Assert.assertTrue(label.matches("test001_table_" +
UUID_REGEX_WITHOUT_LINE));
+
+ // mock label length more than 128
+ labelGenerator = new LabelGenerator("test001", false);
+ label =
+ labelGenerator.generateBatchLabel(
+
"tabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletable");
+ Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE));
+
+ // mock table name chinese
+ labelGenerator = new LabelGenerator("test001", false);
+ label = labelGenerator.generateBatchLabel("数据库.数据表");
+ Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]