This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7a0d6be [Enhancement] Optimize some code, including querying label
transaction status and others (#62)
7a0d6be is described below
commit 7a0d6be309d0fe5266e41e6f8e38b1a0266a2fd1
Author: wangchuang <[email protected]>
AuthorDate: Wed Jan 22 10:24:30 2025 +0800
[Enhancement] Optimize some code, including querying label transaction
status and others (#62)
---
.../doris/kafka/connector/DorisSinkConnector.java | 2 +-
.../connection/JdbcConnectionProvider.java | 8 +++----
.../connector/metrics/DorisConnectMonitor.java | 2 +-
.../connector/metrics/MetricsJmxReporter.java | 2 +-
.../doris/kafka/connector/service/RestService.java | 5 ++---
.../connector/utils/BackoffAndRetryUtils.java | 4 ++--
.../kafka/connector/writer/LabelGenerator.java | 26 +++++++++++++---------
.../kafka/connector/writer/StreamLoadWriter.java | 17 +++++---------
.../connector/writer/commit/DorisCommitter.java | 9 ++------
.../kafka/connector/writer/load/CopyLoad.java | 2 +-
10 files changed, 35 insertions(+), 42 deletions(-)
diff --git
a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
index 0bf90bd..91347e4 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
@@ -79,7 +79,7 @@ public class DorisSinkConnector extends SinkConnector {
List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; i++) {
Map<String, String> conf = new HashMap<>(config);
- conf.put(ConfigCheckUtils.TASK_ID, i + "");
+ conf.put(ConfigCheckUtils.TASK_ID, String.valueOf(i));
taskConfigs.add(conf);
}
return taskConfigs;
diff --git
a/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java
b/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java
index 57a83fe..47da2d9 100644
---
a/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java
+++
b/src/main/java/org/apache/doris/kafka/connector/connection/JdbcConnectionProvider.java
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
public class JdbcConnectionProvider implements ConnectionProvider,
Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(JdbcConnectionProvider.class);
- protected final String driverName = "com.mysql.jdbc.Driver";
- protected final String cjDriverName = "com.mysql.cj.jdbc.Driver";
+ protected static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
+ protected static final String CJ_DRIVER_NAME = "com.mysql.cj.jdbc.Driver";
private static final String JDBC_URL_TEMPLATE = "jdbc:mysql://%s";
private static final long serialVersionUID = 1L;
@@ -51,11 +51,11 @@ public class JdbcConnectionProvider implements
ConnectionProvider, Serializable
return connection;
}
try {
- Class.forName(cjDriverName);
+ Class.forName(CJ_DRIVER_NAME);
} catch (ClassNotFoundException ex) {
LOG.warn(
"can not found class com.mysql.cj.jdbc.Driver, use class
com.mysql.jdbc.Driver");
- Class.forName(driverName);
+ Class.forName(DRIVER_NAME);
}
String jdbcUrl = String.format(JDBC_URL_TEMPLATE,
options.getQueryUrl());
if (!Objects.isNull(options.getUser())) {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
index c3b28a2..0fe7d00 100644
---
a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
+++
b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
@@ -81,7 +81,7 @@ public class DorisConnectMonitor {
// partition reassignment
LOG.debug(
"Registering metrics existing:{}",
-
metricsJmxReporter.getMetricRegistry().getMetrics().keySet().toString());
+ metricsJmxReporter.getMetricRegistry().getMetrics().keySet());
metricsJmxReporter.removeMetricsFromRegistry(String.valueOf(taskId));
try {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java
b/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java
index dc9da31..61fe552 100644
---
a/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java
+++
b/src/main/java/org/apache/doris/kafka/connector/metrics/MetricsJmxReporter.java
@@ -89,7 +89,7 @@ public class MetricsJmxReporter {
"Metric registry:{}, size is:{}, names:{}",
prefixFilter,
metricRegistry.getMetrics().size(),
- metricRegistry.getMetrics().keySet().toString());
+ metricRegistry.getMetrics().keySet());
}
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
index 7662aaa..45942f4 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/RestService.java
@@ -85,8 +85,7 @@ public class RestService {
HttpGet httpGet = new HttpGet(beUrl);
String response = send(options, httpGet, logger);
logger.info("Backend Info:{}", response);
- List<BackendV2.BackendRowV2> backends =
parseBackendV2(response, logger);
- return backends;
+ return parseBackendV2(response, logger);
} catch (ConnectedFailedException e) {
logger.info(
"Doris FE node {} is unavailable: {}, Request the next
Doris FE node",
@@ -313,7 +312,7 @@ public class RestService {
/** Get table schema from doris. */
public static Schema getSchema(
DorisOptions dorisOptions, String db, String table, Logger logger)
{
- logger.trace("start get " + db + "." + table + " schema from doris.");
+ logger.trace("start get {}.{} schema from doris.", db, table);
Object responseData = null;
try {
String tableSchemaUri =
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java
b/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java
index fe51b52..035b70e 100644
---
a/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java
+++
b/src/main/java/org/apache/doris/kafka/connector/utils/BackoffAndRetryUtils.java
@@ -32,7 +32,7 @@ public class BackoffAndRetryUtils {
private static final int[] backoffSec = {0, 1, 2, 4};
/** Interfaces to define the lambda function to be used by backoffAndRetry
*/
- public interface backoffFunction {
+ public interface BackoffFunction {
Object apply() throws Exception;
}
@@ -45,7 +45,7 @@ public class BackoffAndRetryUtils {
* @throws Exception if the runnable function throws exception
*/
public static Object backoffAndRetry(
- final LoadOperation operation, final backoffFunction runnable)
throws Exception {
+ final LoadOperation operation, final BackoffFunction runnable)
throws Exception {
for (final int iteration : backoffSec) {
if (iteration != 0) {
Thread.sleep(iteration * 1000L);
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
index daca8b1..0d25c6f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
@@ -24,9 +24,9 @@ import java.util.concurrent.atomic.AtomicLong;
/** Generator label for stream load. */
public class LabelGenerator {
- private String topic;
- private int partition;
- private String tableIdentifier;
+ private final String topic;
+ private final int partition;
+ private final String tableIdentifier;
// The label of doris stream load cannot be repeated when loading.
// Under special circumstances (usually load failure) when
doris-kafka-connector is started,
// stream load is performed at the same offset every time, which will
cause label duplication.
@@ -35,8 +35,8 @@ public class LabelGenerator {
public LabelGenerator(String topic, int partition, String tableIdentifier)
{
// The label of stream load can not contain `.`
- this.tableIdentifier = tableIdentifier.replaceAll("\\.", "_");
- this.topic = topic.replaceAll("\\.", "_");
+ this.tableIdentifier = tableIdentifier.replace(".", "_");
+ this.topic = topic.replace(".", "_");
this.partition = partition;
Random random = new Random();
labelRandomSuffix = new AtomicLong(random.nextInt(1000));
@@ -44,15 +44,21 @@ public class LabelGenerator {
public String generateLabel(long lastOffset) {
StringBuilder sb = new StringBuilder();
- sb.append(topic)
- .append(LoadConstants.FILE_DELIM_DEFAULT)
- .append(partition)
- .append(LoadConstants.FILE_DELIM_DEFAULT)
- .append(tableIdentifier)
+ sb.append(this.buildLabelPrefix())
.append(LoadConstants.FILE_DELIM_DEFAULT)
.append(lastOffset)
.append(LoadConstants.FILE_DELIM_DEFAULT)
.append(labelRandomSuffix.getAndIncrement());
return sb.toString();
}
+
+ public String buildLabelPrefix() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(topic)
+ .append(LoadConstants.FILE_DELIM_DEFAULT)
+ .append(partition)
+ .append(LoadConstants.FILE_DELIM_DEFAULT)
+ .append(tableIdentifier);
+ return sb.toString();
+ }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index 0144a7f..67d63e3 100644
---
a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -49,7 +49,7 @@ public class StreamLoadWriter extends DorisWriter {
private static final Logger LOG =
LoggerFactory.getLogger(StreamLoadWriter.class);
private static final String TRANSACTION_LABEL_PATTEN =
- "SHOW TRANSACTION FROM %s WHERE LABEL LIKE '";
+ "SHOW TRANSACTION FROM %s WHERE LABEL LIKE '%s%%'";
private List<DorisCommittable> committableList = new LinkedList<>();
private final LabelGenerator labelGenerator;
private final DorisCommitter dorisCommitter;
@@ -110,18 +110,11 @@ public class StreamLoadWriter extends DorisWriter {
*/
@VisibleForTesting
public Map<String, String> fetchLabel2Status() {
- String queryPatten = String.format(TRANSACTION_LABEL_PATTEN,
dorisOptions.getDatabase());
- String tmpTableIdentifier = tableIdentifier.replaceAll("\\.", "_");
- String tmpTopic = topic.replaceAll("\\.", "_");
String querySQL =
- queryPatten
- + tmpTopic
- + LoadConstants.FILE_DELIM_DEFAULT
- + partition
- + LoadConstants.FILE_DELIM_DEFAULT
- + tmpTableIdentifier
- + LoadConstants.FILE_DELIM_DEFAULT
- + "%'";
+ String.format(
+ TRANSACTION_LABEL_PATTEN,
+ dorisOptions.getDatabase(),
+ labelGenerator.buildLabelPrefix());
LOG.info("query doris offset by sql: {}", querySQL);
Map<String, String> label2Status = new HashMap<>();
try (Connection connection =
connectionProvider.getOrEstablishConnection();
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
index 46d9a65..d9c0a7e 100644
---
a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
@@ -21,7 +21,6 @@ package org.apache.doris.kafka.connector.writer.commit;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,15 +60,11 @@ public class DorisCommitter {
return;
}
for (DorisCommittable dorisCommittable : dorisCommittables) {
- try {
- commitTransaction(dorisCommittable);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ commitTransaction(dorisCommittable);
}
}
- private void commitTransaction(DorisCommittable committable) throws
IOException {
+ private void commitTransaction(DorisCommittable committable) {
// basic params
HttpPutBuilder builder =
new HttpPutBuilder()
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java
b/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java
index a0e4596..a1228df 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java
@@ -134,7 +134,7 @@ public class CopyLoad extends DataLoad {
return true;
}
}
- LOG.error("commit failed, cause by: " +
loadResult);
+ LOG.error("commit failed, cause by: {}",
loadResult);
throw new CopyLoadException("commit failed, cause
by: " + loadResult);
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]