This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b0734fe2 [INLONG-5224][Agent][DataProxy] Remove unused 
StreamConfigLog related classes (#5230)
0b0734fe2 is described below

commit 0b0734fe2a6f86a452beefdbf7974b5005f9a712
Author: healchow <[email protected]>
AuthorDate: Thu Jul 28 11:10:37 2022 +0800

    [INLONG-5224][Agent][DataProxy] Remove unused StreamConfigLog related 
classes (#5230)
---
 .../org/apache/inlong/agent/plugin/Reader.java     |   2 +-
 .../apache/inlong/agent/task/TestTaskWrapper.java  |   2 +-
 .../agent/plugin/sources/TextFileSource.java       |   6 +-
 .../plugin/sources/reader/AbstractReader.java      |   3 +-
 .../agent/plugin/sources/reader/BinlogReader.java  |  73 ++---
 .../agent/plugin/sources/reader/KafkaReader.java   |   2 +-
 .../agent/plugin/sources/reader/SqlReader.java     |   2 +-
 .../plugin/sources/reader/TextFileReader.java      |   2 +-
 .../inlong/common/reporpter/AbstractReporter.java  | 191 -------------
 .../inlong/common/reporpter/ConfigLogTypeEnum.java |  47 ----
 .../apache/inlong/common/reporpter/Response.java   |  29 --
 .../common/reporpter/StreamConfigLogMetric.java    | 140 ----------
 .../common/reporpter/StreamConfigLogReporter.java  |  40 ---
 .../common/reporpter/dto/StreamConfigLogInfo.java  |  45 ----
 .../common/metric/reporter/ReporterTest.java       |  45 ----
 inlong-dataproxy/conf/common.properties            |   5 -
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 295 ++++++++-------------
 .../dataproxy/sink/pulsar/PulsarClientService.java |  87 ++----
 18 files changed, 160 insertions(+), 856 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
index 55b768c72..3b5ec83e0 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
@@ -49,7 +49,7 @@ public interface Reader extends Stage {
      * set wait milliseconds when tailing a file
      * to solve while loop cause too much cpu usage
      */
-    void setWaitMillisecs(long millis);
+    void setWaitMillisecond(long millis);
 
     /**
      * get snapshot of the reader
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
index 1aad8444a..438db7fc3 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
@@ -138,7 +138,7 @@ public class TestTaskWrapper {
         }
 
         @Override
-        public void setWaitMillisecs(long millis) {
+        public void setWaitMillisecond(long millis) {
 
         }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index d1258685e..215d599f6 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -30,7 +30,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
@@ -48,10 +47,8 @@ import static 
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOU
 public class TextFileSource implements Source {
 
     // path + suffix
-    public static final String MD5_SUFFIX = ".md5";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TextFileSource.class);
     private static final String TEXT_FILE_SOURCE_TAG_NAME = 
"AgentTextFileSourceMetric";
-    private static AtomicLong metricsIndex = new AtomicLong(0);
 
     public TextFileSource() {
     }
@@ -68,10 +65,9 @@ public class TextFileSource implements Source {
             int seekPosition = jobConf.getInt(file.getAbsolutePath() + 
POSITION_SUFFIX, 0);
             LOGGER.info("read from history position {} with job profile {}, 
file absolute path: {}", seekPosition,
                     jobConf.getInstanceId(), file.getAbsolutePath());
-            String md5 = jobConf.get(file.getAbsolutePath() + MD5_SUFFIX, "");
             TextFileReader textFileReader = new TextFileReader(file, 
seekPosition);
             long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT, 
DEFAULT_JOB_READ_WAIT_TIMEOUT);
-            textFileReader.setWaitMillisecs(waitTimeout);
+            textFileReader.setWaitMillisecond(waitTimeout);
             addValidator(filterPattern, textFileReader);
             result.add(textFileReader);
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
index e9ed0ac62..50e8fbd9b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
@@ -26,12 +26,11 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROU
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 
 /**
- * abstract reader, init reader and reader metrics
+ * Abstract reader, init reader and reader metrics
  */
 public abstract class AbstractReader implements Reader {
 
     protected String inlongGroupId;
-
     protected String inlongStreamId;
     protected String metricTagName;
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 7b7961a48..5e519d731 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -27,7 +27,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.constant.SnapshotModeConstants;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.plugin.Message;
@@ -39,8 +38,6 @@ import org.apache.inlong.agent.pojo.DebeziumFormat;
 import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
-import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
-import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +55,7 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPAC
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
 
 /**
- * read binlog data
+ * Binlog data reader.
  */
 public class BinlogReader extends AbstractReader {
 
@@ -80,7 +77,7 @@ public class BinlogReader extends AbstractReader {
     public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
     public static final String JOB_DATABASE_QUEUE_SIZE = 
"job.binlogJob.queueSize";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(BinlogReader.class);
-    private static final Gson gson = new Gson();
+    private static final Gson GSON = new Gson();
     private static final String BINLOG_READER_TAG_NAME = "AgentBinlogMetric";
     private final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
     /**
@@ -104,16 +101,11 @@ public class BinlogReader extends AbstractReader {
     private String historyMonitorDdl;
     private String instanceId;
     private ExecutorService executor;
-    private String offset;
     private String specificOffsetFile;
     private String specificOffsetPos;
     private BinlogSnapshotBase binlogSnapshot;
     private JobProfile jobProfile;
     private boolean destroyed = false;
-    private boolean enableReportConfigLog;
-    private StreamConfigLogMetric streamConfigLogMetric;
-    private String inlongGroupId;
-    private String inlongStreamId;
 
     public BinlogReader() {
     }
@@ -132,8 +124,7 @@ public class BinlogReader extends AbstractReader {
         Pair<String, String> message = binlogMessagesQueue.poll();
         Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
         header.put(PROXY_KEY_DATA, message.getKey());
-        return new 
DefaultMessage(message.getValue().getBytes(StandardCharsets.UTF_8),
-                header);
+        return new 
DefaultMessage(message.getValue().getBytes(StandardCharsets.UTF_8), header);
     }
 
     @Override
@@ -150,8 +141,7 @@ public class BinlogReader extends AbstractReader {
         offsetFlushIntervalMs = 
jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
         databaseStoreHistoryName = 
jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
                 tryToInitAndGetHistoryPath()) + "/history.dat" + 
jobConf.getInstanceId();
-        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
-                tryToInitAndGetHistoryPath()) + "/offset.dat" + 
jobConf.getInstanceId();
+
         snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, "");
         includeSchemaChanges = 
jobConf.get(JOB_DATABASE_INCLUDE_SCHEMA_CHANGES, "false");
         historyMonitorDdl = jobConf.get(JOB_DATABASE_HISTORY_MONITOR_DDL, 
"false");
@@ -159,75 +149,46 @@ public class BinlogReader extends AbstractReader {
         instanceId = jobConf.getInstanceId();
         finished = false;
 
-        offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
         specificOffsetFile = 
jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
         specificOffsetPos = 
jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+
+        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/offset.dat" + 
jobConf.getInstanceId();
         binlogSnapshot = new BinlogSnapshotBase(offsetStoreFileName);
+        String offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
         binlogSnapshot.save(offset);
 
-        enableReportConfigLog =
-                
Boolean.parseBoolean(jobConf.get(StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE,
-                        "true"));
-
-        inlongGroupId = jobConf.get(CommonConstants.PROXY_INLONG_GROUP_ID,
-                CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID);
-        inlongStreamId = jobConf.get(CommonConstants.PROXY_INLONG_STREAM_ID,
-                CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID);
-        metricTagName = BINLOG_READER_TAG_NAME + "_" + inlongGroupId + "_" + 
inlongStreamId;
-
-        if (enableReportConfigLog) {
-            String reportConfigServerUrl = jobConf
-                    .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_SERVER_URL, 
"");
-            String reportConfigLogInterval = jobConf
-                    .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL, 
"60000");
-            String clientVersion = jobConf
-                    
.get(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
-            streamConfigLogMetric = new StreamConfigLogMetric(COMPONENT_NAME,
-                    reportConfigServerUrl, 
Long.parseLong(reportConfigLogInterval),
-                    AgentUtils.getLocalIp(), clientVersion);
-        }
+        metricTagName = String.join("_", BINLOG_READER_TAG_NAME, 
inlongGroupId, inlongStreamId);
 
         Properties props = getEngineProps();
-
-        DebeziumEngine<ChangeEvent<String, String>> engine = 
DebeziumEngine.create(
-                io.debezium.engine.format.Json.class)
-                .using(props)
+        DebeziumEngine<ChangeEvent<String, String>> engine = 
DebeziumEngine.create(io.debezium.engine.format.Json.class)
                 .notifying((records, committer) -> {
                     try {
                         for (ChangeEvent<String, String> record : records) {
-                            DebeziumFormat debeziumFormat = gson
-                                    .fromJson(record.value(), 
DebeziumFormat.class);
-                            
binlogMessagesQueue.put(Pair.of(debeziumFormat.getSource().getTable(),
-                                    record.value()));
+                            DebeziumFormat debeziumFormat = 
GSON.fromJson(record.value(), DebeziumFormat.class);
+                            
binlogMessagesQueue.put(Pair.of(debeziumFormat.getSource().getTable(), 
record.value()));
                             committer.markProcessed(record);
                         }
                         committer.markBatchFinished();
                     } catch (Exception e) {
                         LOGGER.error("parse binlog message error", e);
-
                     }
-
                 })
+                .using(props)
                 .using((success, message, error) -> {
                     if (!success) {
-                        LOGGER.error("binlog job with jobConf {} has " + 
"error {}",
-                                jobConf.getInstanceId(), message, error);
-                        streamConfigLogMetric
-                                .updateConfigLog(inlongGroupId, 
inlongStreamId, "DBConfig",
-                                        ConfigLogTypeEnum.ERROR, error == null 
? "" : error.toString());
+                        LOGGER.error("error for binlog job: {}, msg: {}", 
jobConf.getInstanceId(), message, error);
                     }
                 }).build();
 
         executor = Executors.newSingleThreadExecutor();
         executor.execute(engine);
 
-        LOGGER.info("get initial snapshot of job {}, snapshot {}",
-                jobConf.getInstanceId(), getSnapshot());
+        LOGGER.info("get initial snapshot of job {}, snapshot {}", 
jobConf.getInstanceId(), getSnapshot());
     }
 
     private Properties getEngineProps() {
         Properties props = new Properties();
-
         props.setProperty("name", "engine" + instanceId);
         props.setProperty("connector.class", 
MySqlConnector.class.getCanonicalName());
 
@@ -316,12 +277,10 @@ public class BinlogReader extends AbstractReader {
 
     @Override
     public void setReadTimeout(long mill) {
-        return;
     }
 
     @Override
-    public void setWaitMillisecs(long millis) {
-        return;
+    public void setWaitMillisecond(long millis) {
     }
 
     @Override
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 93910e99a..18bd19362 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -164,7 +164,7 @@ public class KafkaReader<K, V> extends AbstractReader {
     }
 
     @Override
-    public void setWaitMillisecs(long millis) {
+    public void setWaitMillisecond(long millis) {
         waitTimeout = millis;
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index 33f4f04a9..880231cb1 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -151,7 +151,7 @@ public class SqlReader extends AbstractReader {
     }
 
     @Override
-    public void setWaitMillisecs(long millis) {
+    public void setWaitMillisecond(long millis) {
 
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index 7ec8e8ad1..312dd281e 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -134,7 +134,7 @@ public class TextFileReader extends AbstractReader {
     }
 
     @Override
-    public void setWaitMillisecs(long millis) {
+    public void setWaitMillisecond(long millis) {
         waitTimeout = millis;
     }
 
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
deleted file mode 100644
index cf1f796c4..000000000
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import com.google.gson.Gson;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AbstractReporter<T> {
-
-    public static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractReporter.class);
-
-    public static final String AGENT_HTTP_APPLICATION_JSON = 
"application/json";
-
-    private static final Gson gson = new Gson();
-
-    private static final int DEFAULT_CORE_POOL_SIZE = 1;
-
-    private static final int DEFAULT_MAX_POOL_SIZE = 2;
-
-    private static final int DEFAULT_SYNC_SEND_QUEUE_SIZE = 10000;
-
-    private static CloseableHttpClient httpClient;
-
-    private final Class<?> clazz = Response.class;
-
-    private ThreadPoolExecutor pool;
-
-    private String serverUrl;
-
-    public AbstractReporter(String serverUrl) {
-        this(serverUrl, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
-                DEFAULT_SYNC_SEND_QUEUE_SIZE, null);
-    }
-
-    public AbstractReporter(CloseableHttpClient httpClient, String serverUrl) {
-        this(httpClient, serverUrl, DEFAULT_CORE_POOL_SIZE, 
DEFAULT_MAX_POOL_SIZE,
-                DEFAULT_SYNC_SEND_QUEUE_SIZE, null);
-    }
-
-    public AbstractReporter(String serverUrl, int corePoolSize, int 
maximumPoolsize,
-            int syncSendQueueSize,
-            RejectedExecutionHandler rejectedExecutionHandler) {
-        this.serverUrl = serverUrl;
-        if (httpClient == null) {
-            RequestConfig requestConfig = RequestConfig.custom().build();
-            HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
-            httpClientBuilder.setDefaultRequestConfig(requestConfig);
-            httpClient = httpClientBuilder.build();
-        }
-        if (rejectedExecutionHandler == null) {
-            rejectedExecutionHandler = new 
ThreadPoolExecutor.DiscardOldestPolicy();
-        }
-        pool = new ThreadPoolExecutor(corePoolSize, maximumPoolsize,
-                60, TimeUnit.SECONDS, new 
LinkedBlockingDeque<>(syncSendQueueSize),
-                Executors.defaultThreadFactory(), rejectedExecutionHandler);
-    }
-
-    public AbstractReporter(CloseableHttpClient httpClient, String serverUrl, 
int corePoolSize,
-            int maximumPoolSize,
-            int syncSendQueueSize,
-            RejectedExecutionHandler rejectedExecutionHandler) {
-        this(serverUrl, corePoolSize, maximumPoolSize, syncSendQueueSize, 
rejectedExecutionHandler);
-        this.httpClient = httpClient;
-    }
-
-    /**
-     * Report data by sync.
-     */
-    public Response syncReportData(T data, String serverUrl) throws Exception {
-        if (StringUtils.isEmpty(serverUrl)) {
-            LOGGER.warn("Report config log server url is empty, so config log 
can not be "
-                    + "reported!");
-            return null;
-        }
-        HttpPost httpPost = new HttpPost(serverUrl);
-        String returnStr = null;
-        try {
-            StringEntity stringEntity = new StringEntity(gson.toJson(data));
-            stringEntity.setContentType(AGENT_HTTP_APPLICATION_JSON);
-            httpPost.setEntity(stringEntity);
-            returnStr = executeHttpPost(httpPost);
-            return parse(returnStr);
-        } catch (Exception e) {
-            LOGGER.error("syncReportData has exception returnStr = {}, e:", 
returnStr, e);
-            throw e;
-        }
-    }
-
-    /**
-     * Report data by sync.
-     */
-    public Response syncReportData(T data) throws Exception {
-        return this.syncReportData(data, serverUrl);
-    }
-
-    public String executeHttpPost(HttpPost httpPost) throws Exception {
-        CloseableHttpResponse response = httpClient.execute(httpPost);
-        if (response == null) {
-            return null;
-        }
-        return EntityUtils.toString(response.getEntity());
-    }
-
-    /**
-     * Report data by async.
-     */
-    public Future<Response>  asyncReportData(T data, String serverUrl) {
-        CompletableFuture<Response> completableFuture = new 
CompletableFuture<>();
-
-        if (pool != null) {
-            pool.execute(new RunTask(completableFuture, data, serverUrl));
-        } else {
-            completableFuture.completeExceptionally(new Exception("Send pool 
is null!"));
-        }
-
-        return completableFuture;
-    }
-
-    /**
-     * Report data by async.
-     */
-    public Future<Response> asyncReportData(T data) {
-        return asyncReportData(data, serverUrl);
-    }
-
-    /**
-     * Parse json data.
-     */
-    public Response parse(String json) throws Exception {
-
-        if (StringUtils.isEmpty(json)) {
-            return null;
-        }
-
-        return gson.fromJson(json, Response.class);
-    }
-
-    class RunTask implements Runnable {
-
-        private CompletableFuture<Response> completableFuture;
-
-        private T data;
-
-        private String url;
-
-        public RunTask(CompletableFuture<Response> completableFuture, T data, 
String url) {
-            this.completableFuture = completableFuture;
-            this.data = data;
-            this.url = url;
-        }
-
-        public void run() {
-            try {
-                completableFuture.complete(syncReportData(data, url));
-            } catch (Exception e) {
-                completableFuture.completeExceptionally(e);
-            }
-        }
-    }
-}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
deleted file mode 100644
index d7d654ad4..000000000
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import static java.util.Objects.requireNonNull;
-
-public enum ConfigLogTypeEnum {
-
-    NORMAL(0),ERROR(1);
-
-    private int type;
-
-    ConfigLogTypeEnum(int type) {
-        this.type = type;
-    }
-
-    public static ConfigLogTypeEnum getOpType(int opType) {
-        requireNonNull(opType);
-        switch (opType) {
-            case 0:
-                return NORMAL;
-            case 1:
-                return ERROR;
-            default:
-                throw new RuntimeException("config log type doesn't exist");
-        }
-    }
-
-    public int getType() {
-        return type;
-    }
-}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java 
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
deleted file mode 100644
index 4f4d1173c..000000000
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import lombok.Getter;
-import lombok.Setter;
-
-@Setter
-@Getter
-public class Response {
-    private boolean success;
-    private String errMsg;
-    private String data;
-}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
deleted file mode 100644
index 364cb59a7..000000000
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.time.Instant;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamConfigLogMetric implements Runnable {
-
-    public static final Logger LOGGER = 
LoggerFactory.getLogger(StreamConfigLogMetric.class);
-
-    /*
-     * config name for log report
-     */
-    public static final String CONFIG_LOG_REPORT_ENABLE = 
"report.config.log.enable";
-    public static final String CONFIG_LOG_REPORT_SERVER_URL = 
"report.config.log.server.url";
-    public static final String CONFIG_LOG_REPORT_INTERVAL = 
"report.config.log.interval";
-    public static final String CONFIG_LOG_REPORT_CLIENT_VERSION = 
"report.config.log.client.version";
-    public static final String CONFIG_LOG_PULSAR_PRODUCER = "pulsar-producer";
-    public static final String CONFIG_LOG_PULSAR_CLIENT = "pulsar-client";
-
-    private StreamConfigLogReporter streamConfigLogReporter;
-
-    private String moduleName;
-
-    private String clientVersion;
-
-    private String localIp;
-
-    private long reportInterval;
-
-    public ConcurrentHashMap<String, StreamConfigLogInfo> dataCacheMap = new 
ConcurrentHashMap<>();
-
-    private static ScheduledExecutorService statExecutor =
-            Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
-                    .setNameFormat("StreamConfigLogMetric-Report")
-                    .setUncaughtExceptionHandler((t, e) ->
-                            LOGGER.error(t.getName() + " has an uncaught 
exception: ", e))
-                    .build());
-
-    public StreamConfigLogMetric(String moduleName, String serverUrl, long 
reportInterval,
-            String localIp, String clientVersion) {
-        this.streamConfigLogReporter = new StreamConfigLogReporter(serverUrl);
-        this.reportInterval = reportInterval;
-        this.moduleName = moduleName;
-        this.localIp = localIp;
-        this.clientVersion = clientVersion;
-        statExecutor.scheduleWithFixedDelay(this,
-                reportInterval, reportInterval, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * updateConfigLog
-     * @param inlongGroupId  inlongGroupId
-     * @param inlongStreamId inlongStreamId
-     * @param configName configName
-     * @param configLogTypeEnum configLogTypeEnum
-     * @param log log
-     */
-    public void updateConfigLog(String inlongGroupId, String inlongStreamId, 
String configName,
-            ConfigLogTypeEnum configLogTypeEnum, String log) {
-        String key = moduleName + "-" +  inlongGroupId + "-" + inlongStreamId 
+ "-" + configName;
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("updateConfigLog key = {}", key);
-        }
-        dataCacheMap.compute(key, (k, v) -> {
-            if (v == null) {
-                v = new StreamConfigLogInfo();
-            }
-            updateDataValue(v, inlongGroupId,
-                    inlongStreamId, configName, configLogTypeEnum, log);
-            return v;
-        });
-    }
-
-    /**
-     * Update value by config.
-     */
-    private void updateDataValue(StreamConfigLogInfo streamConfigLogInfo,
-            String inlongGroupId, String inlongStreamId, String configName,
-            ConfigLogTypeEnum configLogTypeEnum, String log) {
-        streamConfigLogInfo.setComponentName(moduleName);
-        streamConfigLogInfo.setConfigName(configName);
-        streamConfigLogInfo.setInlongGroupId(inlongGroupId);
-        streamConfigLogInfo.setInlongStreamId(inlongStreamId);
-        streamConfigLogInfo.setIp(localIp);
-        streamConfigLogInfo.setVersion(clientVersion);
-        streamConfigLogInfo.setLogInfo(log);
-        streamConfigLogInfo.setReportTime(Instant.now().toEpochMilli());
-        streamConfigLogInfo.setLogType(configLogTypeEnum.getType());
-    }
-
-    /**
-     * Logic entrance of Metric.
-     */
-    public void run() {
-        try {
-            Set<Entry<String, StreamConfigLogInfo>> set = 
dataCacheMap.entrySet();
-            long currentTimeMills = Instant.now().toEpochMilli();
-            for (Entry<String, StreamConfigLogInfo> entry : set) {
-                StreamConfigLogInfo streamConfigLogInfo = entry.getValue();
-                if ((currentTimeMills - streamConfigLogInfo.getReportTime()) < 
reportInterval) {
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("Report metric data config key = {}!", 
streamConfigLogInfo.getConfigName());
-                    }
-                    
streamConfigLogReporter.asyncReportData(streamConfigLogInfo);
-                } else {
-                    dataCacheMap.remove(entry.getKey());
-                    LOGGER.info("Remove expired config key {}", 
entry.getKey());
-                }
-            }
-        } catch (Exception e) {
-            LOGGER.error("Report streamConfigLogMetric has exception = {}", e);
-        }
-    }
-}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
deleted file mode 100644
index 14009a727..000000000
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import java.util.concurrent.RejectedExecutionHandler;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
-
-public class StreamConfigLogReporter extends 
AbstractReporter<StreamConfigLogInfo> {
-
-    public StreamConfigLogReporter(String serverUrl) {
-        super(serverUrl);
-    }
-
-    public StreamConfigLogReporter(CloseableHttpClient httpClient, String 
serverUrl) {
-        super(httpClient, serverUrl);
-    }
-
-    public StreamConfigLogReporter(String serverUrl, int corePoolSize, int 
maximumPoolsize,
-            int syncSendQueueSize,
-            RejectedExecutionHandler rejectedExecutionHandler) {
-        super(serverUrl, corePoolSize, maximumPoolsize, syncSendQueueSize,
-                rejectedExecutionHandler);
-    }
-}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
deleted file mode 100644
index 3ce50e135..000000000
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter.dto;
-
-import lombok.Getter;
-import lombok.Setter;
-
-@Setter
-@Getter
-public class StreamConfigLogInfo {
-
-    private String ip;
-
-    private String version;
-
-    private String componentName;
-
-    private String configName;
-
-    private Integer logType;
-
-    private long reportTime;
-
-    private String logInfo;
-
-    private String inlongGroupId;
-
-    private String inlongStreamId;
-
-}
diff --git 
a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
 
b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
deleted file mode 100644
index eadbea2d6..000000000
--- 
a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.metric.reporter;
-
-import static org.mockito.Mockito.mock;
-
-import java.util.concurrent.Future;
-import org.apache.inlong.common.reporpter.Response;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.inlong.common.reporpter.StreamConfigLogReporter;
-import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class ReporterTest {
-
-    @Test
-    public void streamConfigLogReporterTest() throws Exception {
-        String serverUrl = 
"http://127.0.0.1:8083/api/inlong/manager/openapi/stream/log";
-                + "/reportConfigLogStatus";
-        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
-        StreamConfigLogReporter streamConfigLogReporter = new 
StreamConfigLogReporter(httpClient,
-                serverUrl);
-        StreamConfigLogReporter spy = Mockito.spy(streamConfigLogReporter);
-        StreamConfigLogInfo info = new StreamConfigLogInfo();
-        Future<Response> future = spy.asyncReportData(info, serverUrl);
-        Assert.assertEquals(future.get(),null);
-    }
-}
diff --git a/inlong-dataproxy/conf/common.properties 
b/inlong-dataproxy/conf/common.properties
index 4639c54e0..8ec3857dc 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -35,8 +35,3 @@ prometheusHttpPort=9081
 audit.enable=true
 # audit proxy address
 audit.proxys=127.0.0.1:10081
-
-# report config log
-report.config.log.enable=true
-report.config.log.server.url=http://127.0.0.1:8083/api/inlong/manager/openapi/stream/log/reportConfigLogStatus
-report.config.log.interval=60000
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 380826649..2c3dc712a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -25,7 +25,6 @@ import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.handler.codec.TooLongFrameException;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -34,12 +33,10 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
-import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
 import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
 import org.apache.inlong.dataproxy.base.OrderEvent;
 import org.apache.inlong.dataproxy.config.ConfigManager;
@@ -64,6 +61,7 @@ import 
org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedExcepti
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -71,134 +69,117 @@ import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * use pulsarSink need adding such config, if these ara not config in 
dataproxy-pulsar.conf,
+ * Use pulsarSink need adding such config, if these ara not config in 
dataproxy-pulsar.conf,
  * PulsarSink will use default value.
- * prefix of pulsar sink config in flume.conf like this XXX.sinks.XXX.property
- * and properties are may these Configurations:
- *  type (*): value must be 'org.apache.inlong.dataproxy.sink.PulsarSink'
- *  pulsar_server_url_list (*): value is pulsar broker url , like this 
'pulsar://127.0.0.1:6650'
- *  send_timeout_MILL: send message timeout, unit is millisecond, default 
value is 30000 (mean 30s)
- *  stat_interval_sec: stat info will be made period time , unit is second, 
default value is 60s
- *  thread-num: sink thread num. default value  is 8
- *  client-id-cache: whether use cache in client, default value is true
- *  max_pending_messages: default value is 10000
- *  max_batching_messages: default value is 1000
- *  enable_batch: default is true
- *  block_if_queue_full: default is true
+ * <p/>
+ *
+ * Prefix of Pulsar sink config in flume.conf like this XXX.sinks.XXX.property 
and properties are may these
+ * configurations:
+ * <p/>
+ * <code>type</code> (*): value must be 
'org.apache.inlong.dataproxy.sink.PulsarSink'
+ * <p/>
+ * <code>pulsar_server_url_list</code> (*): value is pulsar broker url, like 
'pulsar://127.0.0.1:6650'
+ * <p/>
+ * <code>send_timeout_MILL</code>: send message timeout, unit is millisecond, 
default is 30000 (mean 30s)
+ * <p/>
+ * <code>stat_interval_sec</code>: stat info will be made period time, unit is 
second, default is 60s
+ * <p/>
+ * <code>thread-num</code>: sink thread num, default is 8
+ * <p/>
+ * <code>client-id-cache</code>: whether the client uses cache, default is true
+ * <p/>
+ * <code>max_pending_messages</code>: default is 10000
+ * <p/>
+ * <code>max_batching_messages</code>: default is 1000
+ * <p/>
+ * <code>enable_batch</code>: default is true
+ * <p/>
+ * <code>block_if_queue_full</code>: default is true
  */
-public class PulsarSink extends AbstractSink implements Configurable,
-        SendMessageCallBack, CreatePulsarClientCallBack {
+public class PulsarSink extends AbstractSink implements Configurable, 
SendMessageCallBack, CreatePulsarClientCallBack {
 
     private static final Logger logger = 
LoggerFactory.getLogger(PulsarSink.class);
-
-    /*
-     * properties for header info
+    /**
+     * log tools
      */
-    private static String TOPIC = "topic";
+    private static final LogCounter logPrinterB = new LogCounter(10, 100000, 
60 * 1000);
+    private static final LogCounter logPrinterC = new LogCounter(10, 100000, 
60 * 1000);
+    private static final String SEPARATOR = "#";
+    private static final Long PRINT_INTERVAL = 30L;
 
+    private static final PulsarPerformanceTask PULSAR_PERFORMANCE_TASK = new 
PulsarPerformanceTask();
+    private static final LoadingCache<String, Long> AGENT_ID_CACHE = 
CacheBuilder.newBuilder()
+            .concurrencyLevel(4 * 
8).initialCapacity(500).expireAfterAccess(30, TimeUnit.SECONDS)
+            .build(new CacheLoader<String, Long>() {
+                @Nonnull
+                @Override
+                public Long load(@Nonnull String key) {
+                    return System.currentTimeMillis();
+                }
+            });
     /*
-     * for log
+     * properties for header info
      */
-    private RateLimiter diskRateLimiter;
-
+    private static final String TOPIC = "topic";
     /*
      * for stat
      */
-    private static AtomicLong totalPulsarSuccSendCnt = new AtomicLong(0);
-    private static AtomicLong totalPulsarSuccSendSize = new AtomicLong(0);
-    private AtomicLong currentSuccessSendCnt = new AtomicLong(0);
-    private AtomicLong lastSuccessSendCnt = new AtomicLong(0);
-    private long t1 = System.currentTimeMillis();
-    private long t2 = 0L;
-
-    private AtomicInteger processIndex = new AtomicInteger(0);
+    private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_CNT = new 
AtomicLong(0);
+    private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_SIZE = new 
AtomicLong(0);
+    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = 
Executors
+            .newScheduledThreadPool(1, new 
HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
 
-    private int maxMonitorCnt = 300000;
+    static {
+        
SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(PULSAR_PERFORMANCE_TASK, 0L,
+                PRINT_INTERVAL, TimeUnit.SECONDS);
+        logger.info("success to start pulsar performance task");
+    }
 
     private final AtomicLong currentInFlightCount = new AtomicLong(0);
+    private final AtomicLong currentSuccessSendCnt = new AtomicLong(0);
+    private final AtomicLong lastSuccessSendCnt = new AtomicLong(0);
+    private final AtomicInteger processIndex = new AtomicInteger(0);
 
+    private RateLimiter diskRateLimiter;
+    private long t1 = System.currentTimeMillis();
+    private int maxMonitorCnt = 300000;
     /*
      * Control whether the SinkRunner thread can read data from the Channel
      */
     private volatile boolean canTake = false;
-
-    /*
-     * log tools
-     */
-    private static final LogCounter logPrinterB = new LogCounter(10, 100000, 
60 * 1000);
-    private static final LogCounter logPrinterC = new LogCounter(10, 100000, 
60 * 1000);
     private SinkCounter sinkCounter;
-
     /*
      * message queue and retry
      */
     private int eventQueueSize = 10000;
     private int badEventQueueSize = 10000;
     private int maxRetrySendCnt = 16;
-
     /*
      * send thread pool
      */
     private SinkTask[] sinkThreadPool;
     private int sinkThreadPoolSize;
     private PulsarClientService pulsarClientService;
-
-    private static final String SEPARATOR = "#";
-
-
     /*
      * statistic info log
      */
     private MonitorIndex monitorIndex;
     private MonitorIndexExt monitorIndexExt;
 
-    /*
-     * report error log
-     */
-    private StreamConfigLogMetric streamConfigLogMetric;
-    private String localIp;
-
     /*
      *  metric
      */
     private Map<String, String> dimensions;
     private DataProxyMetricItemSet metricItemSet;
-
     private ConfigManager configManager;
-    private Map<String, String> commonProperties;
     private Map<String, String> topicProperties;
-
     private Map<String, String> pulsarCluster;
     private MQClusterConfig pulsarConfig;
 
-    private static final Long PRINT_INTERVAL = 30L;
-
-    private static final PulsarPerformanceTask pulsarPerformanceTask = new 
PulsarPerformanceTask();
-
-    private static ScheduledExecutorService scheduledExecutorService = 
Executors
-            .newScheduledThreadPool(1, new 
HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
-
-    private static final  LoadingCache<String, Long> agentIdCache = 
CacheBuilder.newBuilder()
-            .concurrencyLevel(4 * 
8).initialCapacity(500).expireAfterAccess(30, TimeUnit.SECONDS)
-            .build(new CacheLoader<String, Long>() {
-                @Override
-                public Long load(String key) {
-                    return System.currentTimeMillis();
-                }
-            });
-
-    static {
-        /*
-         * stat pulsar performance
-         */
-        logger.info("pulsarPerformanceTask!!!!!!");
-        scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask, 
0L,
-                PRINT_INTERVAL, TimeUnit.SECONDS);
-    }
-
     public PulsarSink() {
         super();
         logger.debug("new instance of PulsarSink!");
@@ -206,7 +187,6 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
 
     /**
      * configure
-     * @param context
      */
     @Override
     public void configure(Context context) {
@@ -217,35 +197,18 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
         topicProperties = configManager.getTopicProperties();
         pulsarCluster = configManager.getMqClusterUrl2Token();
         pulsarConfig = configManager.getMqClusterConfig(); //pulsar common 
config
-        commonProperties = configManager.getCommonProperties();
+        Map<String, String> commonProperties = 
configManager.getCommonProperties();
         sinkThreadPoolSize = pulsarConfig.getThreadNum();
         if (sinkThreadPoolSize <= 0) {
             sinkThreadPoolSize = 1;
         }
         pulsarClientService = new PulsarClientService(pulsarConfig, 
sinkThreadPoolSize);
-        boolean enableReportConfigLog =
-                Boolean.parseBoolean(commonProperties
-                        
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE,"true"));
-        localIp = NetworkUtils.getLocalIp();
-        if (enableReportConfigLog) {
-            String reportConfigServerUrl = commonProperties
-                    
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_SERVER_URL, "");
-            String reportConfigLogInterval = commonProperties
-                    
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL, "60000");
-            String clientVersion = commonProperties
-                    
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
-            streamConfigLogMetric = new 
StreamConfigLogMetric(ComponentTypeEnum.DataProxy.getName(),
-                    reportConfigServerUrl, 
Long.parseLong(reportConfigLogInterval),
-                    localIp, clientVersion);
-            pulsarClientService.setConfigLogMetric(streamConfigLogMetric);
-        }
 
         configManager.getTopicConfig().addUpdateCallback(new 
ConfigUpdateCallback() {
             @Override
             public void update() {
                 if (pulsarClientService != null) {
-                    diffSetPublish(pulsarClientService,
-                            new HashSet<>(topicProperties.values()),
+                    diffSetPublish(pulsarClientService, new 
HashSet<>(topicProperties.values()),
                             new 
HashSet<>(configManager.getTopicProperties().values()));
                 }
             }
@@ -254,8 +217,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
             @Override
             public void update() {
                 if (pulsarClientService != null) {
-                    diffUpdatePulsarClient(pulsarClientService, pulsarCluster,
-                            configManager.getMqClusterUrl2Token());
+                    diffUpdatePulsarClient(pulsarClientService, pulsarCluster, 
configManager.getMqClusterUrl2Token());
                 }
             }
         });
@@ -280,19 +242,14 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
                 pulsarClientService.initTopicProducer(topic);
             }
         }
-        logger.info(getName() + " initTopicSet cost: "
-                + (System.currentTimeMillis() - startTime) + "ms");
-        logger.info(getName() + " producer is ready for topics : "
-                + pulsarClientService.getProducerInfoMap().keySet());
+        logger.info(getName() + " initTopicSet cost: " + 
(System.currentTimeMillis() - startTime) + "ms");
+        logger.info(getName() + " producer is ready for topics: " + 
pulsarClientService.getProducerInfoMap().keySet());
     }
 
     /**
      * When topic.properties is re-enabled, the producer update is triggered
-     * @param originalSet
-     * @param endSet
      */
-    public void diffSetPublish(PulsarClientService pulsarClientService, 
Set<String> originalSet,
-            Set<String> endSet) {
+    public void diffSetPublish(PulsarClientService pulsarClientService, 
Set<String> originalSet, Set<String> endSet) {
         boolean changed = false;
         for (String s : endSet) {
             if (!originalSet.contains(s)) {
@@ -300,7 +257,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
                 try {
                     pulsarClientService.initTopicProducer(s);
                 } catch (Exception e) {
-                    logger.error("Get producer failed!", e);
+                    logger.error("get producer failed: ", e);
                 }
             }
         }
@@ -312,22 +269,17 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
 
     /**
      * When pulsarURLList change, close and restart
-     *
-     * @param originalCluster
-     * @param endCluster
      */
     public void diffUpdatePulsarClient(PulsarClientService 
pulsarClientService, Map<String, String> originalCluster,
-                                       Map<String, String> endCluster) {
+            Map<String, String> endCluster) {
         MapDifference<String, String> mapDifference = 
Maps.difference(originalCluster, endCluster);
         if (mapDifference.areEqual()) {
             return;
         }
 
         logger.info("pulsarConfig has changed, close unused url clients and 
start new url clients");
-        Map<String, String> needToStart = new HashMap<>();
-        Map<String, String> needToClose = new HashMap<>();
-        needToClose.putAll(mapDifference.entriesOnlyOnLeft());
-        needToStart.putAll(mapDifference.entriesOnlyOnRight());
+        Map<String, String> needToClose = new 
HashMap<>(mapDifference.entriesOnlyOnLeft());
+        Map<String, String> needToStart = new 
HashMap<>(mapDifference.entriesOnlyOnRight());
         Map<String, MapDifference.ValueDifference<String>> differentToken = 
mapDifference.entriesDiffering();
         for (String url : differentToken.keySet()) {
             needToClose.put(url, originalCluster.get(url));
@@ -343,7 +295,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
     @Override
     public void start() {
         logger.info("[{}] pulsar sink starting...", getName());
-        //register metrics
+        // register metrics
         this.dimensions = new HashMap<>();
         this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
         this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
@@ -354,9 +306,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
         int statIntervalSec = pulsarConfig.getStatIntervalSec();
         Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec 
must be >= 0");
         if (statIntervalSec > 0) {
-            /*
-             * switch for lots of metrics
-             */
+            // switch for lots of metrics
             monitorIndex = new MonitorIndex("Pulsar_Sink", statIntervalSec, 
maxMonitorCnt);
             monitorIndexExt = new MonitorIndexExt("Pulsar_Sink_monitors#" + 
this.getName(),
                     statIntervalSec, maxMonitorCnt);
@@ -391,7 +341,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
         int waitCount = 0;
         while (isAllSendFinished() && waitCount++ < 10) {
             try {
-                Thread.currentThread().sleep(1000);
+                Thread.sleep(1000);
             } catch (InterruptedException e) {
                 logger.info("Stop thread has been interrupt!");
                 break;
@@ -418,8 +368,8 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
         }
 
         super.stop();
-        if (!scheduledExecutorService.isShutdown()) {
-            scheduledExecutorService.shutdown();
+        if (!SCHEDULED_EXECUTOR_SERVICE.isShutdown()) {
+            SCHEDULED_EXECUTOR_SERVICE.shutdown();
         }
         sinkCounter.stop();
         logger.debug("pulsar sink stopped. Metrics:{}", sinkCounter);
@@ -456,16 +406,14 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
                             + "last long time it will cause memoryChannel full 
and fileChannel write.)", getName());
                     tx.rollback();
                     // metric
-                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                            event.getHeaders().getOrDefault(TOPIC, ""));
+                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, 
event.getHeaders().getOrDefault(TOPIC, ""));
                     DataProxyMetricItem metricItem = 
this.metricItemSet.findMetricItem(dimensions);
                     metricItem.readFailCount.incrementAndGet();
                     metricItem.readFailSize.addAndGet(event.getBody().length);
                 } else {
                     tx.commit();
                     // metric
-                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                            event.getHeaders().getOrDefault(TOPIC, ""));
+                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, 
event.getHeaders().getOrDefault(TOPIC, ""));
                     DataProxyMetricItem metricItem = 
this.metricItemSet.findMetricItem(dimensions);
                     metricItem.readSuccessCount.incrementAndGet();
                     metricItem.readFailSize.addAndGet(event.getBody().length);
@@ -491,7 +439,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
     private void editStatistic(final Event event, String keyPostfix, boolean 
isOrder) {
         String topic = "";
         String streamId = "";
-        String nodeIp = null;
+        String nodeIp;
         if (event != null) {
             if (event.getHeaders().containsKey(TOPIC)) {
                 topic = event.getHeaders().get(TOPIC);
@@ -502,17 +450,13 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
                 streamId = event.getHeaders().get(AttributeConstants.INAME);
             }
 
-            /*
-             * Compatible agent
-             */
+            // Compatible agent
             if (event.getHeaders().containsKey("ip")) {
                 event.getHeaders().put(ConfigConstants.REMOTE_IP_KEY, 
event.getHeaders().get("ip"));
                 event.getHeaders().remove("ip");
             }
 
-            /*
-             * Compatible agent
-             */
+            // Compatible agent
             if (event.getHeaders().containsKey("time")) {
                 event.getHeaders().put(AttributeConstants.DATA_TIME, 
event.getHeaders().get("time"));
                 event.getHeaders().remove("time");
@@ -521,46 +465,39 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
             if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IP_KEY)) 
{
                 nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
                 if 
(event.getHeaders().containsKey(ConfigConstants.REMOTE_IDC_KEY)) {
-
                     if (nodeIp != null) {
                         nodeIp = nodeIp.split(":")[0];
                     }
 
-                    long tMsgCounterL = 1L;
-                    /*
-                     * msg counter
-                     */
+                    long msgCounterL = 1L;
+                    // msg counter
                     if 
(event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
-                        tMsgCounterL = Integer.parseInt(event.getHeaders()
-                                .get(ConfigConstants.MSG_COUNTER_KEY));
+                        msgCounterL = 
Integer.parseInt(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
                     }
 
                     String orderType = "non-order";
                     if (isOrder) {
                         orderType = "order";
                     }
-                    StringBuilder newbase = new StringBuilder();
-                    
newbase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
+                    StringBuilder newBase = new StringBuilder();
+                    
newBase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
                             .append(streamId).append(SEPARATOR).append(nodeIp)
                             
.append(SEPARATOR).append(NetworkUtils.getLocalIp())
                             
.append(SEPARATOR).append(orderType).append(SEPARATOR)
                             
.append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
 
                     long messageSize = event.getBody().length;
-
                     if (event.getHeaders().get(ConfigConstants.TOTAL_LEN) != 
null) {
                         messageSize = 
Long.parseLong(event.getHeaders().get(ConfigConstants.TOTAL_LEN));
                     }
 
                     if (keyPostfix != null && !keyPostfix.equals("")) {
-                        monitorIndex.addAndGet(new String(newbase), 0, 0,
-                                0, (int) tMsgCounterL);
+                        monitorIndex.addAndGet(new String(newBase), 0, 0, 0, 
(int) msgCounterL);
                         if (logPrinterB.shouldPrint()) {
                             logger.warn("error cannot send event, {} event 
size is {}", topic, messageSize);
                         }
                     } else {
-                        monitorIndex.addAndGet(new String(newbase), (int) 
tMsgCounterL,
-                                1, messageSize, 0);
+                        monitorIndex.addAndGet(new String(newBase), (int) 
msgCounterL, 1, messageSize, 0);
                     }
                 }
             }
@@ -584,8 +521,8 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
         /*
          * Statistics pulsar performance
          */
-        totalPulsarSuccSendCnt.incrementAndGet();
-        
totalPulsarSuccSendSize.addAndGet(eventStat.getEvent().getBody().length);
+        TOTAL_PULSAR_SUCC_SEND_CNT.incrementAndGet();
+        
TOTAL_PULSAR_SUCC_SEND_SIZE.addAndGet(eventStat.getEvent().getBody().length);
         /*
          *add to sinkCounter
          */
@@ -599,15 +536,12 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
 
         if (nowCnt % logEveryNEvents == 0 && nowCnt != 
lastSuccessSendCnt.get()) {
             lastSuccessSendCnt.set(nowCnt);
-            t2 = System.currentTimeMillis();
-            logger.info("Pulsar sink {}, succ put {} events to pulsar,"
-                    + " in the past {} millsec", new Object[] {
-                    getName(), (nowCnt - oldCnt), (t2 - t1)
-            });
+            long t2 = System.currentTimeMillis();
+            logger.info("Pulsar sink {}, succ put {} events to pulsar in the 
past {} millsec",
+                    getName(), (nowCnt - oldCnt), (t2 - t1));
             t1 = t2;
         }
-        Map<String, String> dimensions =  
getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                topic);
+        Map<String, String> dimensions = 
getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
         DataProxyMetricItem metricItem = 
this.metricItemSet.findMetricItem(dimensions);
         metricItem.sendSuccessCount.incrementAndGet();
         
metricItem.sendSuccessSize.addAndGet(eventStat.getEvent().getBody().length);
@@ -623,24 +557,23 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
         monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
         boolean needRetry = true;
         if (e instanceof NotFoundException) {
-            logger.error("NotFoundException topic {}, message will be 
discard!", topic, e);
+            logger.error("NotFoundException for topic " + topic + ", message 
will be discard!", e);
             needRetry = false;
         } else if (e instanceof TooLongFrameException) {
-            logger.error("Send failed TooLongFrameException! {}{}", getName(), 
e);
+            logger.error("TooLongFrameException, send failed for " + 
getName(), e);
         } else if (e instanceof ProducerQueueIsFullError) {
-            logger.error("Send failed ProducerQueueIsFullError! {}{}", 
getName(), e);
+            logger.error("ProducerQueueIsFullError, send failed for " + 
getName(), e);
         } else if (!(e instanceof AlreadyClosedException
                 || e instanceof PulsarClientException.NotConnectedException
                 || e instanceof TopicTerminatedException)) {
             if (logPrinterB.shouldPrint()) {
-                logger.error("Send failed!{}{}", getName(), e);
+                logger.error("send failed for " + getName(), e);
             }
             if (eventStat.getRetryCnt() == 0) {
                 editStatistic(eventStat.getEvent(), "failure", 
eventStat.isOrderMessage());
             }
         }
-        Map<String, String> dimensions =  
getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                topic);
+        Map<String, String> dimensions = 
getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
         DataProxyMetricItem metricItem = 
this.metricItemSet.findMetricItem(dimensions);
         metricItem.sendFailCount.incrementAndGet();
         
metricItem.sendFailSize.addAndGet(eventStat.getEvent().getBody().length);
@@ -650,8 +583,8 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
         }
     }
 
-    private Map getNewDimension(String otherKey, String value) {
-        Map dimensions = new HashMap<>();
+    private Map<String, String> getNewDimension(String otherKey, String value) 
{
+        Map<String, String> dimensions = new HashMap<>();
         dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
         dimensions.put(otherKey, value);
@@ -659,10 +592,11 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
     }
 
     private boolean processEvent(EventStat eventStat) {
-        boolean result = true;
         if (eventStat == null || eventStat.getEvent() == null) {
-            return result;
+            return true;
         }
+
+        boolean result = true;
         Event event = eventStat.getEvent();
         if (MessageUtils.isSyncSendForOrder(event) && (event instanceof 
OrderEvent)) {
             String partitionKey = 
event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
@@ -701,8 +635,8 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
              */
             if (pulsarConfig.getClientIdCache()) {
                 String clientId = 
eventStat.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID);
-                if (clientId != null && 
agentIdCache.asMap().containsKey(clientId)) {
-                    agentIdCache.invalidate(clientId);
+                if (clientId != null && 
AGENT_ID_CACHE.asMap().containsKey(clientId)) {
+                    AGENT_ID_CACHE.invalidate(clientId);
                 }
             }
             boolean result = false;
@@ -737,7 +671,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
     }
 
     public LoadingCache<String, Long> getAgentIdCache() {
-        return agentIdCache;
+        return AGENT_ID_CACHE;
     }
 
     public Map<String, String> getTopicsProperties() {
@@ -761,20 +695,19 @@ public class PulsarSink extends AbstractSink implements 
Configurable,
     }
 
     static class PulsarPerformanceTask implements Runnable {
+
         @Override
         public void run() {
             try {
-                if (totalPulsarSuccSendSize.get() != 0) {
+                if (TOTAL_PULSAR_SUCC_SEND_SIZE.get() != 0) {
                     logger.info("Total pulsar performance tps :"
-                            + totalPulsarSuccSendCnt.get() / PRINT_INTERVAL
+                            + TOTAL_PULSAR_SUCC_SEND_CNT.get() / PRINT_INTERVAL
                             + "/s, avg msg size:"
-                            + totalPulsarSuccSendSize.get() / 
totalPulsarSuccSendCnt.get()
+                            + TOTAL_PULSAR_SUCC_SEND_SIZE.get() / 
TOTAL_PULSAR_SUCC_SEND_CNT.get()
                             + ",print every " + PRINT_INTERVAL + " seconds");
-                    /*
-                     * totalpulsarSuccSendCnt represents the number of packets
-                     */
-                    totalPulsarSuccSendCnt.set(0);
-                    totalPulsarSuccSendSize.set(0);
+                    // totalPulsarSuccSendCnt represents the number of packets
+                    TOTAL_PULSAR_SUCC_SEND_CNT.set(0);
+                    TOTAL_PULSAR_SUCC_SEND_SIZE.set(0);
                 }
 
             } catch (Exception e) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index ad00702d7..28af3fa1c 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -22,8 +22,6 @@ import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
-import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
-import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
 import org.apache.inlong.dataproxy.base.OrderEvent;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
@@ -42,7 +40,6 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,7 +84,6 @@ public class PulsarClientService {
     private long retryIntervalWhenSendMsgError = 30 * 1000L;
     private String localIp = "127.0.0.1";
 
-    private StreamConfigLogMetric streamConfigLogMetric;
     private int sinkThreadPoolSize;
 
     /**
@@ -127,10 +123,6 @@ public class PulsarClientService {
         localIp = NetworkUtils.getLocalIp();
     }
 
-    public void setConfigLogMetric(StreamConfigLogMetric 
streamConfigLogMetric) {
-        this.streamConfigLogMetric = streamConfigLogMetric;
-    }
-
     public void initCreateConnection(CreatePulsarClientCallBack callBack) {
         pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
         if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) {
@@ -151,19 +143,13 @@ public class PulsarClientService {
     public boolean sendMessage(int poolIndex, String topic, Event event,
             SendMessageCallBack sendMessageCallBack, EventStat es) {
         TopicProducerInfo producerInfo = null;
-        boolean result = false;
+        boolean result;
         final String inlongStreamId = getInlongStreamId(event);
         final String inlongGroupId = getInlongGroupId(event);
         try {
             producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId, 
inlongStreamId);
         } catch (Exception e) {
-            producerInfo = null;
-            logger.error("get producer failed! topic = " + topic, e);
-            if (streamConfigLogMetric != null) {
-                streamConfigLogMetric.updateConfigLog(inlongGroupId,
-                        inlongStreamId, 
StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
-                        ConfigLogTypeEnum.ERROR, e.toString());
-            }
+            logger.error("get producer failed for topic=" + topic, e);
         }
         /*
          * If the producer is a null value,\ it means that the topic is not yet
@@ -196,25 +182,19 @@ public class PulsarClientService {
         if (es.isOrderMessage()) {
             String partitionKey = 
event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
             try {
-                MessageId msgId = producer.newMessage().key(partitionKey)
-                        .properties(proMap).value(event.getBody())
+                MessageId msgId = producer.newMessage()
+                        .properties(proMap)
+                        .key(partitionKey)
+                        .value(event.getBody())
                         .send();
                 sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
                 AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, 
event);
                 forCallBackP.setCanUseSend(true);
                 result = true;
             } catch (PulsarClientException ex) {
-                if (streamConfigLogMetric != null) {
-                    streamConfigLogMetric.updateConfigLog(inlongGroupId,
-                            inlongStreamId, 
StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
-                            ConfigLogTypeEnum.ERROR, ex.toString());
-                }
                 forCallBackP.setCanUseSend(false);
                 sendMessageCallBack.handleMessageSendException(topic, es, ex);
-                result = false;
-                if (ex instanceof NotFoundException) {
-                    result = true;
-                }
+                result = ex instanceof NotFoundException;
             }
             /*
              * avoid client timeout
@@ -224,17 +204,15 @@ public class PulsarClientService {
                 sendResponse((OrderEvent) event, inlongGroupId, 
inlongStreamId);
             }
         } else {
-            producer.newMessage().properties(proMap).value(event.getBody())
-                    .sendAsync().thenAccept((msgId) -> {
+            producer.newMessage().properties(proMap)
+                    .value(event.getBody())
+                    .sendAsync()
+                    .thenAccept((msgId) -> {
                         
AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
                         forCallBackP.setCanUseSend(true);
-                        sendMessageCallBack.handleMessageSendSuccess(topic, 
(MessageIdImpl) msgId, es);
-                    }).exceptionally((e) -> {
-                        if (streamConfigLogMetric != null) {
-                            
streamConfigLogMetric.updateConfigLog(inlongGroupId,
-                                    inlongStreamId, 
StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
-                                    ConfigLogTypeEnum.ERROR, e.toString());
-                        }
+                        sendMessageCallBack.handleMessageSendSuccess(topic, 
msgId, es);
+                    })
+                    .exceptionally((e) -> {
                         forCallBackP.setCanUseSend(false);
                         sendMessageCallBack.handleMessageSendException(topic, 
es, e);
                         return null;
@@ -297,12 +275,6 @@ public class PulsarClientService {
                 callBack.handleCreateClientSuccess(info.getKey());
             } catch (PulsarClientException e) {
                 callBack.handleCreateClientException(info.getKey());
-                if (streamConfigLogMetric != null) {
-                    streamConfigLogMetric.updateConfigLog("DataProxyGlobal",
-                            "DataProxyGlobal",
-                            StreamConfigLogMetric.CONFIG_LOG_PULSAR_CLIENT,
-                            ConfigLogTypeEnum.ERROR, e.toString());
-                }
                 logger.error("create connection error in Pulsar sink, "
                         + "maybe pulsar master set error, please re-check. url 
" + info.getKey(), e);
             } catch (Throwable e) {
@@ -358,15 +330,14 @@ public class PulsarClientService {
 
     private TopicProducerInfo getProducerInfo(int poolIndex, String topic, 
String inlongGroupId,
             String inlongStreamId) {
-        List<TopicProducerInfo> producerList =
-                initTopicProducer(topic, inlongGroupId, inlongStreamId);
+        List<TopicProducerInfo> producerList = initTopicProducer(topic, 
inlongGroupId, inlongStreamId);
         AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) 
-> new AtomicLong(0));
         int maxTryToGetProducer = producerList == null ? 0 : 
producerList.size();
         if (maxTryToGetProducer == 0) {
             return null;
         }
         int retryTime = 0;
-        TopicProducerInfo p = null;
+        TopicProducerInfo p;
         do {
             int index = (int) (topicIndex.getAndIncrement() % 
maxTryToGetProducer);
             p = producerList.get(index);
@@ -410,7 +381,7 @@ public class PulsarClientService {
     /**
      * close pulsarClients(the related url is removed); start pulsarClients 
for new url, and create producers for them
      *
-     * @param callBack
+     * @param callBack callback
      * @param needToClose url-token map
      * @param needToStart url-token map
      * @param topicSet for new pulsarClient, create these topics' producers
@@ -453,11 +424,11 @@ public class PulsarClientService {
 
             } catch (PulsarClientException e) {
                 callBack.handleCreateClientException(url);
-                logger.error("create connnection error in pulsar sink, "
+                logger.error("create connection error in pulsar sink, "
                         + "maybe pulsar master set error, please re-check.url 
" + url, e);
             } catch (Throwable e) {
                 callBack.handleCreateClientException(url);
-                logger.error("create connnection error in pulsar sink, "
+                logger.error("create connection error in pulsar sink, "
                         + "maybe pulsar master set error/shutdown in progress, 
please "
                         + "re-check. url " + url, e);
             }
@@ -496,16 +467,11 @@ public class PulsarClientService {
 
     class TopicProducerInfo {
 
+        private final Producer[] producers;
+        private final PulsarClient pulsarClient;
+        private final int sinkThreadPoolSize;
+        private final String topic;
         private long lastSendMsgErrorTime;
-
-        private Producer[] producers;
-
-        private PulsarClient pulsarClient;
-
-        private int sinkThreadPoolSize;
-
-        private String topic;
-
         private volatile Boolean isCanUseSend = true;
 
         private volatile Boolean isFinishInit = false;
@@ -536,13 +502,6 @@ public class PulsarClientService {
                         producers[i].closeAsync();
                     }
                 }
-                if (streamConfigLogMetric != null
-                        && StringUtils.isNotEmpty(inlongGroupId)
-                        && StringUtils.isNotEmpty(inlongStreamId)) {
-                    streamConfigLogMetric.updateConfigLog(inlongGroupId,
-                            inlongStreamId, 
StreamConfigLogMetric.CONFIG_LOG_PULSAR_CLIENT,
-                            ConfigLogTypeEnum.ERROR, e.toString());
-                }
             }
         }
 

Reply via email to