This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b18df3e [Improve]Optimize and delete some redundant code (#30)
b18df3e is described below
commit b18df3e76932a8ab7bdda1a4e746727480cd64f5
Author: wudongliang <[email protected]>
AuthorDate: Fri Jun 21 17:56:10 2024 +0800
[Improve]Optimize and delete some redundant code (#30)
---
.../doris/kafka/connector/DorisSinkConnector.java | 1 -
.../doris/kafka/connector/DorisSinkTask.java | 4 +-
.../doris/kafka/connector/cfg/DorisOptions.java | 54 ----------------------
.../kafka/connector/converter/RecordService.java | 5 --
.../doris/kafka/connector/utils/BackendUtils.java | 25 ----------
.../kafka/connector/utils/ConfigCheckUtils.java | 10 ----
.../doris/kafka/connector/utils/FileNameUtils.java | 11 +----
.../kafka/connector/writer/CopyIntoWriter.java | 2 +-
.../doris/kafka/connector/writer/DorisWriter.java | 1 -
.../kafka/connector/writer/LoadConstants.java | 1 -
.../connector/converter/TestRecordService.java | 7 ---
11 files changed, 4 insertions(+), 117 deletions(-)
diff --git
a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
index bd1fe20..0bf90bd 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
@@ -38,7 +38,6 @@ public class DorisSinkConnector extends SinkConnector {
private static final Logger LOG =
LoggerFactory.getLogger(DorisSinkConnector.class);
private Map<String, String> config;
- private String connectorName;
/** No-Arg constructor. Required by Kafka Connect framework */
public DorisSinkConnector() {}
diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index 56faf7e..00c034c 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -119,12 +119,12 @@ public class DorisSinkTask extends SinkTask {
@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> offsets) throws
RetriableException {
-
- sink.commit(offsets);
// return an empty map means that offset commitment is not desired
if (sink == null || sink.getPartitionCount() == 0) {
return new HashMap<>();
}
+
+ sink.commit(offsets);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = new
HashMap<>();
// it's ok to just log the error since commit can retry
try {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index d5eaaf2..69cbb80 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -299,58 +299,4 @@ public class DorisOptions {
}
return new HashMap<>();
}
-
- @Override
- public String toString() {
- return "DorisOptions{"
- + "name='"
- + name
- + '\''
- + ", urls='"
- + urls
- + '\''
- + ", queryPort="
- + queryPort
- + ", httpPort="
- + httpPort
- + ", user='"
- + user
- + '\''
- + ", password='"
- + password
- + '\''
- + ", database='"
- + database
- + '\''
- + ", topicMap="
- + topicMap
- + ", fileSize="
- + fileSize
- + ", recordNum="
- + recordNum
- + ", flushTime="
- + flushTime
- + ", enableCustomJMX="
- + enableCustomJMX
- + ", taskId="
- + taskId
- + ", enableDelete="
- + enableDelete
- + ", autoRedirect="
- + autoRedirect
- + ", requestReadTimeoutMs="
- + requestReadTimeoutMs
- + ", requestConnectTimeoutMs="
- + requestConnectTimeoutMs
- + ", streamLoadProp="
- + streamLoadProp
- + ", labelPrefix='"
- + labelPrefix
- + '\''
- + ", loadModel="
- + loadModel
- + ", deliveryGuarantee="
- + deliveryGuarantee
- + '}';
- }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index 7c75139..4d1ef51 100644
---
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -262,11 +262,6 @@ public class RecordService {
}
}
- /** If not struct, map, list, use the default string */
- public String processStringRecord(SinkRecord record) {
- return record.value().toString();
- }
-
private String parseFieldValues(
RecordDescriptor record, Struct source, List<String> fields,
boolean isDelete) {
Map<String, Object> filedMapping = new LinkedHashMap<>();
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java
b/src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java
index a2891d9..c078b02 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java
@@ -21,8 +21,6 @@ package org.apache.doris.kafka.connector.utils;
import java.net.HttpURLConnection;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.exception.DorisException;
@@ -41,29 +39,6 @@ public class BackendUtils {
this.pos = 0;
}
- public BackendUtils(String beNodes) {
- this.backends = initBackends(beNodes);
- this.pos = 0;
- }
-
- private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
- List<BackendV2.BackendRowV2> backends = new ArrayList<>();
- List<String> nodes = Arrays.asList(beNodes.split(","));
- nodes.forEach(
- node -> {
- if (tryHttpConnection(node)) {
- node = node.trim();
- String[] ipAndPort = node.split(":");
- BackendV2.BackendRowV2 backendRowV2 = new
BackendV2.BackendRowV2();
- backendRowV2.setIp(ipAndPort[0]);
-
backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1]));
- backendRowV2.setAlive(true);
- backends.add(backendRowV2);
- }
- });
- return backends;
- }
-
public static BackendUtils getInstance(DorisOptions dorisOptions, Logger
logger) {
return new BackendUtils(RestService.getBackendsV2(dorisOptions,
logger));
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 3a6e583..ca8ec31 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -29,8 +29,6 @@ import
org.apache.doris.kafka.connector.exception.ArgumentsException;
import org.apache.doris.kafka.connector.exception.DorisException;
import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
import org.apache.doris.kafka.connector.writer.load.LoadModel;
-import org.apache.kafka.common.config.Config;
-import org.apache.kafka.common.config.ConfigValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -269,14 +267,6 @@ public class ConfigCheckUtils {
return topic2Table;
}
- public static Map<String, ConfigValue> validateConfigToMap(final Config
result) {
- Map<String, ConfigValue> validateMap = new HashMap<>();
- for (ConfigValue v : result.configValues()) {
- validateMap.put(v.name(), v);
- }
- return validateMap;
- }
-
private static boolean isNumeric(String str) {
if (str != null && !str.isEmpty()) {
Pattern pattern = Pattern.compile("[0-9]*");
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
index 8c00f43..39886db 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
@@ -65,19 +65,10 @@ public class FileNameUtils {
*/
public static boolean verifyFileName(
String name, String topic, int partition, String fileName) {
- String prefix = getFilePrefix(name, topic, partition);
+ String prefix = filePrefix(name, topic, partition);
return fileName.startsWith(prefix);
}
- public static String getFilePrefix(String name, String topic, int
partition) {
- return name
- + LoadConstants.FILE_DELIM_DEFAULT
- + topic
- + LoadConstants.FILE_DELIM_DEFAULT
- + partition
- + LoadConstants.FILE_DELIM_DEFAULT;
- }
-
/**
* read end offset from file name
*
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
index 5018b27..8a09d7b 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
@@ -78,7 +78,7 @@ public class CopyIntoWriter extends DorisWriter {
final String SQL_TEMPLATE =
"SHOW COPY FROM %s WHERE TABLENAME = '%s' AND STATE =
'FINISHED' AND FILES LIKE '%%%s%%' ORDER BY CREATETIME DESC LIMIT 100";
final String filePrefix =
- FileNameUtils.getFilePrefix(dorisOptions.getName(), topic,
partition);
+ FileNameUtils.filePrefix(dorisOptions.getName(), topic,
partition);
String offsetQuery = String.format(SQL_TEMPLATE, dbName, tableName,
filePrefix);
LOG.info("query offset by sql: {}", offsetQuery);
List<String> loadFileList = new ArrayList<>();
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 0ace5a6..31938d9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -58,7 +58,6 @@ public abstract class DorisWriter {
protected RecordService recordService;
protected int taskId;
protected final DorisConnectMonitor connectMonitor;
- protected boolean schemaChange;
public DorisWriter(
String topic,
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
index 8c939d2..598cc3a 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
@@ -21,7 +21,6 @@ package org.apache.doris.kafka.connector.writer;
public class LoadConstants {
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
- public static final String DELETE_KET = "__deleted";
public static final String DORIS_DEL_TRUE = "1";
public static final String DORIS_DEL_FALSE = "0";
diff --git
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 08e76ba..d9688da 100644
---
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -315,13 +315,6 @@ public class TestRecordService {
new ObjectMapper().writeValueAsString(objectNode));
}
- @Test
- public void processStringRecord() {
- SinkRecord record = TestRecordBuffer.newSinkRecord("doris", 1);
- String s = recordService.processStringRecord(record);
- Assert.assertEquals("doris", s);
- }
-
@After
public void close() {
mockRestService.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]