This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b18df3e  [Improve]Optimize and delete some redundant code (#30)
b18df3e is described below

commit b18df3e76932a8ab7bdda1a4e746727480cd64f5
Author: wudongliang <[email protected]>
AuthorDate: Fri Jun 21 17:56:10 2024 +0800

    [Improve]Optimize and delete some redundant code (#30)
---
 .../doris/kafka/connector/DorisSinkConnector.java  |  1 -
 .../doris/kafka/connector/DorisSinkTask.java       |  4 +-
 .../doris/kafka/connector/cfg/DorisOptions.java    | 54 ----------------------
 .../kafka/connector/converter/RecordService.java   |  5 --
 .../doris/kafka/connector/utils/BackendUtils.java  | 25 ----------
 .../kafka/connector/utils/ConfigCheckUtils.java    | 10 ----
 .../doris/kafka/connector/utils/FileNameUtils.java | 11 +----
 .../kafka/connector/writer/CopyIntoWriter.java     |  2 +-
 .../doris/kafka/connector/writer/DorisWriter.java  |  1 -
 .../kafka/connector/writer/LoadConstants.java      |  1 -
 .../connector/converter/TestRecordService.java     |  7 ---
 11 files changed, 4 insertions(+), 117 deletions(-)

diff --git 
a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java 
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
index bd1fe20..0bf90bd 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
@@ -38,7 +38,6 @@ public class DorisSinkConnector extends SinkConnector {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisSinkConnector.class);
 
     private Map<String, String> config;
-    private String connectorName;
 
     /** No-Arg constructor. Required by Kafka Connect framework */
     public DorisSinkConnector() {}
diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java 
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index 56faf7e..00c034c 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -119,12 +119,12 @@ public class DorisSinkTask extends SinkTask {
     @Override
     public Map<TopicPartition, OffsetAndMetadata> preCommit(
             Map<TopicPartition, OffsetAndMetadata> offsets) throws 
RetriableException {
-
-        sink.commit(offsets);
         // return an empty map means that offset commitment is not desired
         if (sink == null || sink.getPartitionCount() == 0) {
             return new HashMap<>();
         }
+
+        sink.commit(offsets);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = new 
HashMap<>();
         // it's ok to just log the error since commit can retry
         try {
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index d5eaaf2..69cbb80 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -299,58 +299,4 @@ public class DorisOptions {
         }
         return new HashMap<>();
     }
-
-    @Override
-    public String toString() {
-        return "DorisOptions{"
-                + "name='"
-                + name
-                + '\''
-                + ", urls='"
-                + urls
-                + '\''
-                + ", queryPort="
-                + queryPort
-                + ", httpPort="
-                + httpPort
-                + ", user='"
-                + user
-                + '\''
-                + ", password='"
-                + password
-                + '\''
-                + ", database='"
-                + database
-                + '\''
-                + ", topicMap="
-                + topicMap
-                + ", fileSize="
-                + fileSize
-                + ", recordNum="
-                + recordNum
-                + ", flushTime="
-                + flushTime
-                + ", enableCustomJMX="
-                + enableCustomJMX
-                + ", taskId="
-                + taskId
-                + ", enableDelete="
-                + enableDelete
-                + ", autoRedirect="
-                + autoRedirect
-                + ", requestReadTimeoutMs="
-                + requestReadTimeoutMs
-                + ", requestConnectTimeoutMs="
-                + requestConnectTimeoutMs
-                + ", streamLoadProp="
-                + streamLoadProp
-                + ", labelPrefix='"
-                + labelPrefix
-                + '\''
-                + ", loadModel="
-                + loadModel
-                + ", deliveryGuarantee="
-                + deliveryGuarantee
-                + '}';
-    }
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index 7c75139..4d1ef51 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -262,11 +262,6 @@ public class RecordService {
         }
     }
 
-    /** If not struct, map, list, use the default string */
-    public String processStringRecord(SinkRecord record) {
-        return record.value().toString();
-    }
-
     private String parseFieldValues(
             RecordDescriptor record, Struct source, List<String> fields, 
boolean isDelete) {
         Map<String, Object> filedMapping = new LinkedHashMap<>();
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java 
b/src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java
index a2891d9..c078b02 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java
@@ -21,8 +21,6 @@ package org.apache.doris.kafka.connector.utils;
 
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.exception.DorisException;
@@ -41,29 +39,6 @@ public class BackendUtils {
         this.pos = 0;
     }
 
-    public BackendUtils(String beNodes) {
-        this.backends = initBackends(beNodes);
-        this.pos = 0;
-    }
-
-    private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
-        List<BackendV2.BackendRowV2> backends = new ArrayList<>();
-        List<String> nodes = Arrays.asList(beNodes.split(","));
-        nodes.forEach(
-                node -> {
-                    if (tryHttpConnection(node)) {
-                        node = node.trim();
-                        String[] ipAndPort = node.split(":");
-                        BackendV2.BackendRowV2 backendRowV2 = new 
BackendV2.BackendRowV2();
-                        backendRowV2.setIp(ipAndPort[0]);
-                        
backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1]));
-                        backendRowV2.setAlive(true);
-                        backends.add(backendRowV2);
-                    }
-                });
-        return backends;
-    }
-
     public static BackendUtils getInstance(DorisOptions dorisOptions, Logger 
logger) {
         return new BackendUtils(RestService.getBackendsV2(dorisOptions, 
logger));
     }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java 
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 3a6e583..ca8ec31 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -29,8 +29,6 @@ import 
org.apache.doris.kafka.connector.exception.ArgumentsException;
 import org.apache.doris.kafka.connector.exception.DorisException;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
-import org.apache.kafka.common.config.Config;
-import org.apache.kafka.common.config.ConfigValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -269,14 +267,6 @@ public class ConfigCheckUtils {
         return topic2Table;
     }
 
-    public static Map<String, ConfigValue> validateConfigToMap(final Config 
result) {
-        Map<String, ConfigValue> validateMap = new HashMap<>();
-        for (ConfigValue v : result.configValues()) {
-            validateMap.put(v.name(), v);
-        }
-        return validateMap;
-    }
-
     private static boolean isNumeric(String str) {
         if (str != null && !str.isEmpty()) {
             Pattern pattern = Pattern.compile("[0-9]*");
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java 
b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
index 8c00f43..39886db 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
@@ -65,19 +65,10 @@ public class FileNameUtils {
      */
     public static boolean verifyFileName(
             String name, String topic, int partition, String fileName) {
-        String prefix = getFilePrefix(name, topic, partition);
+        String prefix = filePrefix(name, topic, partition);
         return fileName.startsWith(prefix);
     }
 
-    public static String getFilePrefix(String name, String topic, int 
partition) {
-        return name
-                + LoadConstants.FILE_DELIM_DEFAULT
-                + topic
-                + LoadConstants.FILE_DELIM_DEFAULT
-                + partition
-                + LoadConstants.FILE_DELIM_DEFAULT;
-    }
-
     /**
      * read end offset from file name
      *
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
index 5018b27..8a09d7b 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java
@@ -78,7 +78,7 @@ public class CopyIntoWriter extends DorisWriter {
         final String SQL_TEMPLATE =
                 "SHOW COPY FROM %s WHERE TABLENAME = '%s' AND STATE = 
'FINISHED' AND FILES LIKE '%%%s%%' ORDER BY CREATETIME DESC LIMIT 100";
         final String filePrefix =
-                FileNameUtils.getFilePrefix(dorisOptions.getName(), topic, 
partition);
+                FileNameUtils.filePrefix(dorisOptions.getName(), topic, 
partition);
         String offsetQuery = String.format(SQL_TEMPLATE, dbName, tableName, 
filePrefix);
         LOG.info("query offset by sql: {}", offsetQuery);
         List<String> loadFileList = new ArrayList<>();
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 0ace5a6..31938d9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -58,7 +58,6 @@ public abstract class DorisWriter {
     protected RecordService recordService;
     protected int taskId;
     protected final DorisConnectMonitor connectMonitor;
-    protected boolean schemaChange;
 
     public DorisWriter(
             String topic,
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
index 8c939d2..598cc3a 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
@@ -21,7 +21,6 @@ package org.apache.doris.kafka.connector.writer;
 
 public class LoadConstants {
     public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
-    public static final String DELETE_KET = "__deleted";
     public static final String DORIS_DEL_TRUE = "1";
     public static final String DORIS_DEL_FALSE = "0";
 
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
 
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 08e76ba..d9688da 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -315,13 +315,6 @@ public class TestRecordService {
                 new ObjectMapper().writeValueAsString(objectNode));
     }
 
-    @Test
-    public void processStringRecord() {
-        SinkRecord record = TestRecordBuffer.newSinkRecord("doris", 1);
-        String s = recordService.processStringRecord(record);
-        Assert.assertEquals("doris", s);
-    }
-
     @After
     public void close() {
         mockRestService.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to