This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new da2c602f2 [INLONG-4056][DataProxy] Return the response to the 
SDK/Agent after saving the event to the cache cluster (#4064)
da2c602f2 is described below

commit da2c602f2520d0042782d59b5e78addb4093072c
Author: 卢春亮 <[email protected]>
AuthorDate: Mon May 23 14:21:45 2022 +0800

    [INLONG-4056][DataProxy] Return the response to the SDK/Agent after saving 
the event to the cache cluster (#4064)
---
 .../dataproxy/config/RemoteConfigManager.java      |  41 ++++----
 .../config/holder/CommonPropertiesHolder.java      |  63 +++++++++++-
 .../inlong/dataproxy/dispatch/DispatchManager.java |  51 ++++++++--
 .../inlong/dataproxy/dispatch/DispatchProfile.java |  48 ++++++++-
 .../dispatch/DispatchProfileCallback.java          |  61 +++++++++++
 .../inlong/dataproxy/metrics/audit/AuditUtils.java |  17 ++--
 .../apache/inlong/dataproxy/node/Application.java  |  33 +++++-
 .../sink/kafkazone/KafkaClusterProducer.java       |  28 ++---
 .../dataproxy/sink/kafkazone/KafkaZoneSink.java    |  32 +++---
 .../sink/kafkazone/KafkaZoneSinkContext.java       |  23 ++++-
 .../sink/pulsarzone/PulsarClusterProducer.java     |  37 ++++---
 .../dataproxy/sink/pulsarzone/PulsarZoneSink.java  |  33 +++---
 .../sink/pulsarzone/PulsarZoneSinkContext.java     |  23 ++++-
 .../sink/tubezone/TubeClusterProducer.java         |  26 +++--
 .../dataproxy/sink/tubezone/TubeZoneSink.java      |  32 +++---
 .../sink/tubezone/TubeZoneSinkContext.java         |  23 ++++-
 .../source/tcp/InlongTcpChannelHandler.java        |  65 +++++++++++-
 .../source/tcp/InlongTcpSourceCallback.java        | 113 +++++++++++++++++++++
 inlong-dataproxy/pom.xml                           |   6 ++
 .../sdk/commons/admin/AdminHttpSourceHandler.java  |   2 +-
 .../inlong/sdk/commons/protocol/ProxyEvent.java    |   5 +-
 .../sdk/commons/protocol/ProxyPackEvent.java       | 106 +++++++++++++++++++
 .../sdk/commons/protocol/SourceCallback.java       |  28 +++++
 23 files changed, 749 insertions(+), 147 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index 7611d3057..861e364e6 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -17,17 +17,7 @@
 
 package org.apache.inlong.dataproxy.config;
 
-import java.security.SecureRandom;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import com.google.gson.Gson;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
@@ -50,10 +40,21 @@ import 
org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
 import org.apache.inlong.common.pojo.dataproxy.ProxySink;
 import org.apache.inlong.common.pojo.dataproxy.ProxySource;
 import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.Gson;
+import java.security.SecureRandom;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * RemoteConfigManager
@@ -102,12 +103,10 @@ public class RemoteConfigManager implements IRepository {
             if (!isInit) {
                 instance = new RemoteConfigManager();
                 try {
-                    String strReloadInterval = 
ConfigManager.getInstance().getCommonProperties()
-                            .get(KEY_CONFIG_CHECK_INTERVAL);
+                    String strReloadInterval = 
CommonPropertiesHolder.getString(KEY_CONFIG_CHECK_INTERVAL);
                     instance.reloadInterval = 
NumberUtils.toLong(strReloadInterval, DEFAULT_HEARTBEAT_INTERVAL_MS);
                     //
-                    String ipListParserType = 
ConfigManager.getInstance().getCommonProperties()
-                            .get(IManagerIpListParser.KEY_MANAGER_TYPE);
+                    String ipListParserType = 
CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE);
                     Class<? extends IManagerIpListParser> ipListParserClass;
                     ipListParserClass = (Class<? extends 
IManagerIpListParser>) Class
                             .forName(ipListParserType);
@@ -134,13 +133,13 @@ public class RemoteConfigManager implements IRepository {
      */
     public void reload() {
         LOGGER.info("start to reload config.");
-        String proxyClusterName = 
ConfigManager.getInstance().getCommonProperties().get(KEY_PROXY_CLUSTER_NAME);
-        String setName = 
ConfigManager.getInstance().getCommonProperties().get(KEY_SET_NAME);
+        String proxyClusterName = 
CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
+        String setName = CommonPropertiesHolder.getString(KEY_SET_NAME);
         if (StringUtils.isBlank(proxyClusterName) || 
StringUtils.isBlank(setName)) {
             return;
         }
         //
-        
this.ipListParser.setCommonProperties(ConfigManager.getInstance().getCommonProperties());
+        this.ipListParser.setCommonProperties(CommonPropertiesHolder.get());
         List<String> managerIpList = this.ipListParser.getIpList();
         if (managerIpList == null || managerIpList.size() == 0) {
             return;
@@ -255,7 +254,7 @@ public class RemoteConfigManager implements IRepository {
         if (currentClusterConfig != null) {
             return currentClusterConfig.getProxyCluster().getName();
         }
-        return 
ConfigManager.getInstance().getCommonProperties().get(KEY_PROXY_CLUSTER_NAME);
+        return CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
     }
 
     /**
@@ -268,7 +267,7 @@ public class RemoteConfigManager implements IRepository {
         if (currentClusterConfig != null) {
             return currentClusterConfig.getProxyCluster().getSetName();
         }
-        return 
ConfigManager.getInstance().getCommonProperties().get(KEY_SET_NAME);
+        return CommonPropertiesHolder.getString(KEY_SET_NAME);
     }
 
     /**
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index 592ad10b1..a811db310 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -17,10 +17,8 @@
 
 package org.apache.inlong.dataproxy.config.holder;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.flume.Context;
 import 
org.apache.inlong.dataproxy.config.loader.ClassResourceCommonPropertiesLoader;
 import org.apache.inlong.dataproxy.config.loader.CommonPropertiesLoader;
@@ -28,6 +26,9 @@ import 
org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * 
  * CommonPropertiesHolder
@@ -38,10 +39,16 @@ public class CommonPropertiesHolder {
     public static final String KEY_COMMON_PROPERTIES = 
"common-properties-loader";
     public static final String DEFAULT_LOADER = 
ClassResourceCommonPropertiesLoader.class.getName();
     public static final String KEY_CLUSTER_ID = "proxy.cluster.name";
+    public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
+    public static final boolean DEFAULT_RESPONSE_AFTER_SAVE = false;
+    public static final String KEY_MAX_RESPONSE_TIMEOUT_MS = 
"maxResponseTimeoutMs";
+    public static final long DEFAULT_MAX_RESPONSE_TIMEOUT_MS = 10000L;
 
     private static Map<String, String> props;
 
     private static long auditFormatInterval = 60000L;
+    private static boolean isResponseAfterSave = DEFAULT_RESPONSE_AFTER_SAVE;
+    private static long maxResponseTimeout = DEFAULT_MAX_RESPONSE_TIMEOUT_MS;
 
     /**
      * init
@@ -61,6 +68,10 @@ public class CommonPropertiesHolder {
                         LOG.info("loaderClass:{},properties:{}", 
loaderClassName, props);
                         auditFormatInterval = NumberUtils
                                 
.toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
+                        isResponseAfterSave = BooleanUtils
+                                
.toBoolean(CommonPropertiesHolder.getString(KEY_RESPONSE_AFTER_SAVE));
+                        maxResponseTimeout = 
CommonPropertiesHolder.getLong(KEY_MAX_RESPONSE_TIMEOUT_MS,
+                                DEFAULT_MAX_RESPONSE_TIMEOUT_MS);
                     }
                 } catch (Throwable t) {
                     LOG.error("Fail to init 
CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -150,6 +161,36 @@ public class CommonPropertiesHolder {
         return getInteger(key, null);
     }
 
+    /**
+     * Gets value mapped to key, returning defaultValue if unmapped.
+     * 
+     * @param  key          to be found
+     * @param  defaultValue returned if key is unmapped
+     * @return              value associated with key
+     */
+    public static Long getLong(String key, Long defaultValue) {
+        String value = get().get(key);
+        if (value != null) {
+            return Long.valueOf(Long.parseLong(value.trim()));
+        }
+        return defaultValue;
+    }
+
+    /**
+     * Gets value mapped to key, returning null if unmapped.
+     * <p>
+     * Note that this method returns an object as opposed to a primitive. The 
configuration key requested may not be
+     * mapped to a value and by returning the primitive object wrapper we can 
return null. If the key does not exist and the
+     * return value of this method is assigned directly to a primitive, a 
{@link NullPointerException} will be thrown.
+     * </p>
+     * 
+     * @param  key to be found
+     * @return     value associated with key or null if unmapped
+     */
+    public static Long getLong(String key) {
+        return getLong(key, null);
+    }
+
     /**
      * getAuditFormatInterval
      * 
@@ -159,4 +200,20 @@ public class CommonPropertiesHolder {
         return auditFormatInterval;
     }
 
+    /**
+     * isResponseAfterSave
+     * @return
+     */
+    public static boolean isResponseAfterSave() {
+        return isResponseAfterSave;
+    }
+
+    /**
+     * get maxResponseTimeout
+     * @return the maxResponseTimeout
+     */
+    public static long getMaxResponseTimeout() {
+        return maxResponseTimeout;
+    }
+
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
index 57b365e88..1c4320fd7 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
@@ -17,6 +17,12 @@
 
 package org.apache.inlong.dataproxy.dispatch;
 
+import org.apache.flume.Context;
+import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -25,11 +31,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.flume.Context;
-import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * DispatchManager
  */
@@ -97,6 +98,40 @@ public class DispatchManager {
         inCounter.incrementAndGet();
     }
 
+    /**
+     * addPackEvent
+     * @param packEvent
+     */
+    public void addPackEvent(ProxyPackEvent packEvent) {
+        String eventUid = packEvent.getUid();
+        long dispatchTime = packEvent.getMsgTime() - packEvent.getMsgTime() % 
MINUTE_MS;
+        DispatchProfile dispatchProfile = new DispatchProfile(eventUid, 
packEvent.getInlongGroupId(),
+                packEvent.getInlongStreamId(), dispatchTime);
+        // callback
+        DispatchProfileCallback callback = new 
DispatchProfileCallback(packEvent.getEvents().size(),
+                packEvent.getCallback());
+        dispatchProfile.setCallback(callback);
+        // offer queue
+        for (ProxyEvent event : packEvent.getEvents()) {
+            inCounter.incrementAndGet();
+            boolean addResult = dispatchProfile.addEvent(event, maxPackCount, 
maxPackSize);
+            // dispatch profile is full
+            if (!addResult) {
+                outCounter.addAndGet(dispatchProfile.getCount());
+                this.dispatchQueue.offer(dispatchProfile);
+                dispatchProfile = new DispatchProfile(eventUid, 
event.getInlongGroupId(), event.getInlongStreamId(),
+                        dispatchTime);
+                dispatchProfile.setCallback(callback);
+                dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
+            }
+        }
+        // last dispatch profile
+        if (dispatchProfile.getEvents().size() > 0) {
+            outCounter.addAndGet(dispatchProfile.getCount());
+            this.dispatchQueue.offer(dispatchProfile);
+        }
+    }
+
     /**
      * outputOvertimeData
      * 
@@ -124,13 +159,13 @@ public class DispatchManager {
         removeKeys.forEach((key) -> {
             DispatchProfile dispatchProfile = this.profileCache.remove(key);
             if (dispatchProfile != null) {
-            dispatchQueue.offer(dispatchProfile);
-            outCounter.addAndGet(dispatchProfile.getCount());
+                dispatchQueue.offer(dispatchProfile);
+                outCounter.addAndGet(dispatchProfile.getCount());
             }
         });
         LOG.info("end to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
                 + "inCounter:{},outCounter:{}",
-                profileCache.size(), dispatchQueue.size(), eventCount, 
+                profileCache.size(), dispatchQueue.size(), eventCount,
                 inCounter.getAndSet(0), outCounter.getAndSet(0));
     }
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfile.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfile.java
index e6c28014c..bf0f3fde1 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfile.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfile.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.dataproxy.dispatch;
 
+import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
-
 /**
  * 
  * DispatchProfile
@@ -36,6 +36,7 @@ public class DispatchProfile {
     private long count = 0;
     private long size = 0;
     private long dispatchTime;
+    private DispatchProfileCallback callback;
 
     /**
      * Constructor
@@ -171,4 +172,47 @@ public class DispatchProfile {
         return dispatchTime;
     }
 
+    /**
+     * ack
+     */
+    public void ack() {
+        if (callback != null) {
+            callback.ack(this.events.size());
+        }
+    }
+
+    /**
+     * fail
+     * Invokes {@code callback.fail()} when a callback is set; has no return value.
+     */
+    public void fail() {
+        if (callback != null) {
+            callback.fail();
+        }
+    }
+
+    /**
+     * isResend
+     * @return
+     */
+    public boolean isResend() {
+        return callback == null;
+    }
+
+    /**
+     * get callback
+     * @return the callback
+     */
+    public DispatchProfileCallback getCallback() {
+        return callback;
+    }
+
+    /**
+     * set callback
+     * @param callback the callback to set
+     */
+    public void setCallback(DispatchProfileCallback callback) {
+        this.callback = callback;
+    }
+
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfileCallback.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfileCallback.java
new file mode 100644
index 000000000..92db5911d
--- /dev/null
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfileCallback.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.dispatch;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
+import org.apache.inlong.sdk.commons.protocol.SourceCallback;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * DispatchProfileCallback
+ * 
+ */
+public class DispatchProfileCallback {
+
+    private AtomicInteger ackingCount;
+    private SourceCallback callback;
+
+    /**
+     * Constructor
+     * @param totalCount
+     * @param callback
+     */
+    public DispatchProfileCallback(int totalCount, SourceCallback callback) {
+        this.ackingCount = new AtomicInteger(totalCount);
+        this.callback = callback;
+    }
+
+    /**
+     * ack
+     * @param eventCount
+     */
+    public void ack(int eventCount) {
+        int currentCount = this.ackingCount.addAndGet(-eventCount);
+        if (currentCount <= 0) {
+            this.callback.callback(ResultCode.SUCCUSS);
+        }
+    }
+
+    /**
+     * fail
+     */
+    public void fail() {
+        this.callback.callback(ResultCode.ERR_REJECT);
+    }
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 2ecd55811..21ed8090e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -17,26 +17,26 @@
 
 package org.apache.inlong.dataproxy.metrics.audit;
 
-import java.util.HashSet;
-import java.util.Map;
-
 import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Event;
 import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.audit.util.AuditConfig;
-import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.utils.Constants;
 
+import java.util.HashSet;
+import java.util.Map;
+
 /**
  * 
  * AuditUtils
  */
 public class AuditUtils {
+
     public static final String AUDIT_KEY_FILE_PATH = "audit.filePath";
     public static final String AUDIT_DEFAULT_FILE_PATH = "/data/inlong/audit/";
     public static final String AUDIT_KEY_MAX_CACHE_ROWS = "audit.maxCacheRows";
@@ -54,10 +54,10 @@ public class AuditUtils {
      */
     public static void initAudit() {
         // IS_AUDIT
-        IS_AUDIT = 
BooleanUtils.toBoolean(ConfigManager.getInstance().getCommonProperties().get(AUDIT_KEY_IS_AUDIT));
+        IS_AUDIT = 
BooleanUtils.toBoolean(CommonPropertiesHolder.getString(AUDIT_KEY_IS_AUDIT));
         if (IS_AUDIT) {
             // AuditProxy
-            String strIpPorts = 
ConfigManager.getInstance().getCommonProperties().get(AUDIT_KEY_PROXYS);
+            String strIpPorts = 
CommonPropertiesHolder.getString(AUDIT_KEY_PROXYS);
             HashSet<String> proxys = new HashSet<>();
             if (!StringUtils.isBlank(strIpPorts)) {
                 String[] ipPorts = strIpPorts.split("\\s+");
@@ -67,10 +67,9 @@ public class AuditUtils {
             }
             AuditImp.getInstance().setAuditProxy(proxys);
             // AuditConfig
-            String filePath = 
ConfigManager.getInstance().getCommonProperties().getOrDefault(AUDIT_KEY_FILE_PATH,
-                    AUDIT_DEFAULT_FILE_PATH);
+            String filePath = 
CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
             int maxCacheRow = NumberUtils.toInt(
-                    
ConfigManager.getInstance().getCommonProperties().get(AUDIT_KEY_MAX_CACHE_ROWS),
+                    CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
                     AUDIT_DEFAULT_MAX_CACHE_ROWS);
             AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
             AuditImp.getInstance().setAuditConfig(auditConfig);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
index 31c51ce3d..42d9adb91 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
@@ -48,7 +48,6 @@ import 
org.apache.flume.node.StaticZooKeeperConfigurationProvider;
 import org.apache.flume.util.SSLUtil;
 import org.apache.inlong.common.config.IDataProxyConfigHolder;
 import org.apache.inlong.common.metric.MetricObserver;
-import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.RemoteConfigManager;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -85,15 +84,25 @@ public class Application {
     private final ReentrantLock lifecycleLock = new ReentrantLock();
     private AdminTask adminTask;
 
+    /**
+     * Constructor
+     */
     public Application() {
         this(new ArrayList<LifecycleAware>(0));
     }
 
+    /**
+     * Constructor
+     * @param components
+     */
     public Application(List<LifecycleAware> components) {
         this.components = components;
         supervisor = new LifecycleSupervisor();
     }
 
+    /**
+     * start
+     */
     public void start() {
         lifecycleLock.lock();
         try {
@@ -115,6 +124,10 @@ public class Application {
         }
     }
 
+    /**
+     * handleConfigurationEvent
+     * @param conf
+     */
     @Subscribe
     public void handleConfigurationEvent(MaterializedConfiguration conf) {
         try {
@@ -132,6 +145,9 @@ public class Application {
         }
     }
 
+    /**
+     * stop
+     */
     public void stop() {
         lifecycleLock.lock();
         stopAllComponents();
@@ -149,6 +165,9 @@ public class Application {
         }
     }
 
+    /**
+     * stopAllComponents
+     */
     private void stopAllComponents() {
         if (this.materializedConfiguration != null) {
             logger.info("Shutting down configuration: {}", 
this.materializedConfiguration);
@@ -187,6 +206,10 @@ public class Application {
         }
     }
 
+    /**
+     * startAllComponents
+     * @param materializedConfiguration
+     */
     private void startAllComponents(MaterializedConfiguration 
materializedConfiguration) {
         logger.info("Starting new configuration:{}", 
materializedConfiguration);
 
@@ -244,6 +267,9 @@ public class Application {
         this.loadMonitoring();
     }
 
+    /**
+     * loadMonitoring
+     */
     @SuppressWarnings("unchecked")
     private void loadMonitoring() {
         Properties systemProps = System.getProperties();
@@ -403,7 +429,7 @@ public class Application {
                 }
             }
             // metrics
-            
MetricObserver.init(ConfigManager.getInstance().getCommonProperties());
+            MetricObserver.init(CommonPropertiesHolder.get());
             // audit
             AuditUtils.initAudit();
 
@@ -431,8 +457,7 @@ public class Application {
      * @param commandLine
      */
     private static void startByManagerConf(CommandLine commandLine) {
-        String proxyName = ConfigManager.getInstance().getCommonProperties()
-                .get(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+        String proxyName = 
CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
         ManagerPropertiesConfigurationProvider configurationProvider = new 
ManagerPropertiesConfigurationProvider(
                 proxyName);
         Application application = new Application();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaClusterProducer.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaClusterProducer.java
index 0d6daace1..ff21d563b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaClusterProducer.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaClusterProducer.java
@@ -17,13 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink.kafkazone;
 
-import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.flume.Context;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -40,6 +33,13 @@ import 
org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
+import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
+
 /**
  * KafkaClusterProducer
  */
@@ -128,8 +128,7 @@ public class KafkaClusterProducer implements LifecycleAware 
{
             }
             // create producer failed
             if (producer == null) {
-                sinkContext.getDispatchQueue().offer(event);
-                sinkContext.addSendResultMetric(event, topic, false, 0);
+                sinkContext.processSendFail(event, topic, 0);
                 return false;
             }
             // headers
@@ -154,10 +153,14 @@ public class KafkaClusterProducer implements 
LifecycleAware {
                     if (ex != null) {
                         LOG.error("Send fail:{}", ex.getMessage());
                         LOG.error(ex.getMessage(), ex);
-                        sinkContext.getDispatchQueue().offer(event);
-                        sinkContext.addSendResultMetric(event, topic, false, 
sendTime);
+                        if (event.isResend()) {
+                            sinkContext.processSendFail(event, topic, 
sendTime);
+                        } else {
+                            event.fail();
+                        }
                     } else {
                         sinkContext.addSendResultMetric(event, topic, true, 
sendTime);
+                        event.ack();
                     }
                 }
             };
@@ -165,8 +168,7 @@ public class KafkaClusterProducer implements LifecycleAware 
{
             return true;
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
-            sinkContext.getDispatchQueue().offer(event);
-            sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+            sinkContext.processSendFail(event, event.getUid(), 0);
             return false;
         }
     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSink.java
index 70b8d82b9..a6c344089 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSink.java
@@ -17,13 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink.kafkazone;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -34,9 +27,17 @@ import org.apache.flume.sink.AbstractSink;
 import org.apache.inlong.dataproxy.dispatch.DispatchManager;
 import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 /**
  * KafkaZoneSink
  */
@@ -133,15 +134,22 @@ public class KafkaZoneSink extends AbstractSink 
implements Configurable {
                 tx.commit();
                 return Status.BACKOFF;
             }
-            if (!(event instanceof ProxyEvent)) {
+            // ProxyEvent
+            if (event instanceof ProxyEvent) {
+                ProxyEvent proxyEvent = (ProxyEvent) event;
+                this.dispatchManager.addEvent(proxyEvent);
+                tx.commit();
+                return Status.READY;
+            }
+            // ProxyPackEvent
+            if (event instanceof ProxyPackEvent) {
+                ProxyPackEvent packEvent = (ProxyPackEvent) event;
+                this.dispatchManager.addPackEvent(packEvent);
                 tx.commit();
-                this.context.addSendFailMetric();
                 return Status.READY;
             }
-            //
-            ProxyEvent proxyEvent = (ProxyEvent) event;
-            this.dispatchManager.addEvent(proxyEvent);
             tx.commit();
+            this.context.addSendFailMetric();
             return Status.READY;
         } catch (Throwable t) {
             LOG.error("Process event failed!" + this.getName(), t);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
index 99ee3d6d7..3c9b1d6ef 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink.kafkazone;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -34,6 +30,10 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.SinkContext;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
 /**
  * 
  * KafkaZoneSinkContext
@@ -216,6 +216,21 @@ public class KafkaZoneSinkContext extends SinkContext {
         dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, 
inlongStreamId);
     }
 
+    /**
+     * Handles a failed send: re-queues the profile when isResend() is true, otherwise calls fail() on it.
+     * @param currentRecord the dispatch profile that could not be sent
+     * @param producerTopic topic passed to the failure metric (recorded only when the profile is re-queued)
+     * @param sendTime      send time value passed to the failure metric
+     */
+    public void processSendFail(DispatchProfile currentRecord, String 
producerTopic, long sendTime) {
+        if (currentRecord.isResend()) {
+            dispatchQueue.offer(currentRecord); // the boolean result of offer() is not checked
+            this.addSendResultMetric(currentRecord, producerTopic, false, 
sendTime);
+        } else {
+            currentRecord.fail(); // no re-queue and no failure metric on this path
+        }
+    }
+
     /**
      * addSendResultMetric
      * 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
index 835a863a9..7a13fbbc1 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
@@ -17,17 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink.pulsarzone;
 
-import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-
-import java.security.SecureRandom;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flume.Context;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -48,6 +37,17 @@ import org.apache.pulsar.client.api.SizeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
+import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
+
 /**
  * PulsarClusterProducer
  */
@@ -131,8 +131,8 @@ public class PulsarClusterProducer implements 
LifecycleAware {
                     
.connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10))
                     .build();
             this.baseBuilder = client.newProducer();
-//            Map<String, Object> builderConf = new HashMap<>();
-//            builderConf.putAll(context.getParameters());
+            // Map<String, Object> builderConf = new HashMap<>();
+            // builderConf.putAll(context.getParameters());
             this.baseBuilder
                     .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), 
TimeUnit.MILLISECONDS)
                     
.maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 500))
@@ -222,6 +222,7 @@ public class PulsarClusterProducer implements 
LifecycleAware {
             String baseTopic = 
sinkContext.getIdTopicHolder().getTopic(event.getUid());
             if (baseTopic == null) {
                 sinkContext.addSendResultMetric(event, event.getUid(), false, 
0);
+                event.fail();
                 return false;
             }
             // get producer
@@ -251,8 +252,7 @@ public class PulsarClusterProducer implements 
LifecycleAware {
             }
             // create producer failed
             if (producer == null) {
-                sinkContext.getDispatchQueue().offer(event);
-                sinkContext.addSendResultMetric(event, producerTopic, false, 
0);
+                sinkContext.processSendFail(event, producerTopic, 0);
                 return false;
             }
             // headers
@@ -268,17 +268,16 @@ public class PulsarClusterProducer implements 
LifecycleAware {
                 if (ex != null) {
                     LOG.error("Send fail:{}", ex.getMessage());
                     LOG.error(ex.getMessage(), ex);
-                    sinkContext.getDispatchQueue().offer(event);
-                    sinkContext.addSendResultMetric(event, producerTopic, 
false, sendTime);
+                    sinkContext.processSendFail(event, producerTopic, 
sendTime);
                 } else {
                     sinkContext.addSendResultMetric(event, producerTopic, 
true, sendTime);
+                    event.ack();
                 }
             });
             return true;
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
-            sinkContext.getDispatchQueue().offer(event);
-            sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+            sinkContext.processSendFail(event, event.getUid(), 0);
             return false;
         }
     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
index 1fa8448ef..75ec7718b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
@@ -17,13 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink.pulsarzone;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -34,9 +27,17 @@ import org.apache.flume.sink.AbstractSink;
 import org.apache.inlong.dataproxy.dispatch.DispatchManager;
 import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 /**
  * PulsarZoneSink
  */
@@ -129,19 +130,27 @@ public class PulsarZoneSink extends AbstractSink 
implements Configurable {
         tx.begin();
         try {
             Event event = channel.take();
+            // no data
             if (event == null) {
                 tx.commit();
                 return Status.BACKOFF;
             }
-            if (!(event instanceof ProxyEvent)) {
+            // ProxyEvent
+            if (event instanceof ProxyEvent) {
+                ProxyEvent proxyEvent = (ProxyEvent) event;
+                this.dispatchManager.addEvent(proxyEvent);
+                tx.commit();
+                return Status.READY;
+            }
+            // ProxyPackEvent
+            if (event instanceof ProxyPackEvent) {
+                ProxyPackEvent packEvent = (ProxyPackEvent) event;
+                this.dispatchManager.addPackEvent(packEvent);
                 tx.commit();
-                this.context.addSendFailMetric();
                 return Status.READY;
             }
-            //
-            ProxyEvent proxyEvent = (ProxyEvent) event;
-            this.dispatchManager.addEvent(proxyEvent);
             tx.commit();
+            this.context.addSendFailMetric();
             return Status.READY;
         } catch (Throwable t) {
             LOG.error("Process event failed!" + this.getName(), t);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
index e9ea54214..fd13e6df1 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink.pulsarzone;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -34,6 +30,10 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.SinkContext;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
 /**
  * 
  * PulsarZoneSinkContext
@@ -216,6 +216,21 @@ public class PulsarZoneSinkContext extends SinkContext {
         dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, 
inlongStreamId);
     }
 
+    /**
+     * Handles a failed send: re-queues the profile when isResend() is true, otherwise calls fail() on it.
+     * @param currentRecord the dispatch profile that could not be sent
+     * @param producerTopic topic passed to the failure metric (recorded only when the profile is re-queued)
+     * @param sendTime      send time value passed to the failure metric
+     */
+    public void processSendFail(DispatchProfile currentRecord, String 
producerTopic, long sendTime) {
+        if (currentRecord.isResend()) {
+            dispatchQueue.offer(currentRecord); // the boolean result of offer() is not checked
+            this.addSendResultMetric(currentRecord, producerTopic, false, 
sendTime);
+        } else {
+            currentRecord.fail(); // no re-queue and no failure metric on this path
+        }
+    }
+
     /**
      * addSendResultMetric
      * 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeClusterProducer.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeClusterProducer.java
index 0b8c1a01b..505cb1639 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeClusterProducer.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeClusterProducer.java
@@ -17,14 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink.tubezone;
 
-import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flume.Context;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -43,6 +35,14 @@ import org.apache.inlong.tubemq.corebase.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
+import static 
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
+
 /**
  * TubeClusterProducer
  */
@@ -189,8 +189,7 @@ public class TubeClusterProducer implements LifecycleAware {
             }
             // create producer failed
             if (producer == null) {
-                sinkContext.getDispatchQueue().offer(event);
-                sinkContext.addSendResultMetric(event, topic, false, 0);
+                sinkContext.processSendFail(event, topic, 0);
                 return false;
             }
             // headers
@@ -210,22 +209,21 @@ public class TubeClusterProducer implements 
LifecycleAware {
                 @Override
                 public void onMessageSent(MessageSentResult result) {
                     sinkContext.addSendResultMetric(event, topic, true, 
sendTime);
+                    event.ack();
                 }
 
                 @Override
                 public void onException(Throwable ex) {
                     LOG.error("Send fail:{}", ex.getMessage());
                     LOG.error(ex.getMessage(), ex);
-                    sinkContext.getDispatchQueue().offer(event);
-                    sinkContext.addSendResultMetric(event, topic, false, 
sendTime);
+                    sinkContext.processSendFail(event, topic, sendTime);
                 }
             };
             producer.sendMessage(message, callback);
             return true;
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
-            sinkContext.getDispatchQueue().offer(event);
-            sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+            sinkContext.processSendFail(event, event.getUid(), 0);
             return false;
         }
     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSink.java
index f16148ada..bdd1b37e6 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSink.java
@@ -17,13 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink.tubezone;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -34,9 +27,17 @@ import org.apache.flume.sink.AbstractSink;
 import org.apache.inlong.dataproxy.dispatch.DispatchManager;
 import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 /**
  * TubeZoneSink
  */
@@ -133,15 +134,22 @@ public class TubeZoneSink extends AbstractSink implements 
Configurable {
                 tx.commit();
                 return Status.BACKOFF;
             }
-            if (!(event instanceof ProxyEvent)) {
+            // ProxyEvent
+            if (event instanceof ProxyEvent) {
+                ProxyEvent proxyEvent = (ProxyEvent) event;
+                this.dispatchManager.addEvent(proxyEvent);
+                tx.commit();
+                return Status.READY;
+            }
+            // ProxyPackEvent
+            if (event instanceof ProxyPackEvent) {
+                ProxyPackEvent packEvent = (ProxyPackEvent) event;
+                this.dispatchManager.addPackEvent(packEvent);
                 tx.commit();
-                this.context.addSendFailMetric();
                 return Status.READY;
             }
-            //
-            ProxyEvent proxyEvent = (ProxyEvent) event;
-            this.dispatchManager.addEvent(proxyEvent);
             tx.commit();
+            this.context.addSendFailMetric();
             return Status.READY;
         } catch (Throwable t) {
             LOG.error("Process event failed!" + this.getName(), t);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
index e26b37714..ba4141ff2 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink.tubezone;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -34,6 +30,10 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.SinkContext;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
 /**
  * 
  * TubeZoneSinkContext
@@ -216,6 +216,21 @@ public class TubeZoneSinkContext extends SinkContext {
         dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, 
inlongStreamId);
     }
 
+    /**
+     * Handles a failed send: re-queues the profile when isResend() is true, otherwise calls fail() on it.
+     * @param currentRecord the dispatch profile that could not be sent
+     * @param producerTopic topic passed to the failure metric (recorded only when the profile is re-queued)
+     * @param sendTime      send time value passed to the failure metric
+     */
+    public void processSendFail(DispatchProfile currentRecord, String 
producerTopic, long sendTime) {
+        if (currentRecord.isResend()) {
+            dispatchQueue.offer(currentRecord); // the boolean result of offer() is not checked
+            this.addSendResultMetric(currentRecord, producerTopic, false, 
sendTime);
+        } else {
+            currentRecord.fail(); // no re-queue and no failure metric on this path
+        }
+    }
+
     /**
      * addSendResultMetric
      * 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
index 6dccd660f..4a1c6f59b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
@@ -18,11 +18,13 @@
 package org.apache.inlong.dataproxy.source.tcp;
 
 import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.SourceContext;
 import org.apache.inlong.sdk.commons.protocol.EventUtils;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePack;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePackHeader;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResponseInfo;
@@ -33,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -98,9 +101,9 @@ public class InlongTcpChannelHandler extends 
ChannelInboundHandlerAdapter {
             // save index, reset it if buffer is not satisfied.
             cb.markReaderIndex();
             int totalPackLength = cb.readInt();
-            cb.resetReaderIndex();
             if (readableLength < totalPackLength + LENGTH_PARAM_LENGTH) {
                 // reset index.
+                cb.resetReaderIndex();
                 this.addMetric(false, 0, null);
                 throw new Exception("err msg, channel buffer is not satisfied, 
and  readableLength="
                         + readableLength + ", and totalPackLength=" + 
totalPackLength);
@@ -137,7 +140,65 @@ public class InlongTcpChannelHandler extends 
ChannelInboundHandlerAdapter {
         }
         // uncompress
         List<ProxyEvent> events = EventUtils.decodeSdkPack(packObject);
-        // topic
+        // response success if event size is zero
+        if (events.size() == 0) {
+            this.responsePackage(ctx, ResultCode.SUCCUSS, packObject);
+        }
+        // process
+        if (!CommonPropertiesHolder.isResponseAfterSave()) {
+            this.processAndResponse(ctx, packObject, events);
+        } else {
+            this.processAndWaitingSave(ctx, packObject, events);
+        }
+    }
+
+    /**
+     * Wraps the events in one ProxyPackEvent, hands it to the channel processor and waits for its callback.
+     * @param ctx        channel handler context used for the callback and for the reject response
+     * @param packObject decoded message pack; its header supplies the group id and stream id
+     * @param events     events decoded from the pack; each one is counted in the metrics
+     * @throws Exception declared by the signature; failures inside the try block are answered with ERR_REJECT
+     */
+    private void processAndWaitingSave(ChannelHandlerContext ctx, MessagePack 
packObject, List<ProxyEvent> events)
+            throws Exception {
+        MessagePackHeader header = packObject.getHeader();
+        InlongTcpSourceCallback callback = new InlongTcpSourceCallback(ctx, 
header);
+        String inlongGroupId = header.getInlongGroupId();
+        String inlongStreamId = header.getInlongStreamId();
+        ProxyPackEvent packEvent = new ProxyPackEvent(inlongGroupId, 
inlongStreamId, events, callback);
+        // put to channel, then block this thread until the callback fires or the timeout elapses
+        try {
+            
sourceContext.getSource().getChannelProcessor().processEvent(packEvent);
+            events.forEach(event -> {
+                this.addMetric(true, event.getBody().length, event);
+            });
+            boolean awaitResult = 
callback.getLatch().await(CommonPropertiesHolder.getMaxResponseTimeout(),
+                    TimeUnit.MILLISECONDS);
+            if (!awaitResult) { // timed out before the callback counted the latch down
+                if (!callback.getHasResponsed().getAndSet(true)) { // claim the response; skip if already claimed
+                    this.responsePackage(ctx, ResultCode.ERR_REJECT, 
packObject);
+                }
+            }
+        } catch (Throwable ex) { // NOTE(review): also catches InterruptedException from await(); flag is not restored
+            LOG.error("Process Controller Event error can't write event to 
channel.", ex);
+            events.forEach(event -> {
+                this.addMetric(false, event.getBody().length, event);
+            });
+            if (!callback.getHasResponsed().getAndSet(true)) { // reject only if no response was claimed yet
+                this.responsePackage(ctx, ResultCode.ERR_REJECT, packObject);
+            }
+        }
+    }
+
+    /**
+     * processAndResponse
+     * @param ctx
+     * @param packObject
+     * @param events
+     * @throws Exception
+     */
+    private void processAndResponse(ChannelHandlerContext ctx, MessagePack 
packObject, List<ProxyEvent> events)
+            throws Exception {
         for (ProxyEvent event : events) {
             String uid = event.getUid();
             String topic = sourceContext.getIdHolder().getTopic(uid);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSourceCallback.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSourceCallback.java
new file mode 100644
index 000000000..eae335b5e
--- /dev/null
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSourceCallback.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.source.tcp;
+
+import org.apache.inlong.sdk.commons.protocol.SourceCallback;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePackHeader;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResponseInfo;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+/**
+ * InlongTcpSourceCallback
+ * Writes the result of one message pack back to the TCP channel and counts down a latch afterwards.
+ */
+public class InlongTcpSourceCallback implements SourceCallback {
+
+    public static final Logger LOG = 
LoggerFactory.getLogger(InlongTcpSourceCallback.class);
+
+    private final ChannelHandlerContext ctx; // context whose channel receives the response
+    private final MessagePackHeader header; // header of the pack being answered; supplies the pack id
+    private final CountDownLatch latch; // counted down once after callback() has attempted the response
+    private final AtomicBoolean hasResponsed = new AtomicBoolean(false); // true once a response is claimed
+
+    /**
+     * Creates a callback bound to one channel context and one pack header, with a latch of count 1.
+     * @param ctx    channel handler context whose channel the response is written to
+     * @param header header of the message pack; its pack id is echoed in the response
+     */
+    public InlongTcpSourceCallback(ChannelHandlerContext ctx, 
MessagePackHeader header) {
+        this.ctx = ctx;
+        this.header = header;
+        this.latch = new CountDownLatch(1);
+    }
+
+    /**
+     * Writes a ResponseInfo carrying the result code and the pack id to the channel, at most once.
+     * @param resultCode result code to put into the response
+     */
+    @Override
+    public void callback(ResultCode resultCode) {
+        // If DataProxy has already sent a timeout response to the SDK, it must not 
send success response to SDK again when
+        // the event is finally saved (this early return also leaves the latch untouched).
+        if (this.hasResponsed.getAndSet(true)) {
+            return;
+        }
+        // build and write the response
+        try {
+            ResponseInfo.Builder builder = ResponseInfo.newBuilder();
+            builder.setResult(resultCode);
+            builder.setPackId(header.getPackId());
+
+            // serialize the response to bytes
+            byte[] responseBytes = builder.build().toByteArray();
+            // wrap the bytes in a ByteBuf for the channel
+            ByteBuf buffer = Unpooled.wrappedBuffer(responseBytes);
+            Channel remoteChannel = ctx.channel();
+            if (remoteChannel.isWritable()) {
+                remoteChannel.write(buffer); // NOTE(review): write() without flush; confirm a flush happens elsewhere
+            } else { // channel not writable: the response is dropped (the channel is not closed here)
+                LOG.warn("the send buffer2 is full, so disconnect it!"
+                        + "please check remote client; Connection info:{}",
+                        remoteChannel);
+                buffer.release(); // not handed to the channel, so release it here
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            // release whoever is waiting on the latch, whether or not the write succeeded
+            this.latch.countDown();
+        }
+    }
+
+    /**
+     * Returns the flag that marks a response as already claimed for this pack.
+     * @return the hasResponsed
+     */
+    public AtomicBoolean getHasResponsed() {
+        return hasResponsed;
+    }
+
+    /**
+     * Returns the latch that callback() counts down after attempting the response.
+     * @return the latch
+     */
+    public CountDownLatch getLatch() {
+        return latch;
+    }
+
+}
diff --git a/inlong-dataproxy/pom.xml b/inlong-dataproxy/pom.xml
index e299431a7..7ae2d16eb 100644
--- a/inlong-dataproxy/pom.xml
+++ b/inlong-dataproxy/pom.xml
@@ -145,6 +145,12 @@
             <groupId>org.powermock</groupId>
             <artifactId>powermock-module-testng</artifactId>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.testng</groupId>
+                    <artifactId>testng</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.powermock</groupId>
diff --git 
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/admin/AdminHttpSourceHandler.java
 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/admin/AdminHttpSourceHandler.java
index 1ca5b7fde..1b5029d74 100644
--- 
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/admin/AdminHttpSourceHandler.java
+++ 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/admin/AdminHttpSourceHandler.java
@@ -39,7 +39,7 @@ public interface AdminHttpSourceHandler extends Configurable {
      * <p>
      *
      * @param  request                 The request to be parsed into Flume 
events.
-     * @param  response
+     * @param  response                The response to be parsed into Flume 
events.
      * @return                         List of Flume events generated from the 
request.
      * @throws HTTPBadRequestException If the was not parsed correctly into an 
event because the request was not in the
      *                                 expected format.
diff --git 
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
index a4d798b81..391ae9d5e 100644
--- 
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
+++ 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
@@ -17,12 +17,11 @@
 
 package org.apache.inlong.sdk.commons.protocol;
 
-import java.util.Map;
-
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
 
+import java.util.Map;
+
 /**
- * 
  * ProxyEvent
  */
 public class ProxyEvent extends SdkEvent {
diff --git 
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyPackEvent.java
 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyPackEvent.java
new file mode 100644
index 000000000..b3959684c
--- /dev/null
+++ 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyPackEvent.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
+
+import java.util.List;
+
+/**
+ * ProxyPackEvent: an SdkEvent that carries a list of ProxyEvent plus the SourceCallback to acknowledge.
+ */
+public class ProxyPackEvent extends SdkEvent {
+
+    protected long sourceTime;
+    private List<ProxyEvent> events;
+    private SourceCallback callback;
+
+    /**
+     * Constructor; also derives uid from both ids and sets sourceTime to the current time.
+     * @param inlongGroupId  the inlong group id, stored and used to generate uid
+     * @param inlongStreamId the inlong stream id, stored and used to generate uid
+     * @param events         the proxy events carried by this pack
+     * @param callback       the callback invoked by {@link #acknowledge(ResultCode)}
+     */
+    public ProxyPackEvent(String inlongGroupId, String inlongStreamId, List<ProxyEvent> events,
+            SourceCallback callback) {
+        this.inlongGroupId = inlongGroupId;
+        this.inlongStreamId = inlongStreamId;
+        this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+        this.sourceTime = System.currentTimeMillis();
+        this.events = events;
+        this.callback = callback;
+    }
+
+    /**
+     * Forwards the result code to the callback. NOTE(review): no null check; a null callback throws NPE.
+     * @param resultCode the result code handed to {@link SourceCallback#callback(ResultCode)}
+     */
+    public void acknowledge(ResultCode resultCode) {
+        callback.callback(resultCode);
+    }
+
+    /**
+     * get sourceTime
+     * @return the sourceTime (set to System.currentTimeMillis() by the constructor unless overwritten)
+     */
+    public long getSourceTime() {
+        return sourceTime;
+    }
+
+    /**
+     * set sourceTime
+     * @param sourceTime the sourceTime to set
+     */
+    public void setSourceTime(long sourceTime) {
+        this.sourceTime = sourceTime;
+    }
+
+    /**
+     * get events
+     * @return the events list reference held by this pack (not copied)
+     */
+    public List<ProxyEvent> getEvents() {
+        return events;
+    }
+
+    /**
+     * set events
+     * @param events the events to set
+     */
+    public void setEvents(List<ProxyEvent> events) {
+        this.events = events;
+    }
+
+    /**
+     * get callback
+     * @return the callback
+     */
+    public SourceCallback getCallback() {
+        return callback;
+    }
+
+    /**
+     * set callback
+     * @param callback the callback to set; used by later calls to {@link #acknowledge(ResultCode)}
+     */
+    public void setCallback(SourceCallback callback) {
+        this.callback = callback;
+    }
+
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SourceCallback.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SourceCallback.java
new file mode 100644
index 000000000..ecd5930d6
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SourceCallback.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
+
+/**
+ * SourceCallback: single-method callback that is handed a {@link ResultCode}.
+ */
+public interface SourceCallback {
+
+    void callback(ResultCode resultCode);
+}

Reply via email to