This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new d2a3ac3 [e2e](test) add partial column update and debezium ingestion e2e case (#38)
d2a3ac3 is described below
commit d2a3ac3479cda9530ff3e9cd56d8973e581b7ea1
Author: Petrichor <[email protected]>
AuthorDate: Wed Jul 10 10:04:31 2024 +0800
[e2e](test) add partial column update and debezium ingestion e2e case (#38)
---
.../e2e/sink/AbstractKafka2DorisSink.java | 12 +++++
.../e2e/sink/stringconverter/StringMsgE2ETest.java | 61 ++++++++++++++++++++++
.../resources/e2e/string_converter/full_types.json | 22 ++++++++
.../full_types_debezium_ingestion.sql | 59 +++++++++++++++++++++
.../string_converter/insert_partial_update_tab.sql | 3 ++
.../e2e/string_converter/partial_update.json | 24 +++++++++
.../e2e/string_converter/partial_update_tab.sql | 15 ++++++
7 files changed, 196 insertions(+)
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
index c4c7fe4..d50556f 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
@@ -108,6 +108,18 @@ public abstract class AbstractKafka2DorisSink {
LOG.info("Create doris table successfully. sql={}", sql);
}
+    protected void insertTable(String sql) {
+        LOG.info("Will insert data into Doris table. SQL: {}", sql);
+        try {
+            Statement statement = getJdbcConnection().createStatement();
+            int rowCount = statement.executeUpdate(sql);
+            LOG.info("Inserted {} rows into the Doris table.", rowCount);
+        } catch (SQLException e) {
+            throw new DorisException("Failed to insert data to Doris table.", e);
+        }
+        LOG.info("Data insertion to Doris table was successful. SQL: {}", sql);
+    }
+
private static void initDorisBase() {
if (Objects.nonNull(dorisContainerService)) {
return;
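
Side note: insertTable never closes its Statement. A try-with-resources variant would plug the leak; this is a sketch only, reusing the names from this diff, not what the commit ships:

    protected void insertTable(String sql) {
        LOG.info("Will insert data into Doris table. SQL: {}", sql);
        // try-with-resources closes the Statement even when executeUpdate throws
        try (Statement statement = getJdbcConnection().createStatement()) {
            int rowCount = statement.executeUpdate(sql);
            LOG.info("Inserted {} rows into the Doris table.", rowCount);
        } catch (SQLException e) {
            throw new DorisException("Failed to insert data to Doris table.", e);
        }
    }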
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 227a203..3143461 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -125,6 +125,65 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
checkResult(expected, query, 3);
}
+    @Test
+    public void testPartialUpdate() throws Exception {
+        initialize("src/test/resources/e2e/string_converter/partial_update.json");
+        String topic = "partial_update_test";
+        String msg1 =
+                "{\"id\":1,\"col1\":\"after_update_col1_1\",\"col2\":\"after_update_col2_1\"}";
+        String msg2 =
+                "{\"id\":2,\"col1\":\"after_update_col1_2\",\"col2\":\"after_update_col2_2\"}";
+
+        produceMsg2Kafka(topic, msg1);
+        produceMsg2Kafka(topic, msg2);
+
+        String tableSql =
+                loadContent("src/test/resources/e2e/string_converter/partial_update_tab.sql");
+        String insertSql =
+                loadContent("src/test/resources/e2e/string_converter/insert_partial_update_tab.sql");
+        createTable(tableSql);
+        Thread.sleep(2000);
+        insertTable(insertSql);
+        Thread.sleep(15000);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        List<String> expected =
+                Arrays.asList(
+                        "1,after_update_col1_1,after_update_col2_1,before_update_col3_1",
+                        "2,after_update_col1_2,after_update_col2_2,before_update_col3_2");
+        Thread.sleep(10000);
+        String query =
+                String.format("select id,col1,col2,col3 from %s.%s order by id", database, table);
+        checkResult(expected, query, 4);
+    }
+
+    @Test
+    public void testDebeziumIngestionFullTypes() throws Exception {
+        initialize("src/test/resources/e2e/string_converter/full_types.json");
+        String topic = "full_types";
+        String msg1 =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_z_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"small_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"small_un_c\"},{\"type\":\"int32\",\"optio [...]
+
+        produceMsg2Kafka(topic, msg1);
+
+        String tableSql =
+                loadContent("src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql");
+        createTable(tableSql);
+        Thread.sleep(2000);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        List<String> expected =
+                Arrays.asList(
+                        "1,127,255,255,32767,65535,65535,8388607,16777215,16777215,2147483647,4294967295,4294967295,2147483647,9223372036854775807,-1,-1,Hello World,abc,123.102,123.102,123.103,123.104,404.4443,404.4444,404.4445,123.4567,123.4568,123.4569,346,34567892.1,false,true,true,2020-07-17,18:00:22,2020-07-17T18:00:22,2020-07-17T18:00:22,text,2021,red,a,b,{\"key1\":\"value1\"},{coordinates=[3,1], type=Point, srid=0},{coordinates=[[[1,1],[2,1],[2,2],[1,2],[1,1]]], type=Polygon, srid [...]
+        Thread.sleep(10000);
+        String query = String.format("select * from %s.%s order by id", database, table);
+        checkResult(expected, query, 51);
+    }
+
     public void checkResult(List<String> expected, String query, int columnSize) throws Exception {
List<String> actual = new ArrayList<>();
@@ -143,6 +202,8 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
actual.add(StringUtils.join(row, ","));
}
}
+ LOG.info("expected result: {}", Arrays.toString(expected.toArray()));
+ LOG.info("actual result: {}", Arrays.toString(actual.toArray()));
Assert.assertArrayEquals(expected.toArray(), actual.toArray());
}
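
Side note: both new tests pace themselves with fixed Thread.sleep calls before the single assertion, which is slow and can still race on a loaded CI host. A polling alternative is sketched below; the helper name, the 1s interval, and the reuse of checkResult's AssertionError are assumptions, not part of this commit:

    // Sketch: retry the assertion until it passes or the deadline expires,
    // instead of sleeping a fixed 10-15s before one check.
    protected void waitForResult(
            List<String> expected, String query, int columnSize, long timeoutMs) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                checkResult(expected, query, columnSize); // throws AssertionError on mismatch
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() > deadline) {
                    throw e; // time is up: surface the last mismatch
                }
                Thread.sleep(1000); // give the connector time to flush, then re-check
            }
        }
    }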
diff --git a/src/test/resources/e2e/string_converter/full_types.json b/src/test/resources/e2e/string_converter/full_types.json
new file mode 100644
index 0000000..ef95aef
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/full_types.json
@@ -0,0 +1,22 @@
+{
+ "name":"mysql_all_types",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"full_types",
+ "tasks.max":"1",
+ "doris.topic2table.map": "full_types:full_types_tab",
+ "buffer.count.records":"1",
+ "buffer.flush.time":"10",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"debezium_ingestion_msg",
+ "converter.mode": "debezium_ingestion",
+ "load.model":"stream_load",
+ "key.converter":"org.apache.kafka.connect.json.JsonConverter",
+ "value.converter":"org.apache.kafka.connect.json.JsonConverter"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql b/src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql
new file mode 100644
index 0000000..7cd001c
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql
@@ -0,0 +1,59 @@
+CREATE TABLE debezium_ingestion_msg.full_types_tab
+(
+ `id` LARGEINT NULL,
+ `tiny_c` TINYINT NULL,
+ `tiny_un_c` SMALLINT NULL,
+ `tiny_un_z_c` SMALLINT NULL,
+ `small_c` SMALLINT NULL,
+ `small_un_c` INT NULL,
+ `small_un_z_c` INT NULL,
+ `medium_c` INT NULL,
+ `medium_un_c` BIGINT NULL,
+ `medium_un_z_c` BIGINT NULL,
+ `int_c` INT NULL,
+ `int_un_c` BIGINT NULL,
+ `int_un_z_c` BIGINT NULL,
+ `int11_c` INT NULL,
+ `big_c` BIGINT NULL,
+ `big_un_c` LARGEINT NULL,
+ `big_un_z_c` LARGEINT NULL,
+ `varchar_c` VARCHAR(765) NULL,
+ `char_c` VARCHAR(9) NULL,
+ `real_c` DOUBLE NULL,
+ `float_c` FLOAT NULL,
+ `float_un_c` FLOAT NULL,
+ `float_un_z_c` FLOAT NULL,
+ `double_c` DOUBLE NULL,
+ `double_un_c` DOUBLE NULL,
+ `double_un_z_c` DOUBLE NULL,
+ `decimal_c` DECIMAL(8, 4) NULL,
+ `decimal_un_c` DECIMAL(8, 4) NULL,
+ `decimal_un_z_c` DECIMAL(8, 4) NULL,
+ `numeric_c` DECIMAL(6, 0) NULL,
+ `big_decimal_c` TEXT NULL,
+ `bit1_c` BOOLEAN NULL,
+ `tiny1_c` BOOLEAN NULL,
+ `boolean_c` BOOLEAN NULL,
+ `date_c` DATE NULL,
+ `time_c` TEXT NULL,
+ `datetime_c` DATETIME NULL,
+ `timestamp_c` DATETIME NULL,
+ `text_c` TEXT NULL,
+ `year_c` INT NULL,
+ `enum_c` TEXT NULL,
+ `set_c` TEXT NULL,
+ `json_c` JSON NULL,
+ `point_c` TEXT NULL,
+ `geometry_c` TEXT NULL,
+ `linestring_c` TEXT NULL,
+ `polygon_c` TEXT NULL,
+ `multipoint_c` TEXT NULL,
+ `multiline_c` TEXT NULL,
+ `multipolygon_c` TEXT NULL,
+ `geometrycollection_c` TEXT NULL
+) UNIQUE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change" = "true"
+);
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/insert_partial_update_tab.sql b/src/test/resources/e2e/string_converter/insert_partial_update_tab.sql
new file mode 100644
index 0000000..3dff4dd
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/insert_partial_update_tab.sql
@@ -0,0 +1,3 @@
+insert into string_msg.partial_update_tab (id, col1, col2, col3)
+values (1, "before_update_col1_1", "before_update_col2_1", "before_update_col3_1"),
+       (2, "before_update_col1_2", "before_update_col2_2", "before_update_col3_2");
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/partial_update.json b/src/test/resources/e2e/string_converter/partial_update.json
new file mode 100644
index 0000000..0caaa4b
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/partial_update.json
@@ -0,0 +1,24 @@
+{
+ "name":"partial_update_connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"partial_update_test",
+ "tasks.max":"1",
+ "doris.topic2table.map": "partial_update_test:partial_update_tab",
+ "buffer.count.records":"2",
+ "buffer.flush.time":"10",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"string_msg",
+ "sink.properties.partial_columns":"true",
+ "sink.properties.columns": "id,col1,col2",
+ "enable.2pc": "false",
+ "load.model":"stream_load",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+ }
+}
\ No newline at end of file
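
For context: the sink.properties.* keys are forwarded to Doris as Stream Load properties, so partial_columns=true together with columns=id,col1,col2 updates only those three columns on the merge-on-write unique-key table and leaves col3 as originally inserted. The raw Stream Load request this roughly corresponds to is sketched below; host, port, and the empty root password are taken from the config above, and the BE redirect a real client must handle is only noted in a comment:

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class PartialUpdateStreamLoadSketch {
        public static void main(String[] args) throws Exception {
            URL url = new URL("http://127.0.0.1:8030/api/string_msg/partial_update_tab/_stream_load");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("PUT");
            conn.setDoOutput(true);
            // The FE answers with a 307 redirect to a BE; a real client must replay the PUT there.
            conn.setInstanceFollowRedirects(false);
            String auth = Base64.getEncoder().encodeToString("root:".getBytes(StandardCharsets.UTF_8));
            conn.setRequestProperty("Authorization", "Basic " + auth);
            conn.setRequestProperty("Expect", "100-continue");
            conn.setRequestProperty("format", "json");
            conn.setRequestProperty("read_json_by_line", "true");
            // These two headers are what sink.properties.partial_columns / .columns translate to:
            conn.setRequestProperty("partial_columns", "true");
            conn.setRequestProperty("columns", "id,col1,col2");
            try (OutputStream os = conn.getOutputStream()) {
                os.write("{\"id\":1,\"col1\":\"after_update_col1_1\",\"col2\":\"after_update_col2_1\"}"
                        .getBytes(StandardCharsets.UTF_8));
            }
            System.out.println(conn.getResponseCode() + " " + conn.getResponseMessage());
        }
    }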
diff --git a/src/test/resources/e2e/string_converter/partial_update_tab.sql b/src/test/resources/e2e/string_converter/partial_update_tab.sql
new file mode 100644
index 0000000..64d3d5b
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/partial_update_tab.sql
@@ -0,0 +1,15 @@
+-- Please note that the database here should be consistent with doris.database in the file where the connector is registered.
+CREATE TABLE string_msg.partial_update_tab (
+ id INT NULL,
+ col1 VARCHAR(20) NULL,
+ col2 VARCHAR(20) NULL,
+ col3 VARCHAR(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change"="true",
+"enable_unique_key_merge_on_write" = "true"
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]