This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 4ace2e6 [feature]support stream load with group commit mode (#35)
4ace2e6 is described below
commit 4ace2e65a7c0bf383c14010e7a4823e5fd5e0aaf
Author: Petrichor <[email protected]>
AuthorDate: Tue Jul 9 15:13:16 2024 +0800
[feature]support stream load with group commit mode (#35)
---
.../doris/kafka/connector/cfg/DorisOptions.java | 27 +++++++----
.../kafka/connector/utils/ConfigCheckUtils.java | 27 +++++++++++
.../kafka/connector/writer/LoadConstants.java | 4 ++
.../connector/writer/load/DorisStreamLoad.java | 23 +++++++--
.../GroupCommitMode.java} | 32 ++++++++++---
.../cfg/TestDorisSinkConnectorConfig.java | 52 ++++++++++++++++++++
.../e2e/sink/stringconverter/StringMsgE2ETest.java | 55 ++++++++++++++++++++--
.../string_converter/group_commit_connector.json | 23 +++++++++
.../e2e/string_converter/group_commit_tab.sql | 13 +++++
9 files changed, 234 insertions(+), 22 deletions(-)
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 69cbb80..e8c1933 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -55,8 +55,9 @@ public class DorisOptions {
private boolean autoRedirect = true;
private int requestReadTimeoutMs;
private int requestConnectTimeoutMs;
+ private boolean enableGroupCommit;
/** Properties for the StreamLoad. */
- private final Properties streamLoadProp = new Properties();
+ private final Properties streamLoadProp;
@Deprecated private String labelPrefix;
private final String databaseTimeZone;
@@ -113,25 +114,31 @@ public class DorisOptions {
this.requestReadTimeoutMs =
Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS));
}
- getStreamLoadPropFromConfig(config);
+ this.streamLoadProp = getStreamLoadPropFromConfig(config);
+ this.enableGroupCommit =
+ ConfigCheckUtils.validateGroupCommitMode(getStreamLoadProp(),
enable2PC());
}
- private void getStreamLoadPropFromConfig(Map<String, String> config) {
- setStreamLoadDefaultValues();
+ private Properties getStreamLoadPropFromConfig(Map<String, String> config)
{
+ Properties properties = new Properties();
+ properties.putAll(getStreamLoadDefaultValues());
for (Map.Entry<String, String> entry : config.entrySet()) {
if
(entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) {
String subKey =
entry.getKey()
.substring(
DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length());
- streamLoadProp.put(subKey, entry.getValue());
+ properties.put(subKey, entry.getValue());
}
}
+ return properties;
}
- private void setStreamLoadDefaultValues() {
- streamLoadProp.setProperty("format", "json");
- streamLoadProp.setProperty("read_json_by_line", "true");
+ private Properties getStreamLoadDefaultValues() {
+ Properties properties = new Properties();
+ properties.setProperty("format", "json");
+ properties.setProperty("read_json_by_line", "true");
+ return properties;
}
public String getName() {
@@ -182,6 +189,10 @@ public class DorisOptions {
return enable2PC;
}
+ public boolean enableGroupCommit() {
+ return enableGroupCommit;
+ }
+
public Map<String, String> getTopicMap() {
return topicMap;
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index ca8ec31..51b8b06 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -19,8 +19,11 @@
package org.apache.doris.kafka.connector.utils;
+import static
org.apache.doris.kafka.connector.writer.LoadConstants.PARTIAL_COLUMNS;
+
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.converter.ConverterMode;
@@ -28,6 +31,8 @@ import
org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
import org.apache.doris.kafka.connector.exception.ArgumentsException;
import org.apache.doris.kafka.connector.exception.DorisException;
import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
+import org.apache.doris.kafka.connector.writer.LoadConstants;
+import org.apache.doris.kafka.connector.writer.load.GroupCommitMode;
import org.apache.doris.kafka.connector.writer.load.LoadModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -293,4 +298,26 @@ public class ConfigCheckUtils {
}
return false;
}
+
+ public static boolean validateGroupCommitMode(Properties streamLoadProp,
boolean enable2PC) {
+ if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) {
+ return false;
+ }
+
+ Object value = streamLoadProp.get(LoadConstants.GROUP_COMMIT);
+ String normalizedValue = value.toString().trim().toLowerCase();
+ if (!GroupCommitMode.instances().contains(normalizedValue)) {
+ throw new DorisException(
+ "The value of group commit mode is an illegal parameter,
illegal value="
+ + value);
+ } else if (enable2PC) {
+ throw new DorisException(
+ "When group commit is enabled, you should disable two
phase commit! Please set 'enable.2pc':'false'");
+ } else if (streamLoadProp.containsKey(PARTIAL_COLUMNS)
+ && streamLoadProp.get(PARTIAL_COLUMNS).equals("true")) {
+ throw new DorisException(
+ "When group commit is enabled,you can not load data with
partial column update.");
+ }
+ return true;
+ }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
index 598cc3a..3e887eb 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
@@ -26,4 +26,8 @@ public class LoadConstants {
// Special identifier, label separator used for kafka-connect sink data
public static final String FILE_DELIM_DEFAULT = "__KC_";
+
+ // Since Apache Doris 2.1.0, stream load supports group commit mode.
+ public static final String GROUP_COMMIT = "group_commit";
+ public static final String PARTIAL_COLUMNS = "partial_columns";
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
index b26a735..6d3f770 100644
---
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
@@ -53,6 +53,7 @@ public class DorisStreamLoad extends DataLoad {
private final CloseableHttpClient httpClient = new
HttpUtils().getHttpClient();
private final BackendUtils backendUtils;
private Queue<KafkaRespContent> respContents = new LinkedList<>();
+ private final boolean enableGroupCommit;
public DorisStreamLoad(BackendUtils backendUtils, DorisOptions
dorisOptions, String topic) {
this.database = dorisOptions.getDatabase();
@@ -63,10 +64,16 @@ public class DorisStreamLoad extends DataLoad {
this.dorisOptions = dorisOptions;
this.backendUtils = backendUtils;
this.topic = topic;
+ this.enableGroupCommit = dorisOptions.enableGroupCommit();
}
/** execute stream load. */
public void load(String label, RecordBuffer buffer) throws IOException {
+
+ if (enableGroupCommit) {
+ label = null;
+ }
+
refreshLoadUrl(database, table);
String data = buffer.getData();
ByteArrayEntity entity = new
ByteArrayEntity(data.getBytes(StandardCharsets.UTF_8));
@@ -81,7 +88,12 @@ public class DorisStreamLoad extends DataLoad {
.enable2PC(dorisOptions.enable2PC())
.addProperties(dorisOptions.getStreamLoadProp());
- LOG.info("stream load started for {} on host {}", label, hostPort);
+ if (enableGroupCommit) {
+ LOG.info("stream load started with group commit on host {}",
hostPort);
+ } else {
+ LOG.info("stream load started for {} on host {}", label, hostPort);
+ }
+
try (CloseableHttpResponse response =
httpClient.execute(putBuilder.build())) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 && response.getEntity() != null) {
@@ -101,10 +113,15 @@ public class DorisStreamLoad extends DataLoad {
respContent.setLastOffset(buffer.getLastOffset());
respContent.setTopic(topic);
respContents.add(respContent);
- return;
}
} catch (Exception ex) {
- String err = "failed to stream load data with label: " + label;
+ String err;
+ if (enableGroupCommit) {
+ err = "failed to stream load data with group commit";
+ } else {
+ err = "failed to stream load data with label: " + label;
+ }
+
LOG.warn(err, ex);
throw new StreamLoadException(err, ex);
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
b/src/main/java/org/apache/doris/kafka/connector/writer/load/GroupCommitMode.java
similarity index 57%
copy from
src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
copy to
src/main/java/org/apache/doris/kafka/connector/writer/load/GroupCommitMode.java
index 598cc3a..869b940 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/load/GroupCommitMode.java
@@ -17,13 +17,31 @@
* under the License.
*/
-package org.apache.doris.kafka.connector.writer;
+package org.apache.doris.kafka.connector.writer.load;
-public class LoadConstants {
- public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
- public static final String DORIS_DEL_TRUE = "1";
- public static final String DORIS_DEL_FALSE = "0";
+import java.util.Arrays;
+import java.util.List;
- // Special identifier, label separator used for kafka-connect sink data
- public static final String FILE_DELIM_DEFAULT = "__KC_";
+public enum GroupCommitMode {
+ OFF_MODE("off_mode"),
+ SYNC_MODE("sync_mode"),
+ ASYNC_MODE("async_mode");
+
+ private final String name;
+
+ GroupCommitMode(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static LoadModel of(String name) {
+ return LoadModel.valueOf(name.toUpperCase());
+ }
+
+ public static List<String> instances() {
+ return Arrays.asList(OFF_MODE.name, SYNC_MODE.name, ASYNC_MODE.name);
+ }
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index eae3ec0..8f44707 100644
---
a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++
b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.doris.kafka.connector.exception.DorisException;
import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
import org.junit.Assert;
@@ -290,4 +291,55 @@ public class TestDorisSinkConnectorConfig {
Assert.assertTrue(convertConfig.containsKey(s));
}
}
+
+ @Test(expected = DorisException.class)
+ public void testGroupCommitWithIllegalParams() {
+ Map<String, String> config = getConfig();
+ config.put("sink.properties.group_commit", "sync_modes");
+ Properties streamLoadProp = getStreamLoadPropFromConfig(config);
+ config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
+ ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+ }
+
+ @Test(expected = DorisException.class)
+ public void testGroupCommitModeWithEnable2pc() {
+ Map<String, String> config = getConfig();
+ config.put("sink.properties.group_commit", "sync_mode");
+ Properties streamLoadProp = getStreamLoadPropFromConfig(config);
+ boolean enable2pc =
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
+ ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, enable2pc);
+ }
+
+ @Test(expected = DorisException.class)
+ public void testGroupCommitWithPartialUpdate() {
+ Map<String, String> config = getConfig();
+ config.put("sink.properties.group_commit", "sync_mode");
+ config.put("sink.properties.partial_columns", "true");
+ config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
+ Properties streamLoadProp = getStreamLoadPropFromConfig(config);
+ ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+ }
+
+ @Test
+ public void testGroupCommitWithAsyncMode() {
+ Map<String, String> config = getConfig();
+ config.put("sink.properties.group_commit", "async_mode");
+ Properties streamLoadProp = getStreamLoadPropFromConfig(config);
+ config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
+ ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+ }
+
+ private Properties getStreamLoadPropFromConfig(Map<String, String> config)
{
+ Properties streamLoadProp = new Properties();
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ if
(entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) {
+ String subKey =
+ entry.getKey()
+ .substring(
+
DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length());
+ streamLoadProp.put(subKey, entry.getValue());
+ }
+ }
+ return streamLoadProp;
+ }
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index cd3f455..227a203 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -24,7 +24,11 @@ import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.exception.DorisException;
@@ -47,12 +51,10 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
public static void setUp() {
initServer();
initProducer();
- initialize();
}
- public static void initialize() {
- jsonMsgConnectorContent =
-
loadContent("src/test/resources/e2e/string_converter/string_msg_connector.json");
+ public static void initialize(String connectorPath) {
+ jsonMsgConnectorContent = loadContent(connectorPath);
JsonNode rootNode = null;
try {
rootNode = objectMapper.readTree(jsonMsgConnectorContent);
@@ -73,6 +75,8 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
@Test
public void testStringMsg() throws IOException, InterruptedException,
SQLException {
+
initialize("src/test/resources/e2e/string_converter/string_msg_connector.json");
+ Thread.sleep(5000);
String topic = "string_test";
String msg = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
@@ -99,6 +103,49 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
Assert.assertEquals(12, age);
}
+ @Test
+ public void testGroupCommit() throws Exception {
+
+
initialize("src/test/resources/e2e/string_converter/group_commit_connector.json");
+ String topic = "group_commit_test";
+ String msg1 = "{\"id\":1,\"name\":\"kafka\",\"age\":12}";
+ String msg2 = "{\"id\":2,\"name\":\"doris\",\"age\":10}";
+
+ produceMsg2Kafka(topic, msg1);
+ produceMsg2Kafka(topic, msg2);
+ String tableSql =
+
loadContent("src/test/resources/e2e/string_converter/group_commit_tab.sql");
+ createTable(tableSql);
+ kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
+ Thread.sleep(25000);
+
+ String table = dorisOptions.getTopicMapTable(topic);
+ List<String> expected = Arrays.asList("1,kafka,12", "2,doris,10");
+ String query = String.format("select id,name,age from %s.%s order by
id", database, table);
+ checkResult(expected, query, 3);
+ }
+
+ public void checkResult(List<String> expected, String query, int
columnSize) throws Exception {
+ List<String> actual = new ArrayList<>();
+
+ try (Statement statement = getJdbcConnection().createStatement()) {
+ ResultSet sinkResultSet = statement.executeQuery(query);
+ while (sinkResultSet.next()) {
+ List<String> row = new ArrayList<>();
+ for (int i = 1; i <= columnSize; i++) {
+ Object value = sinkResultSet.getObject(i);
+ if (value == null) {
+ row.add("null");
+ } else {
+ row.add(value.toString());
+ }
+ }
+ actual.add(StringUtils.join(row, ","));
+ }
+ }
+ Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+ }
+
@AfterClass
public static void closeInstance() {
kafkaContainerService.deleteKafkaConnector(connectorName);
diff --git
a/src/test/resources/e2e/string_converter/group_commit_connector.json
b/src/test/resources/e2e/string_converter/group_commit_connector.json
new file mode 100644
index 0000000..9e9b55c
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/group_commit_connector.json
@@ -0,0 +1,23 @@
+{
+ "name":"group_commit_connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"group_commit_test",
+ "tasks.max":"1",
+ "doris.topic2table.map": "group_commit_test:group_commit_tab",
+ "buffer.count.records":"2",
+ "buffer.flush.time":"120",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"group_commit",
+ "sink.properties.group_commit":"sync_mode",
+ "enable.2pc": "false",
+ "load.model":"stream_load",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/group_commit_tab.sql
b/src/test/resources/e2e/string_converter/group_commit_tab.sql
new file mode 100644
index 0000000..2ead8f1
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/group_commit_tab.sql
@@ -0,0 +1,13 @@
+-- Please note that the database here must be consistent with doris.database in the file used to register the connector.
+CREATE TABLE group_commit.group_commit_tab (
+ id INT NULL,
+ name VARCHAR(100) NULL,
+ age INT NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change"="true"
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]