This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7db491f [improve] Update config for ConfigDef (#85)
7db491f is described below
commit 7db491fd0f4731722bbe1322d0deae11187640ea
Author: wudi <[email protected]>
AuthorDate: Wed Aug 13 11:32:26 2025 +0800
[improve] Update config for ConfigDef (#85)
---
.../connector/cfg/DorisSinkConnectorConfig.java | 118 +++++++++++++++------
.../writer/load/AsyncDorisStreamLoad.java | 2 +-
2 files changed, 84 insertions(+), 36 deletions(-)
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 97bc125..6705f1d 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -19,6 +19,7 @@
package org.apache.doris.kafka.connector.cfg;
+import io.confluent.kafka.schemaregistry.utils.EnumRecommender;
import java.time.Duration;
import java.util.HashMap;
import java.util.Locale;
@@ -61,7 +62,7 @@ public class DorisSinkConnectorConfig {
public static final long BUFFER_FLUSH_TIME_SEC_DEFAULT = 120;
public static final String BUFFER_FLUSH_TIME_SEC = "buffer.flush.time";
- private static final String DORIS_INFO = "Doris Info";
+ private static final String DORIS_INFO = "Doris Cluster Info";
// doris config
public static final String DORIS_URLS = "doris.urls";
@@ -88,10 +89,12 @@ public class DorisSinkConnectorConfig {
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+ public static final String DEBEZIUM_CONFIG = "Debezium Config";
public static final String DEBEZIUM_SCHEMA_EVOLUTION =
"debezium.schema.evolution";
public static final String DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT =
SchemaEvolutionMode.NONE.getName();
+ public static final String RETRIES_GROUP = "Retries";
public static final String MAX_RETRIES = "max.retries";
public static final int MAX_RETRIES_DEFAULT = 10;
@@ -161,7 +164,7 @@ public class DorisSinkConnectorConfig {
null,
nonEmptyStringValidator,
Importance.HIGH,
- "Doris account url",
+ "Doris host url",
DORIS_INFO,
0,
ConfigDef.Width.NONE,
@@ -174,7 +177,7 @@ public class DorisSinkConnectorConfig {
DORIS_INFO,
1,
ConfigDef.Width.NONE,
- DORIS_URLS)
+ DORIS_QUERY_PORT)
.define(
DORIS_HTTP_PORT,
Type.INT,
@@ -212,7 +215,7 @@ public class DorisSinkConnectorConfig {
Importance.HIGH,
"Doris database name",
DORIS_INFO,
- 6,
+ 5,
ConfigDef.Width.NONE,
DORIS_DATABASE)
.define(
@@ -220,11 +223,11 @@ public class DorisSinkConnectorConfig {
Type.STRING,
"",
topicToTableValidator,
- Importance.LOW,
+ Importance.HIGH,
"Map of topics to tables (optional). Format : comma-separated tuples, e.g."
+ " <topic-1>:<table-1>,<topic-2>:<table-2>,... ",
CONNECTOR_CONFIG,
- 0,
+ 1,
ConfigDef.Width.NONE,
TOPICS_TABLES_MAP)
.define(
@@ -232,10 +235,10 @@ public class DorisSinkConnectorConfig {
Type.LONG,
BUFFER_COUNT_RECORDS_DEFAULT,
ConfigDef.Range.atLeast(1),
- Importance.LOW,
- "Number of records buffered in memory per partition before triggering",
+ Importance.HIGH,
+ "Number of records buffered in memory before triggering",
CONNECTOR_CONFIG,
- 1,
+ 2,
ConfigDef.Width.NONE,
BUFFER_COUNT_RECORDS)
.define(
@@ -243,10 +246,10 @@ public class DorisSinkConnectorConfig {
Type.LONG,
BUFFER_SIZE_BYTES_DEFAULT,
ConfigDef.Range.atLeast(1),
- Importance.LOW,
- "Cumulative size of records buffered in memory per partition before triggering",
+ Importance.HIGH,
+ "Cumulative size of records buffered in memory before triggering",
CONNECTOR_CONFIG,
- 2,
+ 3,
ConfigDef.Width.NONE,
BUFFER_SIZE_BYTES)
.define(
@@ -254,12 +257,33 @@ public class DorisSinkConnectorConfig {
Type.LONG,
BUFFER_FLUSH_TIME_SEC_DEFAULT,
ConfigDef.Range.atLeast(Duration.ofSeconds(1).getSeconds()),
- Importance.LOW,
+ Importance.HIGH,
"The time in seconds to flush cached data",
CONNECTOR_CONFIG,
- 3,
+ 4,
ConfigDef.Width.NONE,
BUFFER_FLUSH_TIME_SEC)
+ .define(
+ ENABLE_COMBINE_FLUSH,
+ Type.BOOLEAN,
+ ENABLE_COMBINE_FLUSH_DEFAULT,
+ Importance.HIGH,
+ "Whether to merge data from all partitions together and write them. The default value is false. When enabled, only at_least_once semantics are guaranteed.",
+ CONNECTOR_CONFIG,
+ 5,
+ ConfigDef.Width.NONE,
+ ENABLE_COMBINE_FLUSH)
+ .define(
+ DELIVERY_GUARANTEE,
+ Type.STRING,
+ DELIVERY_GUARANTEE_DEFAULT,
+ Importance.MEDIUM,
+ "How to ensure data consistency when consuming Kafka data is imported into Doris. Supports at_least_once exactly_once, default is at_least_once. Doris needs to be upgraded to 2.1.0 or above to ensure data exactly_once",
+ CONNECTOR_CONFIG,
+ 6,
+ ConfigDef.Width.NONE,
+ DELIVERY_GUARANTEE,
+ EnumRecommender.in(DeliveryGuarantee.values()))
.define(
RECORD_TABLE_NAME_FIELD,
Type.STRING,
@@ -267,45 +291,69 @@ public class DorisSinkConnectorConfig {
Importance.LOW,
"The field name of record, and use this field value as the table name to be written",
CONNECTOR_CONFIG,
- 4,
+ 7,
ConfigDef.Width.NONE,
RECORD_TABLE_NAME_FIELD)
.define(
- JMX_OPT,
- ConfigDef.Type.BOOLEAN,
- JMX_OPT_DEFAULT,
- ConfigDef.Importance.HIGH,
- "Whether to enable JMX MBeans for custom metrics")
+ BEHAVIOR_ON_NULL_VALUES,
+ Type.STRING,
+ BEHAVIOR_ON_NULL_VALUES_DEFAULT,
+ Importance.LOW,
+ "Used to handle records with a null value.",
+ CONNECTOR_CONFIG,
+ 8,
+ ConfigDef.Width.NONE,
+ BEHAVIOR_ON_NULL_VALUES)
+ // debezium config
+ .define(
+ CONVERTER_MODE,
+ Type.STRING,
+ CONVERT_MODE_DEFAULT,
+ Importance.LOW,
+ "Type conversion mode of upstream data when using Connector to consume Kafka data.\n"
+ + "normal means consuming data in Kafka normally without any type conversion.\n"
+ + "debezium_ingestion means that when Kafka upstream data is collected through CDC (Changelog Data Capture) tools such as Debezium, the upstream data needs to undergo special type conversion to support it.",
+ DEBEZIUM_CONFIG,
+ 1,
+ ConfigDef.Width.NONE,
+ CONVERTER_MODE)
.define(
ENABLE_DELETE,
ConfigDef.Type.BOOLEAN,
ENABLE_DELETE_DEFAULT,
- ConfigDef.Importance.HIGH,
- "Used to synchronize delete events")
- .define(
- LOAD_MODEL,
- Type.STRING,
- LOAD_MODEL_DEFAULT,
- Importance.HIGH,
- "load model is stream_load.")
+ Importance.LOW,
+ "Under Debezium synchronization, whether to synchronize deletion events. Non-Debezium messages need to be marked with deletions themselves.",
+ DEBEZIUM_CONFIG,
+ 2,
+ ConfigDef.Width.NONE,
+ ENABLE_DELETE)
+ // Retries
.define(
MAX_RETRIES,
Type.INT,
MAX_RETRIES_DEFAULT,
Importance.MEDIUM,
- "The maximum number of times to retry on errors before failing the task.")
+ "The maximum number of times to retry on errors before failing the task.",
+ RETRIES_GROUP,
+ 1,
+ ConfigDef.Width.NONE,
+ MAX_RETRIES)
.define(
RETRY_INTERVAL_MS,
Type.INT,
RETRY_INTERVAL_MS_DEFAULT,
Importance.MEDIUM,
- "The time in milliseconds to wait following an error before a retry attempt is made.")
+ "The time in milliseconds to wait following an error before a retry attempt is made.",
+ RETRIES_GROUP,
+ 2,
+ ConfigDef.Width.NONE,
+ RETRY_INTERVAL_MS)
.define(
- BEHAVIOR_ON_NULL_VALUES,
- Type.STRING,
- BEHAVIOR_ON_NULL_VALUES_DEFAULT,
- Importance.LOW,
- "Used to handle records with a null value .");
+ JMX_OPT,
+ ConfigDef.Type.BOOLEAN,
+ JMX_OPT_DEFAULT,
+ ConfigDef.Importance.HIGH,
+ "Whether to enable JMX MBeans for custom metrics");
}
public static class TopicToTableValidator implements ConfigDef.Validator {
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
index 49d7ba1..9dbbe90 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/AsyncDorisStreamLoad.java
@@ -92,8 +92,8 @@ public class AsyncDorisStreamLoad extends DataLoad {
new DefaultThreadFactory("streamload-executor"),
new ThreadPoolExecutor.AbortPolicy());
- start();
this.started = new AtomicBoolean(true);
+ start();
}
public void start() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]