This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3d098fb [improve]Code optimization, delete some redundant code (#47)
3d098fb is described below
commit 3d098fb0ec75873829d498c24a73a86037122505
Author: wudongliang <[email protected]>
AuthorDate: Mon Oct 21 12:00:08 2024 +0800
[improve]Code optimization, delete some redundant code (#47)
---
.../doris/kafka/connector/DorisSinkTask.java | 30 +-----------------
.../doris/kafka/connector/cfg/DorisOptions.java | 23 ++------------
.../connector/service/DorisDefaultSinkService.java | 12 +++----
.../kafka/connector/service/DorisSinkService.java | 8 +++++
.../kafka/connector/utils/ConfigCheckUtils.java | 2 ++
.../kafka/connector/utils/HttpGetWithEntity.java | 37 ----------------------
.../doris/kafka/connector/writer/LoadStatus.java | 2 --
.../doris/kafka/connector/writer/ResponseUtil.java | 2 --
8 files changed, 20 insertions(+), 96 deletions(-)
diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index 00c034c..f793d26 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -22,10 +22,8 @@ package org.apache.doris.kafka.connector;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.service.DorisSinkService;
import org.apache.doris.kafka.connector.service.DorisSinkServiceFactory;
-import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
import org.apache.doris.kafka.connector.utils.Version;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -38,9 +36,7 @@ import org.slf4j.LoggerFactory;
/** DorisSinkTask implements SinkTask for Kafka Connect framework. */
public class DorisSinkTask extends SinkTask {
private static final Logger LOG =
LoggerFactory.getLogger(DorisSinkTask.class);
-
private DorisSinkService sink = null;
- private Map<String, String> topic2table = null;
/** default constructor, invoked by kafka connect framework */
public DorisSinkTask() {}
@@ -54,8 +50,6 @@ public class DorisSinkTask extends SinkTask {
@Override
public void start(final Map<String, String> parsedConfig) {
LOG.info("kafka doris sink task start");
- // generate topic to table map
- this.topic2table = getTopicToTableMap(parsedConfig);
this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig);
}
@@ -76,10 +70,7 @@ public class DorisSinkTask extends SinkTask {
@Override
public void open(final Collection<TopicPartition> partitions) {
LOG.info("kafka doris sink task open with {}", partitions.toString());
- partitions.forEach(
- tp ->
- this.sink.startTask(
- ConfigCheckUtils.tableName(tp.topic(),
this.topic2table), tp));
+ partitions.forEach(tp -> this.sink.startTask(tp));
}
/**
@@ -146,23 +137,4 @@ public class DorisSinkTask extends SinkTask {
public String version() {
return Version.getVersion();
}
-
- /**
- * parse topic to table map
- *
- * @param config connector config file
- * @return result map
- */
- static Map<String, String> getTopicToTableMap(Map<String, String> config) {
- if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)) {
- Map<String, String> result =
- ConfigCheckUtils.parseTopicToTableMap(
-
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
- if (result != null) {
- return result;
- }
- LOG.error("Invalid Input, Topic2Table Map disabled");
- }
- return new HashMap<>();
- }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index e8c1933..4262edd 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -22,7 +22,6 @@ package org.apache.doris.kafka.connector.cfg;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -88,7 +87,9 @@ public class DorisOptions {
Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS));
this.flushTime =
Long.parseLong(config.get(DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC));
- this.topicMap = getTopicToTableMap(config);
+ this.topicMap =
+ ConfigCheckUtils.parseTopicToTableMap(
+
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
this.enable2PC =
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
this.enableCustomJMX =
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
@@ -292,22 +293,4 @@ public class DorisOptions {
public boolean isEnableDelete() {
return enableDelete;
}
-
- /**
- * parse topic to table map
- *
- * @param config connector config file
- * @return result map
- */
- static Map<String, String> getTopicToTableMap(Map<String, String> config) {
- if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)) {
- Map<String, String> result =
- ConfigCheckUtils.parseTopicToTableMap(
-
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
- if (result != null) {
- return result;
- }
- }
- return new HashMap<>();
- }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 1022344..9d3a2ff 100644
---
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -29,7 +29,6 @@ import
org.apache.doris.kafka.connector.connection.ConnectionProvider;
import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.metrics.MetricsJmxReporter;
-import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
import org.apache.doris.kafka.connector.writer.CopyIntoWriter;
import org.apache.doris.kafka.connector.writer.DorisWriter;
import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
@@ -54,7 +53,6 @@ public class DorisDefaultSinkService implements
DorisSinkService {
private final ConnectionProvider conn;
private final Map<String, DorisWriter> writer;
- private final Map<String, String> topic2TableMap;
private final DorisOptions dorisOptions;
private final MetricsJmxReporter metricsJmxReporter;
private final DorisConnectMonitor connectMonitor;
@@ -62,7 +60,6 @@ public class DorisDefaultSinkService implements
DorisSinkService {
DorisDefaultSinkService(Map<String, String> config) {
this.dorisOptions = new DorisOptions(config);
this.writer = new HashMap<>();
- this.topic2TableMap = new HashMap<>();
this.conn = new JdbcConnectionProvider(dorisOptions);
MetricRegistry metricRegistry = new MetricRegistry();
this.metricsJmxReporter = new MetricsJmxReporter(metricRegistry,
dorisOptions.getName());
@@ -73,6 +70,11 @@ public class DorisDefaultSinkService implements
DorisSinkService {
this.metricsJmxReporter);
}
+ @Override
+ public void startTask(TopicPartition topicPartition) {
+ startTask(null, topicPartition);
+ }
+
/**
* Create new task
*
@@ -130,9 +132,7 @@ public class DorisDefaultSinkService implements
DorisSinkService {
String nameIndex = getNameIndex(record.topic(),
record.kafkaPartition());
// init a new topic partition
if (!writer.containsKey(nameIndex)) {
- startTask(
- ConfigCheckUtils.tableName(record.topic(),
this.topic2TableMap),
- new TopicPartition(record.topic(),
record.kafkaPartition()));
+ startTask(new TopicPartition(record.topic(),
record.kafkaPartition()));
}
writer.get(nameIndex).insert(record);
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
index 5ebece8..dacee81 100644
---
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
+++
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java
@@ -27,6 +27,14 @@ import org.apache.kafka.connect.sink.SinkRecord;
/** Background service of data sink, responsible to create/drop table and
insert/delete files */
public interface DorisSinkService {
+
+ /**
+ * Start the Task.
+ *
+ * @param topicPartition TopicPartition passed from Kafka
+ */
+ void startTask(TopicPartition topicPartition);
+
/**
* Start the Task. This should handle any configuration parsing and
one-time setup of the task.
*
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 51b8b06..e370253 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -207,6 +207,7 @@ public class ConfigCheckUtils {
* @param topic2table topic to table map
* @return valid table name
*/
+ @Deprecated
public static String tableName(String topic, Map<String, String>
topic2table) {
return generateValidName(topic, topic2table);
}
@@ -218,6 +219,7 @@ public class ConfigCheckUtils {
* @param topic2table topic to table map
* @return valid table/application name
*/
+ @Deprecated
public static String generateValidName(String topic, Map<String, String>
topic2table) {
if (topic == null || topic.isEmpty()) {
throw new DorisException("Topic name is empty String or null");
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java
b/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java
deleted file mode 100644
index b36a1d5..0000000
---
a/src/main/java/org/apache/doris/kafka/connector/utils/HttpGetWithEntity.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.utils;
-
-import java.net.URI;
-import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
-
-public class HttpGetWithEntity extends HttpEntityEnclosingRequestBase {
- private static final String METHOD_NAME = "GET";
-
- @Override
- public String getMethod() {
- return METHOD_NAME;
- }
-
- public HttpGetWithEntity(final String uri) {
- super();
- setURI(URI.create(uri));
- }
-}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/LoadStatus.java
b/src/main/java/org/apache/doris/kafka/connector/writer/LoadStatus.java
index 28a0a0b..34708ef 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadStatus.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LoadStatus.java
@@ -23,6 +23,4 @@ package org.apache.doris.kafka.connector.writer;
public class LoadStatus {
public static final String SUCCESS = "Success";
public static final String PUBLISH_TIMEOUT = "Publish Timeout";
- public static final String LABEL_ALREADY_EXIST = "Label Already Exists";
- public static final String FAIL = "Fail";
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/ResponseUtil.java
b/src/main/java/org/apache/doris/kafka/connector/writer/ResponseUtil.java
index 6a96b60..ad784f9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/ResponseUtil.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/ResponseUtil.java
@@ -23,8 +23,6 @@ import java.util.regex.Pattern;
/** util for handle response. */
public class ResponseUtil {
- public static final Pattern LABEL_EXIST_PATTERN =
- Pattern.compile("Label \\[(.*)\\] has already been used, relate to
txn \\[(\\d+)\\]");
public static final Pattern COMMITTED_PATTERN =
Pattern.compile(
"transaction \\[(\\d+)\\] is already
\\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]