This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7db491f  [improve] Update config for ConfigDef (#85)
7db491f is described below

commit 7db491fd0f4731722bbe1322d0deae11187640ea
Author: wudi <[email protected]>
AuthorDate: Wed Aug 13 11:32:26 2025 +0800

    [improve] Update config for ConfigDef (#85)
---
 .../connector/cfg/DorisSinkConnectorConfig.java    | 118 +++++++++++++++------
 .../writer/load/AsyncDorisStreamLoad.java          |   2 +-
 2 files changed, 84 insertions(+), 36 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 97bc125..6705f1d 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.doris.kafka.connector.cfg;
 
+import io.confluent.kafka.schemaregistry.utils.EnumRecommender;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Locale;
@@ -61,7 +62,7 @@ public class DorisSinkConnectorConfig {
     public static final long BUFFER_FLUSH_TIME_SEC_DEFAULT = 120;
     public static final String BUFFER_FLUSH_TIME_SEC = "buffer.flush.time";
 
-    private static final String DORIS_INFO = "Doris Info";
+    private static final String DORIS_INFO = "Doris Cluster Info";
 
     // doris config
     public static final String DORIS_URLS = "doris.urls";
@@ -88,10 +89,12 @@ public class DorisSinkConnectorConfig {
 
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+    public static final String DEBEZIUM_CONFIG = "Debezium Config";
     public static final String DEBEZIUM_SCHEMA_EVOLUTION = 
"debezium.schema.evolution";
     public static final String DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT =
             SchemaEvolutionMode.NONE.getName();
 
+    public static final String RETRIES_GROUP = "Retries";
     public static final String MAX_RETRIES = "max.retries";
     public static final int MAX_RETRIES_DEFAULT = 10;
 
@@ -161,7 +164,7 @@ public class DorisSinkConnectorConfig {
                         null,
                         nonEmptyStringValidator,
                         Importance.HIGH,
-                        "Doris account url",
+                        "Doris host url",
                         DORIS_INFO,
                         0,
                         ConfigDef.Width.NONE,
@@ -174,7 +177,7 @@ public class DorisSinkConnectorConfig {
                         DORIS_INFO,
                         1,
                         ConfigDef.Width.NONE,
-                        DORIS_URLS)
+                        DORIS_QUERY_PORT)
                 .define(
                         DORIS_HTTP_PORT,
                         Type.INT,
@@ -212,7 +215,7 @@ public class DorisSinkConnectorConfig {
                         Importance.HIGH,
                         "Doris database name",
                         DORIS_INFO,
-                        6,
+                        5,
                         ConfigDef.Width.NONE,
                         DORIS_DATABASE)
                 .define(
@@ -220,11 +223,11 @@ public class DorisSinkConnectorConfig {
                         Type.STRING,
                         "",
                         topicToTableValidator,
-                        Importance.LOW,
+                        Importance.HIGH,
                         "Map of topics to tables (optional). Format : 
comma-separated tuples, e.g."
                                 + " 
<topic-1>:<table-1>,<topic-2>:<table-2>,... ",
                         CONNECTOR_CONFIG,
-                        0,
+                        1,
                         ConfigDef.Width.NONE,
                         TOPICS_TABLES_MAP)
                 .define(
@@ -232,10 +235,10 @@ public class DorisSinkConnectorConfig {
                         Type.LONG,
                         BUFFER_COUNT_RECORDS_DEFAULT,
                         ConfigDef.Range.atLeast(1),
-                        Importance.LOW,
-                        "Number of records buffered in memory per partition 
before triggering",
+                        Importance.HIGH,
+                        "Number of records buffered in memory before 
triggering",
                         CONNECTOR_CONFIG,
-                        1,
+                        2,
                         ConfigDef.Width.NONE,
                         BUFFER_COUNT_RECORDS)
                 .define(
@@ -243,10 +246,10 @@ public class DorisSinkConnectorConfig {
                         Type.LONG,
                         BUFFER_SIZE_BYTES_DEFAULT,
                         ConfigDef.Range.atLeast(1),
-                        Importance.LOW,
-                        "Cumulative size of records buffered in memory per 
partition before triggering",
+                        Importance.HIGH,
+                        "Cumulative size of records buffered in memory before 
triggering",
                         CONNECTOR_CONFIG,
-                        2,
+                        3,
                         ConfigDef.Width.NONE,
                         BUFFER_SIZE_BYTES)
                 .define(
@@ -254,12 +257,33 @@ public class DorisSinkConnectorConfig {
                         Type.LONG,
                         BUFFER_FLUSH_TIME_SEC_DEFAULT,
                         
ConfigDef.Range.atLeast(Duration.ofSeconds(1).getSeconds()),
-                        Importance.LOW,
+                        Importance.HIGH,
                         "The time in seconds to flush cached data",
                         CONNECTOR_CONFIG,
-                        3,
+                        4,
                         ConfigDef.Width.NONE,
                         BUFFER_FLUSH_TIME_SEC)
+                .define(
+                        ENABLE_COMBINE_FLUSH,
+                        Type.BOOLEAN,
+                        ENABLE_COMBINE_FLUSH_DEFAULT,
+                        Importance.HIGH,
+                        "Whether to merge data from all partitions together 
and write them. The default value is false. When enabled, only at_least_once 
semantics are guaranteed.",
+                        CONNECTOR_CONFIG,
+                        5,
+                        ConfigDef.Width.NONE,
+                        ENABLE_COMBINE_FLUSH)
+                .define(
+                        DELIVERY_GUARANTEE,
+                        Type.STRING,
+                        DELIVERY_GUARANTEE_DEFAULT,
+                        Importance.MEDIUM,
+                        "How to ensure data consistency when consuming Kafka 
data is imported into Doris. Supports at_least_once exactly_once, default is 
at_least_once. Doris needs to be upgraded to 2.1.0 or above to ensure data 
exactly_once",
+                        CONNECTOR_CONFIG,
+                        6,
+                        ConfigDef.Width.NONE,
+                        DELIVERY_GUARANTEE,
+                        EnumRecommender.in(DeliveryGuarantee.values()))
                 .define(
                         RECORD_TABLE_NAME_FIELD,
                         Type.STRING,
@@ -267,45 +291,69 @@ public class DorisSinkConnectorConfig {
                         Importance.LOW,
                         "The field name of record, and use this field value as 
the table name to be written",
                         CONNECTOR_CONFIG,
-                        4,
+                        7,
                         ConfigDef.Width.NONE,
                         RECORD_TABLE_NAME_FIELD)
                 .define(
-                        JMX_OPT,
-                        ConfigDef.Type.BOOLEAN,
-                        JMX_OPT_DEFAULT,
-                        ConfigDef.Importance.HIGH,
-                        "Whether to enable JMX MBeans for custom metrics")
+                        BEHAVIOR_ON_NULL_VALUES,
+                        Type.STRING,
+                        BEHAVIOR_ON_NULL_VALUES_DEFAULT,
+                        Importance.LOW,
+                        "Used to handle records with a null value.",
+                        CONNECTOR_CONFIG,
+                        8,
+                        ConfigDef.Width.NONE,
+                        BEHAVIOR_ON_NULL_VALUES)
+                // debezium config
+                .define(
+                        CONVERTER_MODE,
+                        Type.STRING,
+                        CONVERT_MODE_DEFAULT,
+                        Importance.LOW,
+                        "Type conversion mode of upstream data when using 
Connector to consume Kafka data.\n"
+                                + "normal means consuming data in Kafka 
normally without any type conversion.\n"
+                                + "debezium_ingestion means that when Kafka 
upstream data is collected through CDC (Changelog Data Capture) tools such as 
Debezium, the upstream data needs to undergo special type conversion to support 
it.",
+                        DEBEZIUM_CONFIG,
+                        1,
+                        ConfigDef.Width.NONE,
+                        CONVERTER_MODE)
                 .define(
                         ENABLE_DELETE,
                         ConfigDef.Type.BOOLEAN,
                         ENABLE_DELETE_DEFAULT,
-                        ConfigDef.Importance.HIGH,
-                        "Used to synchronize delete events")
-                .define(
-                        LOAD_MODEL,
-                        Type.STRING,
-                        LOAD_MODEL_DEFAULT,
-                        Importance.HIGH,
-                        "load model is stream_load.")
+                        Importance.LOW,
+                        "Under Debezium synchronization, whether to 
synchronize deletion events. Non-Debezium messages need to be marked with 
deletions themselves.",
+                        DEBEZIUM_CONFIG,
+                        2,
+                        ConfigDef.Width.NONE,
+                        ENABLE_DELETE)
+                // Retries
                 .define(
                         MAX_RETRIES,
                         Type.INT,
                         MAX_RETRIES_DEFAULT,
                         Importance.MEDIUM,
-                        "The maximum number of times to retry on errors before 
failing the task.")
+                        "The maximum number of times to retry on errors before 
failing the task.",
+                        RETRIES_GROUP,
+                        1,
+                        ConfigDef.Width.NONE,
+                        MAX_RETRIES)
                 .define(
                         RETRY_INTERVAL_MS,
                         Type.INT,
                         RETRY_INTERVAL_MS_DEFAULT,
                         Importance.MEDIUM,
-                        "The time in milliseconds to wait following an error 
before a retry attempt is made.")
+                        "The time in milliseconds to wait following an error 
before a retry attempt is made.",
+                        RETRIES_GROUP,
+                        2,
+                        ConfigDef.Width.NONE,
+                        RETRY_INTERVAL_MS)
                 .define(
-                        BEHAVIOR_ON_NULL_VALUES,
-                        Type.STRING,
-                        BEHAVIOR_ON_NULL_VALUES_DEFAULT,
-                        Importance.LOW,
-                        "Used to handle records with a null value .");
+                        JMX_OPT,
+                        ConfigDef.Type.BOOLEAN,
+                        JMX_OPT_DEFAULT,
+                        ConfigDef.Importance.HIGH,
+                        "Whether to enable JMX MBeans for custom metrics");
     }
 
     public static class TopicToTableValidator implements ConfigDef.Validator {
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
index 49d7ba1..9dbbe90 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
@@ -92,8 +92,8 @@ public class AsyncDorisStreamLoad extends DataLoad {
                         new DefaultThreadFactory("streamload-executor"),
                         new ThreadPoolExecutor.AbortPolicy());
 
-        start();
         this.started = new AtomicBoolean(true);
+        start();
     }
 
     public void start() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to