This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c5b8396 [Improve] check topic mutating SMTs (#68)
c5b8396 is described below
commit c5b83966103b7eb04799d524118f787ca122f6f5
Author: wangchuang <[email protected]>
AuthorDate: Fri Apr 25 16:02:03 2025 +0800
[Improve] check topic mutating SMTs (#68)
---
.../doris/kafka/connector/DorisSinkTask.java | 2 +-
.../connector/service/DorisDefaultSinkService.java | 20 ++++++++++-
.../connector/service/DorisSinkServiceFactory.java | 6 ++--
.../connector/e2e/kafka/KafkaContainerService.java | 2 ++
.../e2e/kafka/KafkaContainerServiceImpl.java | 33 ++++++++++++++++++
.../e2e/sink/stringconverter/StringMsgE2ETest.java | 25 ++++++++++++++
.../connector/service/TestDorisSinkService.java | 40 +++++++++++++++++++++-
.../e2e/transforms/regex_router_transforms.json | 26 ++++++++++++++
.../e2e/transforms/regex_router_transforms.sql | 12 +++++++
9 files changed, 161 insertions(+), 5 deletions(-)
diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index e83f033..576de26 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -55,7 +55,7 @@ public class DorisSinkTask extends SinkTask {
LOG.info("kafka doris sink task start with {}", parsedConfig);
this.options = new DorisOptions(parsedConfig);
this.remainingRetries = options.getMaxRetries();
- this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig);
+        this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig, context);
}
/**
diff --git
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 36f78fe..3a220f1 100644
---
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -43,7 +43,9 @@ import org.apache.doris.kafka.connector.writer.load.LoadModel;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,9 +67,11 @@ public class DorisDefaultSinkService implements
DorisSinkService {
private final MetricsJmxReporter metricsJmxReporter;
private final DorisConnectMonitor connectMonitor;
private final ObjectMapper objectMapper;
+ private final SinkTaskContext context;
- DorisDefaultSinkService(Map<String, String> config) {
+ DorisDefaultSinkService(Map<String, String> config, SinkTaskContext
context) {
this.dorisOptions = new DorisOptions(config);
+ this.context = context;
this.objectMapper = new ObjectMapper();
this.writer = new HashMap<>();
this.conn = new JdbcConnectionProvider(dorisOptions);
@@ -131,6 +135,8 @@ public class DorisDefaultSinkService implements
DorisSinkService {
record.kafkaOffset());
continue;
}
+ // check topic mutating SMTs
+ checkTopicMutating(record);
// Might happen a count of record based flushing,buffer
insert(record);
}
@@ -194,6 +200,18 @@ public class DorisDefaultSinkService implements
DorisSinkService {
});
}
+    /** Checks whether the record's topic was mutated by an SMT, i.e. no longer matches an assigned partition. */
+ public void checkTopicMutating(SinkRecord record) {
+        TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
+ if (!context.assignment().contains(tp)) {
+ throw new ConnectException(
+ "Unexpected topic name: ["
+ + record.topic()
+ + "] that doesn't match assigned partitions. "
+                            + "Connector doesn't support topic mutating SMTs.");
+ }
+ }
+
/**
* Get the table name in doris for the given record.
*
diff --git
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
index 05ae10d..dbf7192 100644
---
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
+++
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
@@ -20,11 +20,13 @@
package org.apache.doris.kafka.connector.service;
import java.util.Map;
+import org.apache.kafka.connect.sink.SinkTaskContext;
/** A factory to create {@link DorisSinkService} */
public class DorisSinkServiceFactory {
- public static DorisSinkService getDorisSinkService(Map<String, String>
connectorConfig) {
- return new DorisDefaultSinkService(connectorConfig);
+ public static DorisSinkService getDorisSinkService(
+ Map<String, String> connectorConfig, SinkTaskContext context) {
+ return new DorisDefaultSinkService(connectorConfig, context);
}
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
index 2e9dc8d..786d536 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
@@ -37,5 +37,7 @@ public interface KafkaContainerService {
void deleteKafkaConnector(String name);
+ String getConnectorTaskStatus(String name);
+
void close();
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
index ec39f49..cf1c164 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
@@ -19,6 +19,8 @@
package org.apache.doris.kafka.connector.e2e.kafka;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -35,10 +37,12 @@ import
org.apache.doris.kafka.connector.exception.DorisException;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.apache.kafka.connect.cli.ConnectDistributed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +74,7 @@ public class KafkaContainerServiceImpl implements
KafkaContainerService {
private static final int MAX_RETRIES = 5;
private GenericContainer schemaRegistryContainer;
private static Network network = Network.SHARED;
+ private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public String getInstanceHostAndPort() {
@@ -284,4 +289,32 @@ public class KafkaContainerServiceImpl implements
KafkaContainerService {
}
LOG.info("{} Kafka connector deleted successfully.", name);
}
+
+ @Override
+ public String getConnectorTaskStatus(String name) {
+ String connectUrl = "http://" + kafkaServerHost + ":" + CONNECT_PORT +
"/connectors/";
+ String getStatusUrl = connectUrl + name + "/status";
+ HttpGet httpGet = new HttpGet(getStatusUrl);
+ try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+ StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() != 200) {
+ LOG.warn(
+ "Failed to get connector status, name={}, msg={}",
+ name,
+ statusLine.getReasonPhrase());
+ }
+            JsonNode root = objectMapper.readTree(EntityUtils.toString(response.getEntity()));
+ JsonNode tasks = root.get("tasks");
+            // tasks is an array; we only care about the first task
+ if (tasks != null && tasks.isArray() && tasks.size() > 0) {
+ JsonNode task = tasks.get(0);
+                return task.get("state").asText(); // RUNNING / FAILED / UNASSIGNED
+ } else {
+ LOG.warn("No task info found for connector: " + name);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to get kafka connect task status, name={}", name);
+ }
+ return null;
+ }
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 8c4f207..4430dad 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -306,6 +306,31 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
checkResult(expectedResult, query1, 3);
}
+ @Test
+ public void testTopicMutatingSmt() throws Exception {
+        initialize("src/test/resources/e2e/transforms/regex_router_transforms.json");
+ String topic = "p-regex_router_transform_msg";
+ String msg1 = "{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\"}";
+ produceMsg2Kafka(topic, msg1);
+
+ String tableSql1 =
+
loadContent("src/test/resources/e2e/transforms/regex_router_transforms.sql");
+ createTable(tableSql1);
+
+ Thread.sleep(2000);
+ kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
+
+ List<String> expectedResult = Collections.emptyList();
+ Thread.sleep(10000);
+ String query1 =
+ String.format(
+ "select id,col1,col2 from %s.%s order by id",
+ database, "regex_router_transform_msg");
+
+        Assert.assertEquals("FAILED", kafkaContainerService.getConnectorTaskStatus(connectorName));
+ checkResult(expectedResult, query1, 3);
+ }
+
@AfterClass
public static void closeInstance() {
kafkaContainerService.deleteKafkaConnector(connectorName);
diff --git
a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
index 7ea1c2a..dc2e6ae 100644
---
a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
+++
b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
@@ -19,6 +19,10 @@
package org.apache.doris.kafka.connector.service;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
@@ -26,12 +30,15 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -53,7 +60,10 @@ public class TestDorisSinkService {
props.put("task_id", "1");
props.put("name", "sink-connector-test");
props.put("record.tablename.field", "table_name");
- dorisDefaultSinkService = new DorisDefaultSinkService((Map) props);
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ TopicPartition assignedTp = new TopicPartition("expected-topic", 0);
+ when(context.assignment()).thenReturn(Sets.newHashSet(assignedTp));
+ dorisDefaultSinkService = new DorisDefaultSinkService((Map) props,
context);
jsonConverter.configure(new HashMap<>(), false);
}
@@ -132,4 +142,32 @@ public class TestDorisSinkService {
Assert.assertEquals(
"test_kafka_tbl",
dorisDefaultSinkService.getSinkDorisTableName(record5));
}
+
+ @Test
+ public void testCheckTopicMutating() {
+ SinkRecord record1 =
+ new SinkRecord(
+ "expected-topic",
+ 0,
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "key",
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "val",
+ 1);
+ SinkRecord record2 =
+ new SinkRecord(
+ "mutated-topic",
+ 0,
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "key",
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "val",
+ 1);
+ dorisDefaultSinkService.checkTopicMutating(record1);
+
+ Assert.assertThrows(
+                "Unexpected topic name: [mutated-topic] that doesn't match assigned partitions. Connector doesn't support topic mutating SMTs.",
+ ConnectException.class,
+ () -> dorisDefaultSinkService.checkTopicMutating(record2));
+ }
}
diff --git a/src/test/resources/e2e/transforms/regex_router_transforms.json
b/src/test/resources/e2e/transforms/regex_router_transforms.json
new file mode 100644
index 0000000..d8aff6d
--- /dev/null
+++ b/src/test/resources/e2e/transforms/regex_router_transforms.json
@@ -0,0 +1,26 @@
+{
+ "name":"regex_router_transforms_connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"p-regex_router_transform_msg",
+ "tasks.max":"1",
+ "doris.topic2table.map":
"p-regex_router_transform_msg:regex_router_transform_msg,regex_router_transform_msg:regex_router_transform_msg",
+ "buffer.count.records":"2",
+ "buffer.flush.time":"11",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"transforms_msg",
+ "load.model":"stream_load",
+ "transforms": "dropPrefix",
+ "transforms.dropPrefix.type":
"org.apache.kafka.connect.transforms.RegexRouter",
+ "transforms.dropPrefix.regex": "p-(.*)",
+ "transforms.dropPrefix.replacement": "$1",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.json.JsonConverter",
+ "value.converter.schemas.enable": "false"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/transforms/regex_router_transforms.sql
b/src/test/resources/e2e/transforms/regex_router_transforms.sql
new file mode 100644
index 0000000..e72edcd
--- /dev/null
+++ b/src/test/resources/e2e/transforms/regex_router_transforms.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database
in the file where the connector is registered.
+CREATE TABLE transforms_msg.regex_router_transform_msg (
+ id INT NULL,
+ col1 VARCHAR(20) NULL,
+ col2 varchar(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]