This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d098fb  [improve]Code optimization, delete some redundant code (#47)
3d098fb is described below

commit 3d098fb0ec75873829d498c24a73a86037122505
Author: wudongliang <[email protected]>
AuthorDate: Mon Oct 21 12:00:08 2024 +0800

    [improve]Code optimization, delete some redundant code (#47)
---
 .../doris/kafka/connector/DorisSinkTask.java       | 30 +-----------------
 .../doris/kafka/connector/cfg/DorisOptions.java    | 23 ++------------
 .../connector/service/DorisDefaultSinkService.java | 12 +++----
 .../kafka/connector/service/DorisSinkService.java  |  8 +++++
 .../kafka/connector/utils/ConfigCheckUtils.java    |  2 ++
 .../kafka/connector/utils/HttpGetWithEntity.java   | 37 ----------------------
 .../doris/kafka/connector/writer/LoadStatus.java   |  2 --
 .../doris/kafka/connector/writer/ResponseUtil.java |  2 --
 8 files changed, 20 insertions(+), 96 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java 
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index 00c034c..f793d26 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -22,10 +22,8 @@ package org.apache.doris.kafka.connector;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.service.DorisSinkService;
 import org.apache.doris.kafka.connector.service.DorisSinkServiceFactory;
-import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.utils.Version;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -38,9 +36,7 @@ import org.slf4j.LoggerFactory;
 /** DorisSinkTask implements SinkTask for Kafka Connect framework. */
 public class DorisSinkTask extends SinkTask {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisSinkTask.class);
-
     private DorisSinkService sink = null;
-    private Map<String, String> topic2table = null;
 
     /** default constructor, invoked by kafka connect framework */
     public DorisSinkTask() {}
@@ -54,8 +50,6 @@ public class DorisSinkTask extends SinkTask {
     @Override
     public void start(final Map<String, String> parsedConfig) {
         LOG.info("kafka doris sink task start");
-        // generate topic to table map
-        this.topic2table = getTopicToTableMap(parsedConfig);
         this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig);
     }
 
@@ -76,10 +70,7 @@ public class DorisSinkTask extends SinkTask {
     @Override
     public void open(final Collection<TopicPartition> partitions) {
         LOG.info("kafka doris sink task open with {}", partitions.toString());
-        partitions.forEach(
-                tp ->
-                        this.sink.startTask(
-                                ConfigCheckUtils.tableName(tp.topic(), 
this.topic2table), tp));
+        partitions.forEach(tp -> this.sink.startTask(tp));
     }
 
     /**
@@ -146,23 +137,4 @@ public class DorisSinkTask extends SinkTask {
     public String version() {
         return Version.getVersion();
     }
-
-    /**
-     * parse topic to table map
-     *
-     * @param config connector config file
-     * @return result map
-     */
-    static Map<String, String> getTopicToTableMap(Map<String, String> config) {
-        if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)) {
-            Map<String, String> result =
-                    ConfigCheckUtils.parseTopicToTableMap(
-                            
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
-            if (result != null) {
-                return result;
-            }
-            LOG.error("Invalid Input, Topic2Table Map disabled");
-        }
-        return new HashMap<>();
-    }
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index e8c1933..4262edd 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -22,7 +22,6 @@ package org.apache.doris.kafka.connector.cfg;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -88,7 +87,9 @@ public class DorisOptions {
                 
Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS));
 
         this.flushTime = 
Long.parseLong(config.get(DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC));
-        this.topicMap = getTopicToTableMap(config);
+        this.topicMap =
+                ConfigCheckUtils.parseTopicToTableMap(
+                        
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
 
         this.enable2PC = 
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
         this.enableCustomJMX = 
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
@@ -292,22 +293,4 @@ public class DorisOptions {
     public boolean isEnableDelete() {
         return enableDelete;
     }
-
-    /**
-     * parse topic to table map
-     *
-     * @param config connector config file
-     * @return result map
-     */
-    static Map<String, String> getTopicToTableMap(Map<String, String> config) {
-        if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)) {
-            Map<String, String> result =
-                    ConfigCheckUtils.parseTopicToTableMap(
-                            
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
-            if (result != null) {
-                return result;
-            }
-        }
-        return new HashMap<>();
-    }
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 1022344..9d3a2ff 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -29,7 +29,6 @@ import 
org.apache.doris.kafka.connector.connection.ConnectionProvider;
 import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
 import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
 import org.apache.doris.kafka.connector.metrics.MetricsJmxReporter;
-import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.writer.CopyIntoWriter;
 import org.apache.doris.kafka.connector.writer.DorisWriter;
 import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
@@ -54,7 +53,6 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
 
     private final ConnectionProvider conn;
     private final Map<String, DorisWriter> writer;
-    private final Map<String, String> topic2TableMap;
     private final DorisOptions dorisOptions;
     private final MetricsJmxReporter metricsJmxReporter;
     private final DorisConnectMonitor connectMonitor;
@@ -62,7 +60,6 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
     DorisDefaultSinkService(Map<String, String> config) {
         this.dorisOptions = new DorisOptions(config);
         this.writer = new HashMap<>();
-        this.topic2TableMap = new HashMap<>();
         this.conn = new JdbcConnectionProvider(dorisOptions);
         MetricRegistry metricRegistry = new MetricRegistry();
         this.metricsJmxReporter = new MetricsJmxReporter(metricRegistry, 
dorisOptions.getName());
@@ -73,6 +70,11 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
                         this.metricsJmxReporter);
     }
 
+    @Override
+    public void startTask(TopicPartition topicPartition) {
+        startTask(null, topicPartition);
+    }
+
     /**
      * Create new task
      *
@@ -130,9 +132,7 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
         String nameIndex = getNameIndex(record.topic(), 
record.kafkaPartition());
         // init a new topic partition
         if (!writer.containsKey(nameIndex)) {
-            startTask(
-                    ConfigCheckUtils.tableName(record.topic(), 
this.topic2TableMap),
-                    new TopicPartition(record.topic(), 
record.kafkaPartition()));
+            startTask(new TopicPartition(record.topic(), 
record.kafkaPartition()));
         }
         writer.get(nameIndex).insert(record);
     }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
index 5ebece8..dacee81 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
@@ -27,6 +27,14 @@ import org.apache.kafka.connect.sink.SinkRecord;
 
 /** Background service of data sink, responsible to create/drop table and 
insert/delete files */
 public interface DorisSinkService {
+
+    /**
+     * Start the Task.
+     *
+     * @param topicPartition TopicPartition passed from Kafka
+     */
+    void startTask(TopicPartition topicPartition);
+
     /**
      * Start the Task. This should handle any configuration parsing and 
one-time setup of the task.
      *
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java 
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 51b8b06..e370253 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -207,6 +207,7 @@ public class ConfigCheckUtils {
      * @param topic2table topic to table map
      * @return valid table name
      */
+    @Deprecated
     public static String tableName(String topic, Map<String, String> 
topic2table) {
         return generateValidName(topic, topic2table);
     }
@@ -218,6 +219,7 @@ public class ConfigCheckUtils {
      * @param topic2table topic to table map
      * @return valid table/application name
      */
+    @Deprecated
     public static String generateValidName(String topic, Map<String, String> 
topic2table) {
         if (topic == null || topic.isEmpty()) {
             throw new DorisException("Topic name is empty String or null");
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java 
b/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java
deleted file mode 100644
index b36a1d5..0000000
--- 
a/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.utils;
-
-import java.net.URI;
-import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
-
-public class HttpGetWithEntity extends HttpEntityEnclosingRequestBase {
-    private static final String METHOD_NAME = "GET";
-
-    @Override
-    public String getMethod() {
-        return METHOD_NAME;
-    }
-
-    public HttpGetWithEntity(final String uri) {
-        super();
-        setURI(URI.create(uri));
-    }
-}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/LoadStatus.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/LoadStatus.java
index 28a0a0b..34708ef 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadStatus.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LoadStatus.java
@@ -23,6 +23,4 @@ package org.apache.doris.kafka.connector.writer;
 public class LoadStatus {
     public static final String SUCCESS = "Success";
     public static final String PUBLISH_TIMEOUT = "Publish Timeout";
-    public static final String LABEL_ALREADY_EXIST = "Label Already Exists";
-    public static final String FAIL = "Fail";
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/ResponseUtil.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/ResponseUtil.java
index 6a96b60..ad784f9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/ResponseUtil.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/ResponseUtil.java
@@ -23,8 +23,6 @@ import java.util.regex.Pattern;
 
 /** util for handle response. */
 public class ResponseUtil {
-    public static final Pattern LABEL_EXIST_PATTERN =
-            Pattern.compile("Label \\[(.*)\\] has already been used, relate to 
txn \\[(\\d+)\\]");
     public static final Pattern COMMITTED_PATTERN =
             Pattern.compile(
                     "transaction \\[(\\d+)\\] is already 
\\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to