This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 083b343 [Improve]Improve the length of generated stream load label (#18)
083b343 is described below
commit 083b343cbf880a90b60dffab3ff7397b34f7ae22
Author: wudongliang <[email protected]>
AuthorDate: Fri May 17 16:36:56 2024 +0800
[Improve]Improve the length of generated stream load label (#18)
---
.../doris/kafka/connector/utils/FileNameUtils.java | 2 +-
.../kafka/connector/writer/LabelGenerator.java | 39 +++++++---------------
.../kafka/connector/writer/StreamLoadWriter.java | 11 +-----
.../connector/writer/TestStreamLoadWriter.java | 8 ++---
4 files changed, 18 insertions(+), 42 deletions(-)
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
index 052f03c..8c00f43 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
@@ -89,7 +89,7 @@ public class FileNameUtils {
}
public static long labelToEndOffset(String label) {
- return Long.parseLong(readFromFileName(label, 5));
+ return Long.parseLong(readFromFileName(label, 3));
}
/**
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
index 349db71..daca8b1 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
@@ -19,55 +19,40 @@
package org.apache.doris.kafka.connector.writer;
-import java.util.UUID;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
/** Generator label for stream load. */
public class LabelGenerator {
- private final String labelPrefix;
private String topic;
private int partition;
- private final boolean enable2PC;
private String tableIdentifier;
- private int subtaskId;
+ // The label of doris stream load cannot be repeated when loading.
+ // Under special circumstances (usually load failure) when doris-kafka-connector is started,
+ // stream load is performed at the same offset every time, which will cause label duplication.
+ // For this reason, we use labelRandomSuffix to generate a random suffix at startup.
+ private final AtomicLong labelRandomSuffix;
- public LabelGenerator(String labelPrefix, boolean enable2PC) {
- this.labelPrefix = labelPrefix;
- this.enable2PC = enable2PC;
- }
-
- public LabelGenerator(
- String labelPrefix,
- boolean enable2PC,
- String topic,
- int partition,
- String tableIdentifier,
- int subtaskId) {
- this(labelPrefix, enable2PC);
+ public LabelGenerator(String topic, int partition, String tableIdentifier)
{
// The label of stream load can not contain `.`
this.tableIdentifier = tableIdentifier.replaceAll("\\.", "_");
this.topic = topic.replaceAll("\\.", "_");
- this.subtaskId = subtaskId;
this.partition = partition;
+ Random random = new Random();
+ labelRandomSuffix = new AtomicLong(random.nextInt(1000));
}
public String generateLabel(long lastOffset) {
StringBuilder sb = new StringBuilder();
- sb.append(labelPrefix)
- .append(LoadConstants.FILE_DELIM_DEFAULT)
- .append(topic)
+ sb.append(topic)
.append(LoadConstants.FILE_DELIM_DEFAULT)
.append(partition)
.append(LoadConstants.FILE_DELIM_DEFAULT)
.append(tableIdentifier)
.append(LoadConstants.FILE_DELIM_DEFAULT)
- .append(subtaskId)
- .append(LoadConstants.FILE_DELIM_DEFAULT)
.append(lastOffset)
.append(LoadConstants.FILE_DELIM_DEFAULT)
- .append(System.currentTimeMillis());
- if (!enable2PC) {
-
sb.append(LoadConstants.FILE_DELIM_DEFAULT).append(UUID.randomUUID());
- }
+ .append(labelRandomSuffix.getAndIncrement());
return sb.toString();
}
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index a8fc00a..54f102a 100644
---
a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -62,14 +62,7 @@ public class StreamLoadWriter extends DorisWriter {
DorisConnectMonitor connectMonitor) {
super(topic, partition, dorisOptions, connectionProvider,
connectMonitor);
this.taskId = dorisOptions.getTaskId();
- this.labelGenerator =
- new LabelGenerator(
- dorisOptions.getLabelPrefix(),
- true,
- topic,
- partition,
- tableIdentifier,
- taskId);
+ this.labelGenerator = new LabelGenerator(topic, partition,
tableIdentifier);
BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions,
LOG);
this.dorisCommitter = new DorisCommitter(dorisOptions, backendUtils);
this.dorisStreamLoad = new DorisStreamLoad(backendUtils, dorisOptions,
topic);
@@ -105,8 +98,6 @@ public class StreamLoadWriter extends DorisWriter {
String tmpTopic = topic.replaceAll("\\.", "_");
String querySQL =
queryPatten
- + dorisOptions.getLabelPrefix()
- + LoadConstants.FILE_DELIM_DEFAULT
+ tmpTopic
+ LoadConstants.FILE_DELIM_DEFAULT
+ partition
diff --git
a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
index 0665a09..6f09bf1 100644
---
a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
+++
b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
@@ -67,16 +67,16 @@ public class TestStreamLoadWriter {
private void fillLabel2Status() {
label2Status.put(
-
"sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_321__KC_1706149860395",
+
"KC_avro-complex10__KC_2__KC_test_person_complex__KC_321__KC_1706149860395",
"ABORT");
label2Status.put(
-
"sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_983__KC_1706149860395",
+
"KC_avro-complex10__KC_2__KC_test_person_complex__KC_983__KC_1706149860395",
"ABORT");
label2Status.put(
-
"sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_781__KC_1706149860395",
+
"avro-complex10__KC_2__KC_test_person_complex__KC_781__KC_1706149860395",
"VISIBLE");
label2Status.put(
-
"sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_832__KC_1706149860395",
+
"avro-complex10__KC_2__KC_test_person_complex__KC_832__KC_1706149860395",
"VISIBLE");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]