This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new d65a505 [Improve](case) add case for debezium update, delete and avro
convert (#65)
d65a505 is described below
commit d65a5055d1ea9328efec3a97b97710fa06f6d2ba
Author: wudi <[email protected]>
AuthorDate: Mon Apr 7 10:22:44 2025 +0800
[Improve](case) add case for debezium update, delete and avro convert (#65)
---
.../connector/e2e/kafka/KafkaContainerService.java | 4 +
.../e2e/kafka/KafkaContainerServiceImpl.java | 50 +++++-
.../e2e/sink/AbstractKafka2DorisSink.java | 44 ++++-
.../e2e/sink/avro/AbstractAvroE2ESinkTest.java | 98 ++++++++++
.../connector/e2e/sink/avro/AvroMsgE2ETest.java | 198 +++++++++++++++++++++
.../e2e/sink/stringconverter/StringMsgE2ETest.java | 61 ++++---
.../e2e/avro_converter/confluent_avro_convert.json | 23 +++
.../e2e/avro_converter/confluent_avro_tab.sql | 11 ++
.../e2e/avro_converter/doris_avro_convert.json | 21 +++
.../e2e/avro_converter/doris_avro_tab.sql | 11 ++
.../e2e/string_converter/debezium_dml_event.json | 25 +++
.../string_converter/debezium_dml_event_tab.sql | 11 ++
12 files changed, 529 insertions(+), 28 deletions(-)
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
index 45616c4..2e9dc8d 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
@@ -25,6 +25,10 @@ public interface KafkaContainerService {
void startContainer();
+ void startSchemaRegistry();
+
+ String getSchemaRegistryUrl();
+
void startConnector();
String getInstanceHostAndPort();
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
index 4e38ab3..ec39f49 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
@@ -26,6 +26,7 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -41,7 +42,10 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.connect.cli.ConnectDistributed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
public class KafkaContainerServiceImpl implements KafkaContainerService {
@@ -60,9 +64,12 @@ public class KafkaContainerServiceImpl implements
KafkaContainerService {
private String kafkaServerHost;
private int kafkaServerPort;
private static final String CONNECT_PORT = "8083";
+ private static final String REGISTRY_PORT = "8081";
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private final ExecutorService executorService =
Executors.newSingleThreadExecutor();
private static final int MAX_RETRIES = 5;
+ private GenericContainer schemaRegistryContainer;
+ private static Network network = Network.SHARED;
@Override
public String getInstanceHostAndPort() {
@@ -148,7 +155,7 @@ public class KafkaContainerServiceImpl implements
KafkaContainerService {
public void startContainer() {
LOG.info("kafka server is about to be initialized.");
kafkaContainer = new
KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
-
+ kafkaContainer.withNetwork(network).withNetworkAliases("kafka");
kafkaContainer.start();
try {
Thread.sleep(5000);
@@ -166,12 +173,53 @@ public class KafkaContainerServiceImpl implements
KafkaContainerService {
kafkaServerPort);
}
+    /**
+     * Starts a Confluent Schema Registry container on the same Docker network as the Kafka
+     * container and waits until its HTTP port is listening.
+     *
+     * <p>The registry reaches the broker through the {@code kafka} network alias, so the Kafka
+     * container has to be started first. Calling this method again while the registry is
+     * already running is a no-op, so the running container is never overwritten and leaked.
+     *
+     * @throws DorisException if the thread is interrupted during the post-start grace period
+     */
+    @Override
+    public void startSchemaRegistry() {
+        if (schemaRegistryContainer != null && schemaRegistryContainer.isRunning()) {
+            LOG.info("schema registry is already running, skip starting.");
+            return;
+        }
+        LOG.info("start schema registry.");
+        final int registryPort = Integer.parseInt(REGISTRY_PORT);
+        schemaRegistryContainer =
+                new GenericContainer<>("confluentinc/cp-schema-registry:7.6.1")
+                        .withNetwork(network)
+                        .withExposedPorts(registryPort)
+                        .withNetworkAliases("schema-registry")
+                        .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
+                        .withEnv(
+                                "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
+                                "PLAINTEXT://kafka:9092")
+                        .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + REGISTRY_PORT)
+                        .waitingFor(
+                                Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
+
+        schemaRegistryContainer.start();
+        try {
+            // Extra grace period after the port is open, mirroring startContainer().
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up can still observe it.
+            Thread.currentThread().interrupt();
+            throw new DorisException(e);
+        }
+        LOG.info(
+                "start schema registry successfully, with {}:{}",
+                schemaRegistryContainer.getHost(),
+                schemaRegistryContainer.getMappedPort(registryPort));
+    }
+
+ @Override
+ public String getSchemaRegistryUrl() {
+ return "http://"
+ + schemaRegistryContainer.getHost()
+ + ":"
+ + schemaRegistryContainer.getMappedPort(8081);
+ }
+
@Override
public void close() {
LOG.info("Kafka server is about to be shut down.");
shutdownConnector();
kafkaContainer.close();
LOG.info("Kafka server shuts down successfully.");
+ if (schemaRegistryContainer != null) {
+ schemaRegistryContainer.close();
+ LOG.info("Kafka schema registry shuts down successfully.");
+ }
}
private void shutdownConnector() {
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
index ba919b6..78fd626 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
@@ -28,8 +28,12 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -39,6 +43,7 @@ import
org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerService;
import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerServiceImpl;
import org.apache.doris.kafka.connector.exception.DorisException;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +77,17 @@ public abstract class AbstractKafka2DorisSink {
kafkaInstanceHostAndPort =
kafkaContainerService.getInstanceHostAndPort();
}
+    /**
+     * Asks the Kafka container service to start the schema registry. Does nothing when the
+     * service has not been created.
+     */
+    protected static void initSchemaRegistry() {
+        if (kafkaContainerService != null) {
+            kafkaContainerService.startSchemaRegistry();
+        }
+    }
+
+    /**
+     * Returns the schema registry URL reported by the Kafka container service.
+     *
+     * @return the registry URL
+     * @throws NullPointerException with a descriptive message if the Kafka container service has
+     *     not been created yet
+     */
+    protected static String getSchemaRegistryUrl() {
+        Objects.requireNonNull(
+                kafkaContainerService, "kafkaContainerService has not been initialized");
+        return kafkaContainerService.getSchemaRegistryUrl();
+    }
+
protected static Connection getJdbcConnection() {
try {
return DriverManager.getConnection(
@@ -152,7 +168,31 @@ public abstract class AbstractKafka2DorisSink {
@AfterClass
public static void close() {
- kafkaContainerService.close();
- dorisContainerService.close();
+ // Intentionally not closed here: the containers are closed automatically, so multiple IT cases can reuse them
+ // kafkaContainerService.close();
+ // dorisContainerService.close();
+ }
+
+ public void checkResult(List<String> expected, String query, int
columnSize) throws Exception {
+ List<String> actual = new ArrayList<>();
+
+ try (Statement statement = getJdbcConnection().createStatement()) {
+ ResultSet sinkResultSet = statement.executeQuery(query);
+ while (sinkResultSet.next()) {
+ List<String> row = new ArrayList<>();
+ for (int i = 1; i <= columnSize; i++) {
+ Object value = sinkResultSet.getObject(i);
+ if (value == null) {
+ row.add("null");
+ } else {
+ row.add(value.toString());
+ }
+ }
+ actual.add(StringUtils.join(row, ","));
+ }
+ }
+ LOG.info("expected result: {}", Arrays.toString(expected.toArray()));
+ LOG.info("actual result: {}", Arrays.toString(actual.toArray()));
+ Assert.assertArrayEquals(expected.toArray(), actual.toArray());
}
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AbstractAvroE2ESinkTest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AbstractAvroE2ESinkTest.java
new file mode 100644
index 0000000..d6e11df
--- /dev/null
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AbstractAvroE2ESinkTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.sink.avro;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.doris.kafka.connector.e2e.sink.AbstractKafka2DorisSink;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Produces Avro messages to Kafka in two ways: (1) with a self-managed schema, sent as raw
+ * bytes; (2) through a schema registry.
+ */
+public abstract class AbstractAvroE2ESinkTest extends AbstractKafka2DorisSink {
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractAvroE2ESinkTest.class);
+ private static KafkaProducer<String, byte[]> producer;
+ private static KafkaProducer<String, GenericRecord> avroProducer;
+
+    /**
+     * Lazily creates the shared producer that sends String keys and raw {@code byte[]} values.
+     * Subsequent calls are no-ops once the producer exists.
+     */
+    public static void initByteProducer() {
+        if (producer == null) {
+            Properties props = new Properties();
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInstanceHostAndPort);
+            props.put(
+                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                    org.apache.kafka.common.serialization.StringSerializer.class);
+            props.put(
+                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                    org.apache.kafka.common.serialization.ByteArraySerializer.class);
+            producer = new KafkaProducer<>(props);
+            LOG.info("kafka producer started successfully.");
+        }
+    }
+
+    /**
+     * Lazily creates the shared producer that serializes {@code GenericRecord} values with the
+     * Confluent Avro serializer, using the URL from {@code getSchemaRegistryUrl()}. Subsequent
+     * calls are no-ops once the producer exists.
+     */
+    public static void initAvroProducer() {
+        if (avroProducer == null) {
+            Properties props = new Properties();
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInstanceHostAndPort);
+            props.put(
+                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                    org.apache.kafka.common.serialization.StringSerializer.class);
+            props.put(
+                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                    io.confluent.kafka.serializers.KafkaAvroSerializer.class);
+            props.put("schema.registry.url", getSchemaRegistryUrl());
+            avroProducer = new KafkaProducer<>(props);
+            LOG.info("kafka avro producer started successfully.");
+        }
+    }
+
+    /**
+     * Sends one Avro record (without a key) to {@code topic} using the Avro producer.
+     *
+     * <p>The send is asynchronous: the outcome is reported by the callback, which logs the
+     * offset on success and the exception on failure. {@code initAvroProducer()} must have been
+     * called first.
+     *
+     * @param topic destination topic
+     * @param value record to send
+     */
+    protected void produceMsg2Kafka(String topic, GenericRecord value) {
+        LOG.info("Kafka avro producer will produce msg. topic={}, msg={}", topic, value);
+        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, value);
+        avroProducer.send(
+                record,
+                (recordMetadata, e) -> {
+                    // Only touch the metadata on success; on failure it carries no usable offset.
+                    if (e != null) {
+                        LOG.error("Send avro msg failed. topic={}", topic, e);
+                    } else {
+                        LOG.info("Send avro Callback is {}", recordMetadata.offset());
+                    }
+                });
+        LOG.info("Kafka avro producer produced msg successfully. topic={}, msg={}", topic, value);
+    }
+
+    /**
+     * Sends one raw byte payload (without a key) to {@code topic} using the byte producer.
+     *
+     * <p>The send is asynchronous: the outcome is reported by the callback, which logs the
+     * offset on success and the exception on failure. {@code initByteProducer()} must have been
+     * called first.
+     *
+     * @param topic destination topic
+     * @param value payload bytes
+     */
+    protected void produceMsg2Kafka(String topic, byte[] value) {
+        // Log the payload size: formatting a byte[] with {} would only print its identity.
+        int msgLength = value == null ? 0 : value.length;
+        LOG.info("Kafka producer will produce msg. topic={}, msgLength={}", topic, msgLength);
+
+        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, value);
+        producer.send(
+                record,
+                (recordMetadata, e) -> {
+                    // Only touch the metadata on success; on failure it carries no usable offset.
+                    if (e != null) {
+                        LOG.error("Send msg failed. topic={}", topic, e);
+                    } else {
+                        LOG.info("Send Callback is {}", recordMetadata.offset());
+                    }
+                });
+
+        LOG.info(
+                "Kafka producer produced msg successfully. topic={}, msgLength={}",
+                topic,
+                msgLength);
+    }
+}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
new file mode 100644
index 0000000..d920903
--- /dev/null
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.sink.avro;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AvroMsgE2ETest extends AbstractAvroE2ESinkTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(AvroMsgE2ETest.class);
+ private static String connectorName;
+ private static String jsonMsgConnectorContent;
+ private static DorisOptions dorisOptions;
+ private static String database;
+
+ @BeforeClass // JUnit runs this once before any test in this class
+ public static void setUp() {
+ initServer(); // inherited helper; presumably brings up the shared test services - TODO confirm
+ }
+
+    /**
+     * Loads a connector definition and derives the static test state from it: connector name,
+     * Doris options and target database. The database is then created and the time zone set.
+     *
+     * @param connectorPath path of the connector JSON file
+     * @throws DorisException if the file content cannot be parsed as JSON
+     */
+    public static void initialize(String connectorPath) {
+        jsonMsgConnectorContent = loadContent(connectorPath);
+        final JsonNode root;
+        try {
+            root = objectMapper.readTree(jsonMsgConnectorContent);
+        } catch (IOException e) {
+            throw new DorisException("Failed to read content body.", e);
+        }
+        connectorName = root.get(NAME).asText();
+
+        Map<String, String> rawConfig = objectMapper.convertValue(root.get(CONFIG), Map.class);
+        rawConfig.put(ConfigCheckUtils.TASK_ID, "1");
+        Map<String, String> normalizedConfig =
+                DorisSinkConnectorConfig.convertToLowercase(rawConfig);
+        DorisSinkConnectorConfig.setDefaultValues(normalizedConfig);
+
+        dorisOptions = new DorisOptions(normalizedConfig);
+        database = dorisOptions.getDatabase();
+        createDatabase(database);
+        setTimeZone();
+    }
+
+ private static void setTimeZone() { // sets the global time_zone variable to Asia/Shanghai; NOTE(review): the connection from getJdbcConnection() is not closed here - confirm executeSql releases it
+ executeSql(getJdbcConnection(), "set global time_zone =
'Asia/Shanghai'");
+ }
+
+ @Test // E2E: records serialized through the schema registry are sunk via io.confluent AvroConverter and verified in Doris
+ public void testConfluentAvroConvert() throws Exception {
+ LOG.info("starting to testConfluentAvroConvert test");
+ initSchemaRegistry(); // the registry must be up first: initAvroProducer() reads its URL
+ initAvroProducer();
+
initialize("src/test/resources/e2e/avro_converter/confluent_avro_convert.json");
+
+ // the JSON file hard-codes the registry at 127.0.0.1:8081; point both converters at the mapped address of the registry container
+ String connectJson =
+
loadContent("src/test/resources/e2e/avro_converter/confluent_avro_convert.json");
+ JsonNode jsonNode = new ObjectMapper().readTree(connectJson);
+ ObjectNode configNode = (ObjectNode) jsonNode.get("config");
+
+ configNode.put("key.converter.schema.registry.url",
getSchemaRegistryUrl());
+ configNode.put("value.converter.schema.registry.url",
getSchemaRegistryUrl());
+ jsonMsgConnectorContent = new
ObjectMapper().writeValueAsString(jsonNode);
+
+ String topic = "avro-user-confluent";
+ Schema.Parser parser = new Schema.Parser();
+ Schema schema =
parser.parse(loadContent("src/test/resources/decode/avro/user.avsc"));
+
+ GenericRecord user1 = new GenericData.Record(schema);
+ user1.put("id", 3);
+ user1.put("name", "kafka-confluent");
+ user1.put("age", 38);
+ produceMsg2Kafka(topic, user1);
+
+ GenericRecord user2 = new GenericData.Record(schema);
+ user2.put("id", 4);
+ user2.put("name", "doris-confluent");
+ user2.put("age", 58);
+ produceMsg2Kafka(topic, user2);
+
+ String tableSql =
+
loadContent("src/test/resources/e2e/avro_converter/confluent_avro_tab.sql");
+ createTable(tableSql); // the target table is created before the connector is registered
+ kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
+ Thread.sleep(30000); // fixed 30s wait before querying; this is not a readiness check
+
+ String table = dorisOptions.getTopicMapTable(topic);
+ List<String> expected = Arrays.asList("3,kafka-confluent,38",
"4,doris-confluent,58");
+ String query = String.format("select id,name,age from %s.%s order by
id", database, table);
+ checkResult(expected, query, 3); // compares the 3 selected columns of every row, in id order
+ }
+
+ @Test // E2E: raw Avro bytes are decoded by DorisAvroConverter from a schema file and verified in Doris
+ public void testDorisAvroConvert() throws Exception {
+ LOG.info("starting to testDorisAvroConvert test");
+ initByteProducer();
+
initialize("src/test/resources/e2e/avro_converter/doris_avro_convert.json");
+ // replace the schema file path from the JSON with the absolute path of decode/avro/user.avsc resolved from the classpath
+ String connectJson =
+
loadContent("src/test/resources/e2e/avro_converter/doris_avro_convert.json");
+ JsonNode jsonNode = new ObjectMapper().readTree(connectJson);
+ ObjectNode configNode = (ObjectNode) jsonNode.get("config");
+
+ String absolutePath = getAbsolutePath("decode/avro/user.avsc");
+ configNode.put(
+ "value.converter.avro.topic2schema.filepath",
"avro-user:file://" + absolutePath);
+ jsonMsgConnectorContent = new
ObjectMapper().writeValueAsString(jsonNode);
+
+ String topic = "avro-user";
+ Schema.Parser parser = new Schema.Parser();
+ Schema schema =
parser.parse(loadContent("src/test/resources/decode/avro/user.avsc"));
+
+ GenericRecord user1 = new GenericData.Record(schema);
+ user1.put("id", 1);
+ user1.put("name", "kafka");
+ user1.put("age", 30);
+ produceMsg2Kafka(topic, convertAvro2Byte(user1, schema)); // value is the Avro binary encoding produced by convertAvro2Byte
+
+ GenericRecord user2 = new GenericData.Record(schema);
+ user2.put("id", 2);
+ user2.put("name", "doris");
+ user2.put("age", 18);
+ produceMsg2Kafka(topic, convertAvro2Byte(user2, schema));
+
+ String tableSql =
loadContent("src/test/resources/e2e/avro_converter/doris_avro_tab.sql");
+ createTable(tableSql); // the target table is created before the connector is registered
+ kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
+ Thread.sleep(30000); // fixed 30s wait before querying; this is not a readiness check
+
+ String table = dorisOptions.getTopicMapTable(topic);
+ List<String> expected = Arrays.asList("1,kafka,30", "2,doris,18");
+ String query = String.format("select id,name,age from %s.%s order by
id", database, table);
+ checkResult(expected, query, 3); // compares the 3 selected columns of every row, in id order
+ }
+
+ @AfterClass // JUnit runs this once after all tests in this class
+ public static void closeInstance() {
+ kafkaContainerService.deleteKafkaConnector(connectorName); // NOTE(review): connectorName is overwritten by every initialize() call, so only the most recently initialized connector is deleted
+ }
+
+    /**
+     * Resolves a classpath resource to an absolute file-system path.
+     *
+     * <p>The resource URL is converted through {@code URL.toURI()} rather than
+     * {@code URL.getPath()}, because the latter returns the percent-encoded form and yields a
+     * wrong path when a directory name contains spaces or non-ASCII characters. The resource is
+     * expected to live on the file system (a {@code file:} URL).
+     *
+     * @param fileName resource name relative to the classpath root
+     * @return the absolute path, or {@code null} if the resource does not exist
+     * @throws IllegalArgumentException if the resource URL is not a valid URI
+     */
+    public static String getAbsolutePath(String fileName) {
+        URL resource = AvroMsgE2ETest.class.getClassLoader().getResource(fileName);
+        if (resource == null) {
+            return null;
+        }
+        try {
+            return Paths.get(resource.toURI()).toAbsolutePath().toString();
+        } catch (java.net.URISyntaxException e) {
+            throw new IllegalArgumentException(
+                    "Invalid resource URL for " + fileName + ": " + resource, e);
+        }
+    }
+
+    /**
+     * Serializes a record with Avro binary encoding.
+     *
+     * @param data record to serialize
+     * @param schema schema used by the datum writer
+     * @return the encoded bytes
+     * @throws IOException if writing or flushing the encoder fails
+     */
+    public static byte[] convertAvro2Byte(GenericRecord data, Schema schema) throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(buffer, null);
+        new GenericDatumWriter<GenericRecord>(schema).write(data, binaryEncoder);
+        binaryEncoder.flush();
+        return buffer.toByteArray();
+    }
+}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index dba6d54..8c4f207 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -24,12 +24,10 @@ import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.exception.DorisException;
@@ -190,6 +188,42 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
checkResult(expected, query, 51);
}
+ @Test
+ public void testDebeziumIngestionDMLEvent() throws Exception {
+
initialize("src/test/resources/e2e/string_converter/debezium_dml_event.json");
+ String insert1 =
+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"}],\"optional\":true,\"name\":\"mysql-server-1.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":fal
[...]
+ String topic = "debezium_dml_event";
+ produceMsg2Kafka(topic, insert1);
+ String tableSql =
+
loadContent("src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql");
+ createTable(tableSql);
+ Thread.sleep(2000);
+ kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
+
+ String table = dorisOptions.getTopicMapTable(topic);
+ List<String> expected = Arrays.asList("1,zhangsan,18");
+ Thread.sleep(20000);
+ String query = String.format("select * from %s.%s order by id",
database, table);
+ checkResult(expected, query, 3);
+
+ // update
+ String update =
+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"}],\"optional\":true,\"name\":\"mysql-server-1.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":fal
[...]
+ produceMsg2Kafka(topic, update);
+ Thread.sleep(20000);
+ List<String> expectedUpdate = Arrays.asList("1,zhangsan,48");
+ checkResult(expectedUpdate, query, 3);
+
+ // delete
+ String delete =
+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"}],\"optional\":true,\"name\":\"mysql-server-1.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":fal
[...]
+ produceMsg2Kafka(topic, delete);
+ Thread.sleep(20000);
+ List<String> expectedDelete = Arrays.asList();
+ checkResult(expectedDelete, query, 3);
+ }
+
@Test
public void testTimeExampleTypes() throws Exception {
initialize("src/test/resources/e2e/string_converter/time_types.json");
@@ -272,29 +306,6 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
checkResult(expectedResult, query1, 3);
}
- public void checkResult(List<String> expected, String query, int
columnSize) throws Exception {
- List<String> actual = new ArrayList<>();
-
- try (Statement statement = getJdbcConnection().createStatement()) {
- ResultSet sinkResultSet = statement.executeQuery(query);
- while (sinkResultSet.next()) {
- List<String> row = new ArrayList<>();
- for (int i = 1; i <= columnSize; i++) {
- Object value = sinkResultSet.getObject(i);
- if (value == null) {
- row.add("null");
- } else {
- row.add(value.toString());
- }
- }
- actual.add(StringUtils.join(row, ","));
- }
- }
- LOG.info("expected result: {}", Arrays.toString(expected.toArray()));
- LOG.info("actual result: {}", Arrays.toString(actual.toArray()));
- Assert.assertArrayEquals(expected.toArray(), actual.toArray());
- }
-
@AfterClass
public static void closeInstance() {
kafkaContainerService.deleteKafkaConnector(connectorName);
diff --git a/src/test/resources/e2e/avro_converter/confluent_avro_convert.json
b/src/test/resources/e2e/avro_converter/confluent_avro_convert.json
new file mode 100644
index 0000000..88dbd11
--- /dev/null
+++ b/src/test/resources/e2e/avro_converter/confluent_avro_convert.json
@@ -0,0 +1,23 @@
+{
+ "name":"confluent-avro-test",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"avro-user-confluent",
+ "tasks.max":"1",
+ "doris.topic2table.map": "avro-user-confluent:confluent_avro_tab",
+ "buffer.count.records":"1",
+ "buffer.flush.time":"10",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"confluent_avro_convert",
+ "load.model":"stream_load",
+ "key.converter":"io.confluent.connect.avro.AvroConverter",
+ "key.converter.schema.registry.url":"http://127.0.0.1:8081",
+ "value.converter":"io.confluent.connect.avro.AvroConverter",
+ "value.converter.schema.registry.url":"http://127.0.0.1:8081"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/avro_converter/confluent_avro_tab.sql
b/src/test/resources/e2e/avro_converter/confluent_avro_tab.sql
new file mode 100644
index 0000000..5864783
--- /dev/null
+++ b/src/test/resources/e2e/avro_converter/confluent_avro_tab.sql
@@ -0,0 +1,11 @@
+CREATE TABLE confluent_avro_convert.confluent_avro_tab
+(
+ `id` INT,
+ `name` VARCHAR(256),
+ `age` SMALLINT,
+)DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change" = "true"
+);
\ No newline at end of file
diff --git a/src/test/resources/e2e/avro_converter/doris_avro_convert.json
b/src/test/resources/e2e/avro_converter/doris_avro_convert.json
new file mode 100644
index 0000000..0a44ef8
--- /dev/null
+++ b/src/test/resources/e2e/avro_converter/doris_avro_convert.json
@@ -0,0 +1,21 @@
+{
+ "name":"avro_sink-connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"avro-user",
+ "tasks.max":"1",
+ "doris.topic2table.map": "avro-user:doris_avro_tab",
+ "buffer.count.records":"1",
+ "buffer.flush.time":"10",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"avro_convert",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+
"value.converter":"org.apache.doris.kafka.connector.decode.avro.DorisAvroConverter",
+
"value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/avro_converter/doris_avro_tab.sql
b/src/test/resources/e2e/avro_converter/doris_avro_tab.sql
new file mode 100644
index 0000000..a665c32
--- /dev/null
+++ b/src/test/resources/e2e/avro_converter/doris_avro_tab.sql
@@ -0,0 +1,11 @@
+CREATE TABLE avro_convert.doris_avro_tab
+(
+ `id` INT,
+ `name` VARCHAR(256),
+ `age` INT,
+)DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change" = "true"
+);
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/debezium_dml_event.json
b/src/test/resources/e2e/string_converter/debezium_dml_event.json
new file mode 100644
index 0000000..73ee118
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/debezium_dml_event.json
@@ -0,0 +1,25 @@
+{
+ "name":"debezium_dml_event",
+ "config": {
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"debezium_dml_event",
+ "tasks.max":"1",
+ "doris.topic2table.map": "debezium_dml_event:debezium_dml_event_tab",
+ "buffer.count.records":"1",
+ "buffer.flush.time":"10",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"debezium_ingestion_msg",
+ "converter.mode": "debezium_ingestion",
+ "load.model":"stream_load",
+ "delivery.guarantee":"exactly_once",
+ "enable.2pc": "true",
+ "enable.delete": "true",
+ "key.converter":"org.apache.kafka.connect.json.JsonConverter",
+ "value.converter":"org.apache.kafka.connect.json.JsonConverter"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql
b/src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql
new file mode 100644
index 0000000..a3c7db4
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql
@@ -0,0 +1,11 @@
+CREATE TABLE debezium_ingestion_msg.debezium_dml_event_tab
+(
+ `id` INT,
+ `name` VARCHAR(256),
+ `age` SMALLINT,
+)UNIQUE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change" = "true"
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]