This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new fc4c8c8 [Improve]schema.evolution adds debezium prefix (#24)
fc4c8c8 is described below
commit fc4c8c8b392ad979621a58124546018520930aa6
Author: wudongliang <[email protected]>
AuthorDate: Mon Jun 3 18:05:18 2024 +0800
[Improve]schema.evolution adds debezium prefix (#24)
---
src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java | 4 ++--
.../apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java | 5 +++--
.../apache/doris/kafka/connector/converter/TestRecordService.java | 2 +-
3 files changed, 6 insertions(+), 5 deletions(-)
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 4596f69..5747925 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -95,8 +95,8 @@ public class DorisOptions {
this.schemaEvolutionMode =
SchemaEvolutionMode.of(
config.getOrDefault(
- DorisSinkConnectorConfig.SCHEMA_EVOLUTION,
-
DorisSinkConnectorConfig.SCHEMA_EVOLUTION_DEFAULT));
+
DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION,
+
DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT));
this.fileSize =
Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES));
this.recordNum =
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 5c33da4..f4cabb3 100644
---
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -82,8 +82,9 @@ public class DorisSinkConnectorConfig {
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
- public static final String SCHEMA_EVOLUTION = "schema.evolution";
- public static final String SCHEMA_EVOLUTION_DEFAULT =
SchemaEvolutionMode.NONE.getName();
+ public static final String DEBEZIUM_SCHEMA_EVOLUTION =
"debezium.schema.evolution";
+ public static final String DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT =
+ SchemaEvolutionMode.NONE.getName();
// metrics
public static final String JMX_OPT = "jmx";
diff --git
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index f80a737..0142259 100644
---
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -72,7 +72,7 @@ public class TestRecordService {
props.load(stream);
props.put("task_id", "1");
props.put("converter.mode", "debezium_ingestion");
- props.put("schema.evolution", "basic");
+ props.put("debezium.schema.evolution", "basic");
props.put(
"doris.topic2table.map",
"avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]