This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new da2c602f2 [INLONG-4056][DataProxy] Return the response to the
SDK/Agent after saving the event to the cache cluster (#4064)
da2c602f2 is described below
commit da2c602f2520d0042782d59b5e78addb4093072c
Author: 卢春亮 <[email protected]>
AuthorDate: Mon May 23 14:21:45 2022 +0800
[INLONG-4056][DataProxy] Return the response to the SDK/Agent after saving
the event to the cache cluster (#4064)
---
.../dataproxy/config/RemoteConfigManager.java | 41 ++++----
.../config/holder/CommonPropertiesHolder.java | 63 +++++++++++-
.../inlong/dataproxy/dispatch/DispatchManager.java | 51 ++++++++--
.../inlong/dataproxy/dispatch/DispatchProfile.java | 48 ++++++++-
.../dispatch/DispatchProfileCallback.java | 61 +++++++++++
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 17 ++--
.../apache/inlong/dataproxy/node/Application.java | 33 +++++-
.../sink/kafkazone/KafkaClusterProducer.java | 28 ++---
.../dataproxy/sink/kafkazone/KafkaZoneSink.java | 32 +++---
.../sink/kafkazone/KafkaZoneSinkContext.java | 23 ++++-
.../sink/pulsarzone/PulsarClusterProducer.java | 37 ++++---
.../dataproxy/sink/pulsarzone/PulsarZoneSink.java | 33 +++---
.../sink/pulsarzone/PulsarZoneSinkContext.java | 23 ++++-
.../sink/tubezone/TubeClusterProducer.java | 26 +++--
.../dataproxy/sink/tubezone/TubeZoneSink.java | 32 +++---
.../sink/tubezone/TubeZoneSinkContext.java | 23 ++++-
.../source/tcp/InlongTcpChannelHandler.java | 65 +++++++++++-
.../source/tcp/InlongTcpSourceCallback.java | 113 +++++++++++++++++++++
inlong-dataproxy/pom.xml | 6 ++
.../sdk/commons/admin/AdminHttpSourceHandler.java | 2 +-
.../inlong/sdk/commons/protocol/ProxyEvent.java | 5 +-
.../sdk/commons/protocol/ProxyPackEvent.java | 106 +++++++++++++++++++
.../sdk/commons/protocol/SourceCallback.java | 28 +++++
23 files changed, 749 insertions(+), 147 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index 7611d3057..861e364e6 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -17,17 +17,7 @@
package org.apache.inlong.dataproxy.config;
-import java.security.SecureRandom;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import com.google.gson.Gson;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
@@ -50,10 +40,21 @@ import
org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
import org.apache.inlong.common.pojo.dataproxy.ProxySink;
import org.apache.inlong.common.pojo.dataproxy.ProxySource;
import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.Gson;
+import java.security.SecureRandom;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* RemoteConfigManager
@@ -102,12 +103,10 @@ public class RemoteConfigManager implements IRepository {
if (!isInit) {
instance = new RemoteConfigManager();
try {
- String strReloadInterval =
ConfigManager.getInstance().getCommonProperties()
- .get(KEY_CONFIG_CHECK_INTERVAL);
+ String strReloadInterval =
CommonPropertiesHolder.getString(KEY_CONFIG_CHECK_INTERVAL);
instance.reloadInterval =
NumberUtils.toLong(strReloadInterval, DEFAULT_HEARTBEAT_INTERVAL_MS);
//
- String ipListParserType =
ConfigManager.getInstance().getCommonProperties()
- .get(IManagerIpListParser.KEY_MANAGER_TYPE);
+ String ipListParserType =
CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE);
Class<? extends IManagerIpListParser> ipListParserClass;
ipListParserClass = (Class<? extends
IManagerIpListParser>) Class
.forName(ipListParserType);
@@ -134,13 +133,13 @@ public class RemoteConfigManager implements IRepository {
*/
public void reload() {
LOGGER.info("start to reload config.");
- String proxyClusterName =
ConfigManager.getInstance().getCommonProperties().get(KEY_PROXY_CLUSTER_NAME);
- String setName =
ConfigManager.getInstance().getCommonProperties().get(KEY_SET_NAME);
+ String proxyClusterName =
CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
+ String setName = CommonPropertiesHolder.getString(KEY_SET_NAME);
if (StringUtils.isBlank(proxyClusterName) ||
StringUtils.isBlank(setName)) {
return;
}
//
-
this.ipListParser.setCommonProperties(ConfigManager.getInstance().getCommonProperties());
+ this.ipListParser.setCommonProperties(CommonPropertiesHolder.get());
List<String> managerIpList = this.ipListParser.getIpList();
if (managerIpList == null || managerIpList.size() == 0) {
return;
@@ -255,7 +254,7 @@ public class RemoteConfigManager implements IRepository {
if (currentClusterConfig != null) {
return currentClusterConfig.getProxyCluster().getName();
}
- return
ConfigManager.getInstance().getCommonProperties().get(KEY_PROXY_CLUSTER_NAME);
+ return CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
}
/**
@@ -268,7 +267,7 @@ public class RemoteConfigManager implements IRepository {
if (currentClusterConfig != null) {
return currentClusterConfig.getProxyCluster().getSetName();
}
- return
ConfigManager.getInstance().getCommonProperties().get(KEY_SET_NAME);
+ return CommonPropertiesHolder.getString(KEY_SET_NAME);
}
/**
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index 592ad10b1..a811db310 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -17,10 +17,8 @@
package org.apache.inlong.dataproxy.config.holder;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang3.BooleanUtils;
import org.apache.flume.Context;
import
org.apache.inlong.dataproxy.config.loader.ClassResourceCommonPropertiesLoader;
import org.apache.inlong.dataproxy.config.loader.CommonPropertiesLoader;
@@ -28,6 +26,9 @@ import
org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
*
* CommonPropertiesHolder
@@ -38,10 +39,16 @@ public class CommonPropertiesHolder {
public static final String KEY_COMMON_PROPERTIES =
"common-properties-loader";
public static final String DEFAULT_LOADER =
ClassResourceCommonPropertiesLoader.class.getName();
public static final String KEY_CLUSTER_ID = "proxy.cluster.name";
+ public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
+ public static final boolean DEFAULT_RESPONSE_AFTER_SAVE = false;
+ public static final String KEY_MAX_RESPONSE_TIMEOUT_MS =
"maxResponseTimeoutMs";
+ public static final long DEFAULT_MAX_RESPONSE_TIMEOUT_MS = 10000L;
private static Map<String, String> props;
private static long auditFormatInterval = 60000L;
+ private static boolean isResponseAfterSave = DEFAULT_RESPONSE_AFTER_SAVE;
+ private static long maxResponseTimeout = DEFAULT_MAX_RESPONSE_TIMEOUT_MS;
/**
* init
@@ -61,6 +68,10 @@ public class CommonPropertiesHolder {
LOG.info("loaderClass:{},properties:{}",
loaderClassName, props);
auditFormatInterval = NumberUtils
.toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
+ isResponseAfterSave = BooleanUtils
+
.toBoolean(CommonPropertiesHolder.getString(KEY_RESPONSE_AFTER_SAVE));
+ maxResponseTimeout =
CommonPropertiesHolder.getLong(KEY_MAX_RESPONSE_TIMEOUT_MS,
+ DEFAULT_MAX_RESPONSE_TIMEOUT_MS);
}
} catch (Throwable t) {
LOG.error("Fail to init
CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -150,6 +161,36 @@ public class CommonPropertiesHolder {
return getInteger(key, null);
}
+ /**
+ * Gets value mapped to key, returning defaultValue if unmapped.
+ *
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
+ */
+ public static Long getLong(String key, Long defaultValue) {
+ String value = get().get(key);
+ if (value != null) {
+ return Long.valueOf(Long.parseLong(value.trim()));
+ }
+ return defaultValue;
+ }
+
+ /**
+ * Gets value mapped to key, returning null if unmapped.
+ * <p>
+ * Note that this method returns an object as opposed to a primitive. The
configuration key requested may not be
+ * mapped to a value and by returning the primitive object wrapper we can
return null. If the key does not exist the
+ * return value of this method is assigned directly to a primitive, a
{@link NullPointerException} will be thrown.
+ * </p>
+ *
+ * @param key to be found
+ * @return value associated with key or null if unmapped
+ */
+ public static Long getLong(String key) {
+ return getLong(key, null);
+ }
+
/**
* getAuditFormatInterval
*
@@ -159,4 +200,20 @@ public class CommonPropertiesHolder {
return auditFormatInterval;
}
+ /**
+ * isResponseAfterSave
+ * @return
+ */
+ public static boolean isResponseAfterSave() {
+ return isResponseAfterSave;
+ }
+
+ /**
+ * get maxResponseTimeout
+ * @return the maxResponseTimeout
+ */
+ public static long getMaxResponseTimeout() {
+ return maxResponseTimeout;
+ }
+
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
index 57b365e88..1c4320fd7 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
@@ -17,6 +17,12 @@
package org.apache.inlong.dataproxy.dispatch;
+import org.apache.flume.Context;
+import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
@@ -25,11 +31,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.flume.Context;
-import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* DispatchManager
*/
@@ -97,6 +98,40 @@ public class DispatchManager {
inCounter.incrementAndGet();
}
+ /**
+ * addPackEvent
+ * @param packEvent
+ */
+ public void addPackEvent(ProxyPackEvent packEvent) {
+ String eventUid = packEvent.getUid();
+ long dispatchTime = packEvent.getMsgTime() - packEvent.getMsgTime() %
MINUTE_MS;
+ DispatchProfile dispatchProfile = new DispatchProfile(eventUid,
packEvent.getInlongGroupId(),
+ packEvent.getInlongStreamId(), dispatchTime);
+ // callback
+ DispatchProfileCallback callback = new
DispatchProfileCallback(packEvent.getEvents().size(),
+ packEvent.getCallback());
+ dispatchProfile.setCallback(callback);
+ // offer queue
+ for (ProxyEvent event : packEvent.getEvents()) {
+ inCounter.incrementAndGet();
+ boolean addResult = dispatchProfile.addEvent(event, maxPackCount,
maxPackSize);
+ // dispatch profile is full
+ if (!addResult) {
+ outCounter.addAndGet(dispatchProfile.getCount());
+ this.dispatchQueue.offer(dispatchProfile);
+ dispatchProfile = new DispatchProfile(eventUid,
event.getInlongGroupId(), event.getInlongStreamId(),
+ dispatchTime);
+ dispatchProfile.setCallback(callback);
+ dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
+ }
+ }
+ // last dispatch profile
+ if (dispatchProfile.getEvents().size() > 0) {
+ outCounter.addAndGet(dispatchProfile.getCount());
+ this.dispatchQueue.offer(dispatchProfile);
+ }
+ }
+
/**
* outputOvertimeData
*
@@ -124,13 +159,13 @@ public class DispatchManager {
removeKeys.forEach((key) -> {
DispatchProfile dispatchProfile = this.profileCache.remove(key);
if (dispatchProfile != null) {
- dispatchQueue.offer(dispatchProfile);
- outCounter.addAndGet(dispatchProfile.getCount());
+ dispatchQueue.offer(dispatchProfile);
+ outCounter.addAndGet(dispatchProfile.getCount());
}
});
LOG.info("end to outputOvertimeData
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+ "inCounter:{},outCounter:{}",
- profileCache.size(), dispatchQueue.size(), eventCount,
+ profileCache.size(), dispatchQueue.size(), eventCount,
inCounter.getAndSet(0), outCounter.getAndSet(0));
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfile.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfile.java
index e6c28014c..bf0f3fde1 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfile.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfile.java
@@ -17,11 +17,11 @@
package org.apache.inlong.dataproxy.dispatch;
+import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+
import java.util.ArrayList;
import java.util.List;
-import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
-
/**
*
* DispatchProfile
@@ -36,6 +36,7 @@ public class DispatchProfile {
private long count = 0;
private long size = 0;
private long dispatchTime;
+ private DispatchProfileCallback callback;
/**
* Constructor
@@ -171,4 +172,47 @@ public class DispatchProfile {
return dispatchTime;
}
+ /**
+ * ack
+ */
+ public void ack() {
+ if (callback != null) {
+ callback.ack(this.events.size());
+ }
+ }
+
+ /**
+ * fail
+ * @return
+ */
+ public void fail() {
+ if (callback != null) {
+ callback.fail();
+ }
+ }
+
+ /**
+ * isResend
+ * @return
+ */
+ public boolean isResend() {
+ return callback == null;
+ }
+
+ /**
+ * get callback
+ * @return the callback
+ */
+ public DispatchProfileCallback getCallback() {
+ return callback;
+ }
+
+ /**
+ * set callback
+ * @param callback the callback to set
+ */
+ public void setCallback(DispatchProfileCallback callback) {
+ this.callback = callback;
+ }
+
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfileCallback.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfileCallback.java
new file mode 100644
index 000000000..92db5911d
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchProfileCallback.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.dispatch;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
+import org.apache.inlong.sdk.commons.protocol.SourceCallback;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * DispatchProfileCallback
+ *
+ */
+public class DispatchProfileCallback {
+
+ private AtomicInteger ackingCount;
+ private SourceCallback callback;
+
+ /**
+ * Constructor
+ * @param totalCount
+ * @param callback
+ */
+ public DispatchProfileCallback(int totalCount, SourceCallback callback) {
+ this.ackingCount = new AtomicInteger(totalCount);
+ this.callback = callback;
+ }
+
+ /**
+ * ack
+ * @param eventCount
+ */
+ public void ack(int eventCount) {
+ int currentCount = this.ackingCount.addAndGet(-eventCount);
+ if (currentCount <= 0) {
+ this.callback.callback(ResultCode.SUCCUSS);
+ }
+ }
+
+ /**
+ * fail
+ */
+ public void fail() {
+ this.callback.callback(ResultCode.ERR_REJECT);
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 2ecd55811..21ed8090e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -17,26 +17,26 @@
package org.apache.inlong.dataproxy.metrics.audit;
-import java.util.HashSet;
-import java.util.Map;
-
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Event;
import org.apache.inlong.audit.AuditImp;
import org.apache.inlong.audit.util.AuditConfig;
-import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
+import java.util.HashSet;
+import java.util.Map;
+
/**
*
* AuditUtils
*/
public class AuditUtils {
+
public static final String AUDIT_KEY_FILE_PATH = "audit.filePath";
public static final String AUDIT_DEFAULT_FILE_PATH = "/data/inlong/audit/";
public static final String AUDIT_KEY_MAX_CACHE_ROWS = "audit.maxCacheRows";
@@ -54,10 +54,10 @@ public class AuditUtils {
*/
public static void initAudit() {
// IS_AUDIT
- IS_AUDIT =
BooleanUtils.toBoolean(ConfigManager.getInstance().getCommonProperties().get(AUDIT_KEY_IS_AUDIT));
+ IS_AUDIT =
BooleanUtils.toBoolean(CommonPropertiesHolder.getString(AUDIT_KEY_IS_AUDIT));
if (IS_AUDIT) {
// AuditProxy
- String strIpPorts =
ConfigManager.getInstance().getCommonProperties().get(AUDIT_KEY_PROXYS);
+ String strIpPorts =
CommonPropertiesHolder.getString(AUDIT_KEY_PROXYS);
HashSet<String> proxys = new HashSet<>();
if (!StringUtils.isBlank(strIpPorts)) {
String[] ipPorts = strIpPorts.split("\\s+");
@@ -67,10 +67,9 @@ public class AuditUtils {
}
AuditImp.getInstance().setAuditProxy(proxys);
// AuditConfig
- String filePath =
ConfigManager.getInstance().getCommonProperties().getOrDefault(AUDIT_KEY_FILE_PATH,
- AUDIT_DEFAULT_FILE_PATH);
+ String filePath =
CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
int maxCacheRow = NumberUtils.toInt(
-
ConfigManager.getInstance().getCommonProperties().get(AUDIT_KEY_MAX_CACHE_ROWS),
+ CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
AUDIT_DEFAULT_MAX_CACHE_ROWS);
AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
AuditImp.getInstance().setAuditConfig(auditConfig);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
index 31c51ce3d..42d9adb91 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
@@ -48,7 +48,6 @@ import
org.apache.flume.node.StaticZooKeeperConfigurationProvider;
import org.apache.flume.util.SSLUtil;
import org.apache.inlong.common.config.IDataProxyConfigHolder;
import org.apache.inlong.common.metric.MetricObserver;
-import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.RemoteConfigManager;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -85,15 +84,25 @@ public class Application {
private final ReentrantLock lifecycleLock = new ReentrantLock();
private AdminTask adminTask;
+ /**
+ * Constructor
+ */
public Application() {
this(new ArrayList<LifecycleAware>(0));
}
+ /**
+ * Constructor
+ * @param components
+ */
public Application(List<LifecycleAware> components) {
this.components = components;
supervisor = new LifecycleSupervisor();
}
+ /**
+ * start
+ */
public void start() {
lifecycleLock.lock();
try {
@@ -115,6 +124,10 @@ public class Application {
}
}
+ /**
+ * handleConfigurationEvent
+ * @param conf
+ */
@Subscribe
public void handleConfigurationEvent(MaterializedConfiguration conf) {
try {
@@ -132,6 +145,9 @@ public class Application {
}
}
+ /**
+ * stop
+ */
public void stop() {
lifecycleLock.lock();
stopAllComponents();
@@ -149,6 +165,9 @@ public class Application {
}
}
+ /**
+ * stopAllComponents
+ */
private void stopAllComponents() {
if (this.materializedConfiguration != null) {
logger.info("Shutting down configuration: {}",
this.materializedConfiguration);
@@ -187,6 +206,10 @@ public class Application {
}
}
+ /**
+ * startAllComponents
+ * @param materializedConfiguration
+ */
private void startAllComponents(MaterializedConfiguration
materializedConfiguration) {
logger.info("Starting new configuration:{}",
materializedConfiguration);
@@ -244,6 +267,9 @@ public class Application {
this.loadMonitoring();
}
+ /**
+ * loadMonitoring
+ */
@SuppressWarnings("unchecked")
private void loadMonitoring() {
Properties systemProps = System.getProperties();
@@ -403,7 +429,7 @@ public class Application {
}
}
// metrics
-
MetricObserver.init(ConfigManager.getInstance().getCommonProperties());
+ MetricObserver.init(CommonPropertiesHolder.get());
// audit
AuditUtils.initAudit();
@@ -431,8 +457,7 @@ public class Application {
* @param commandLine
*/
private static void startByManagerConf(CommandLine commandLine) {
- String proxyName = ConfigManager.getInstance().getCommonProperties()
- .get(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+ String proxyName =
CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
ManagerPropertiesConfigurationProvider configurationProvider = new
ManagerPropertiesConfigurationProvider(
proxyName);
Application application = new Application();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaClusterProducer.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaClusterProducer.java
index 0d6daace1..ff21d563b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaClusterProducer.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaClusterProducer.java
@@ -17,13 +17,6 @@
package org.apache.inlong.dataproxy.sink.kafkazone;
-import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
import org.apache.flume.Context;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
@@ -40,6 +33,13 @@ import
org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
+import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
+
/**
* KafkaClusterProducer
*/
@@ -128,8 +128,7 @@ public class KafkaClusterProducer implements LifecycleAware
{
}
// create producer failed
if (producer == null) {
- sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, topic, false, 0);
+ sinkContext.processSendFail(event, topic, 0);
return false;
}
// headers
@@ -154,10 +153,14 @@ public class KafkaClusterProducer implements
LifecycleAware {
if (ex != null) {
LOG.error("Send fail:{}", ex.getMessage());
LOG.error(ex.getMessage(), ex);
- sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, topic, false,
sendTime);
+ if (event.isResend()) {
+ sinkContext.processSendFail(event, topic,
sendTime);
+ } else {
+ event.fail();
+ }
} else {
sinkContext.addSendResultMetric(event, topic, true,
sendTime);
+ event.ack();
}
}
};
@@ -165,8 +168,7 @@ public class KafkaClusterProducer implements LifecycleAware
{
return true;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+ sinkContext.processSendFail(event, event.getUid(), 0);
return false;
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSink.java
index 70b8d82b9..a6c344089 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSink.java
@@ -17,13 +17,6 @@
package org.apache.inlong.dataproxy.sink.kafkazone;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -34,9 +27,17 @@ import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.dataproxy.dispatch.DispatchManager;
import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
* KafkaZoneSink
*/
@@ -133,15 +134,22 @@ public class KafkaZoneSink extends AbstractSink
implements Configurable {
tx.commit();
return Status.BACKOFF;
}
- if (!(event instanceof ProxyEvent)) {
+ // ProxyEvent
+ if (event instanceof ProxyEvent) {
+ ProxyEvent proxyEvent = (ProxyEvent) event;
+ this.dispatchManager.addEvent(proxyEvent);
+ tx.commit();
+ return Status.READY;
+ }
+ // ProxyPackEvent
+ if (event instanceof ProxyPackEvent) {
+ ProxyPackEvent packEvent = (ProxyPackEvent) event;
+ this.dispatchManager.addPackEvent(packEvent);
tx.commit();
- this.context.addSendFailMetric();
return Status.READY;
}
- //
- ProxyEvent proxyEvent = (ProxyEvent) event;
- this.dispatchManager.addEvent(proxyEvent);
tx.commit();
+ this.context.addSendFailMetric();
return Status.READY;
} catch (Throwable t) {
LOG.error("Process event failed!" + this.getName(), t);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
index 99ee3d6d7..3c9b1d6ef 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
@@ -17,10 +17,6 @@
package org.apache.inlong.dataproxy.sink.kafkazone;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -34,6 +30,10 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.SinkContext;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
/**
*
* KafkaZoneSinkContext
@@ -216,6 +216,21 @@ public class KafkaZoneSinkContext extends SinkContext {
dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID,
inlongStreamId);
}
+ /**
+ * processSendFail
+ * @param currentRecord
+ * @param producerTopic
+ * @param sendTime
+ */
+ public void processSendFail(DispatchProfile currentRecord, String
producerTopic, long sendTime) {
+ if (currentRecord.isResend()) {
+ dispatchQueue.offer(currentRecord);
+ this.addSendResultMetric(currentRecord, producerTopic, false,
sendTime);
+ } else {
+ currentRecord.fail();
+ }
+ }
+
/**
* addSendResultMetric
*
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
index 835a863a9..7a13fbbc1 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
@@ -17,17 +17,6 @@
package org.apache.inlong.dataproxy.sink.pulsarzone;
-import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-
-import java.security.SecureRandom;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
import org.apache.flume.Context;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
@@ -48,6 +37,17 @@ import org.apache.pulsar.client.api.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
+import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
+
/**
* PulsarClusterProducer
*/
@@ -131,8 +131,8 @@ public class PulsarClusterProducer implements
LifecycleAware {
.connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10))
.build();
this.baseBuilder = client.newProducer();
-// Map<String, Object> builderConf = new HashMap<>();
-// builderConf.putAll(context.getParameters());
+ // Map<String, Object> builderConf = new HashMap<>();
+ // builderConf.putAll(context.getParameters());
this.baseBuilder
.sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0),
TimeUnit.MILLISECONDS)
.maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 500))
@@ -222,6 +222,7 @@ public class PulsarClusterProducer implements
LifecycleAware {
String baseTopic =
sinkContext.getIdTopicHolder().getTopic(event.getUid());
if (baseTopic == null) {
sinkContext.addSendResultMetric(event, event.getUid(), false,
0);
+ event.fail();
return false;
}
// get producer
@@ -251,8 +252,7 @@ public class PulsarClusterProducer implements
LifecycleAware {
}
// create producer failed
if (producer == null) {
- sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, producerTopic, false,
0);
+ sinkContext.processSendFail(event, producerTopic, 0);
return false;
}
// headers
@@ -268,17 +268,16 @@ public class PulsarClusterProducer implements
LifecycleAware {
if (ex != null) {
LOG.error("Send fail:{}", ex.getMessage());
LOG.error(ex.getMessage(), ex);
- sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, producerTopic,
false, sendTime);
+ sinkContext.processSendFail(event, producerTopic,
sendTime);
} else {
sinkContext.addSendResultMetric(event, producerTopic,
true, sendTime);
+ event.ack();
}
});
return true;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+ sinkContext.processSendFail(event, event.getUid(), 0);
return false;
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
index 1fa8448ef..75ec7718b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSink.java
@@ -17,13 +17,6 @@
package org.apache.inlong.dataproxy.sink.pulsarzone;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -34,9 +27,17 @@ import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.dataproxy.dispatch.DispatchManager;
import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
* PulsarZoneSink
*/
@@ -129,19 +130,27 @@ public class PulsarZoneSink extends AbstractSink
implements Configurable {
tx.begin();
try {
Event event = channel.take();
+ // no data
if (event == null) {
tx.commit();
return Status.BACKOFF;
}
- if (!(event instanceof ProxyEvent)) {
+ // ProxyEvent
+ if (event instanceof ProxyEvent) {
+ ProxyEvent proxyEvent = (ProxyEvent) event;
+ this.dispatchManager.addEvent(proxyEvent);
+ tx.commit();
+ return Status.READY;
+ }
+ // ProxyPackEvent
+ if (event instanceof ProxyPackEvent) {
+ ProxyPackEvent packEvent = (ProxyPackEvent) event;
+ this.dispatchManager.addPackEvent(packEvent);
tx.commit();
- this.context.addSendFailMetric();
return Status.READY;
}
- //
- ProxyEvent proxyEvent = (ProxyEvent) event;
- this.dispatchManager.addEvent(proxyEvent);
tx.commit();
+ this.context.addSendFailMetric();
return Status.READY;
} catch (Throwable t) {
LOG.error("Process event failed!" + this.getName(), t);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
index e9ea54214..fd13e6df1 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
@@ -17,10 +17,6 @@
package org.apache.inlong.dataproxy.sink.pulsarzone;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -34,6 +30,10 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.SinkContext;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
/**
*
* PulsarZoneSinkContext
@@ -216,6 +216,21 @@ public class PulsarZoneSinkContext extends SinkContext {
dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID,
inlongStreamId);
}
+ /**
+ * processSendFail
+ * @param currentRecord
+ * @param producerTopic
+ * @param sendTime
+ */
+ public void processSendFail(DispatchProfile currentRecord, String
producerTopic, long sendTime) {
+ if (currentRecord.isResend()) {
+ dispatchQueue.offer(currentRecord);
+ this.addSendResultMetric(currentRecord, producerTopic, false,
sendTime);
+ } else {
+ currentRecord.fail();
+ }
+ }
+
/**
* addSendResultMetric
*
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeClusterProducer.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeClusterProducer.java
index 0b8c1a01b..505cb1639 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeClusterProducer.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeClusterProducer.java
@@ -17,14 +17,6 @@
package org.apache.inlong.dataproxy.sink.tubezone;
-import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.flume.Context;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
@@ -43,6 +35,14 @@ import org.apache.inlong.tubemq.corebase.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
+import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
+
/**
* TubeClusterProducer
*/
@@ -189,8 +189,7 @@ public class TubeClusterProducer implements LifecycleAware {
}
// create producer failed
if (producer == null) {
- sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, topic, false, 0);
+ sinkContext.processSendFail(event, topic, 0);
return false;
}
// headers
@@ -210,22 +209,21 @@ public class TubeClusterProducer implements
LifecycleAware {
@Override
public void onMessageSent(MessageSentResult result) {
sinkContext.addSendResultMetric(event, topic, true,
sendTime);
+ event.ack();
}
@Override
public void onException(Throwable ex) {
LOG.error("Send fail:{}", ex.getMessage());
LOG.error(ex.getMessage(), ex);
- sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, topic, false,
sendTime);
+ sinkContext.processSendFail(event, topic, sendTime);
}
};
producer.sendMessage(message, callback);
return true;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
+ sinkContext.processSendFail(event, event.getUid(), 0);
return false;
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSink.java
index f16148ada..bdd1b37e6 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSink.java
@@ -17,13 +17,6 @@
package org.apache.inlong.dataproxy.sink.tubezone;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -34,9 +27,17 @@ import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.dataproxy.dispatch.DispatchManager;
import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
* TubeZoneSink
*/
@@ -133,15 +134,22 @@ public class TubeZoneSink extends AbstractSink implements
Configurable {
tx.commit();
return Status.BACKOFF;
}
- if (!(event instanceof ProxyEvent)) {
+ // ProxyEvent
+ if (event instanceof ProxyEvent) {
+ ProxyEvent proxyEvent = (ProxyEvent) event;
+ this.dispatchManager.addEvent(proxyEvent);
+ tx.commit();
+ return Status.READY;
+ }
+ // ProxyPackEvent
+ if (event instanceof ProxyPackEvent) {
+ ProxyPackEvent packEvent = (ProxyPackEvent) event;
+ this.dispatchManager.addPackEvent(packEvent);
tx.commit();
- this.context.addSendFailMetric();
return Status.READY;
}
- //
- ProxyEvent proxyEvent = (ProxyEvent) event;
- this.dispatchManager.addEvent(proxyEvent);
tx.commit();
+ this.context.addSendFailMetric();
return Status.READY;
} catch (Throwable t) {
LOG.error("Process event failed!" + this.getName(), t);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
index e26b37714..ba4141ff2 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
@@ -17,10 +17,6 @@
package org.apache.inlong.dataproxy.sink.tubezone;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -34,6 +30,10 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.SinkContext;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
/**
*
* TubeZoneSinkContext
@@ -216,6 +216,21 @@ public class TubeZoneSinkContext extends SinkContext {
dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID,
inlongStreamId);
}
+ /**
+ * processSendFail
+ * @param currentRecord
+ * @param producerTopic
+ * @param sendTime
+ */
+ public void processSendFail(DispatchProfile currentRecord, String
producerTopic, long sendTime) {
+ if (currentRecord.isResend()) {
+ dispatchQueue.offer(currentRecord);
+ this.addSendResultMetric(currentRecord, producerTopic, false,
sendTime);
+ } else {
+ currentRecord.fail();
+ }
+ }
+
/**
* addSendResultMetric
*
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
index 6dccd660f..4a1c6f59b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
@@ -18,11 +18,13 @@
package org.apache.inlong.dataproxy.source.tcp;
import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.SourceContext;
import org.apache.inlong.sdk.commons.protocol.EventUtils;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePack;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePackHeader;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResponseInfo;
@@ -33,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -98,9 +101,9 @@ public class InlongTcpChannelHandler extends
ChannelInboundHandlerAdapter {
// save index, reset it if buffer is not satisfied.
cb.markReaderIndex();
int totalPackLength = cb.readInt();
- cb.resetReaderIndex();
if (readableLength < totalPackLength + LENGTH_PARAM_LENGTH) {
// reset index.
+ cb.resetReaderIndex();
this.addMetric(false, 0, null);
throw new Exception("err msg, channel buffer is not satisfied,
and readableLength="
+ readableLength + ", and totalPackLength=" +
totalPackLength);
@@ -137,7 +140,65 @@ public class InlongTcpChannelHandler extends
ChannelInboundHandlerAdapter {
}
// uncompress
List<ProxyEvent> events = EventUtils.decodeSdkPack(packObject);
- // topic
+ // response success if event size is zero
+ if (events.size() == 0) {
+ this.responsePackage(ctx, ResultCode.SUCCUSS, packObject);
+ }
+ // process
+ if (!CommonPropertiesHolder.isResponseAfterSave()) {
+ this.processAndResponse(ctx, packObject, events);
+ } else {
+ this.processAndWaitingSave(ctx, packObject, events);
+ }
+ }
+
+ /**
+ * processAndWaitingSave
+ * @param ctx
+ * @param packObject
+ * @param events
+ * @throws Exception
+ */
+ private void processAndWaitingSave(ChannelHandlerContext ctx, MessagePack
packObject, List<ProxyEvent> events)
+ throws Exception {
+ MessagePackHeader header = packObject.getHeader();
+ InlongTcpSourceCallback callback = new InlongTcpSourceCallback(ctx,
header);
+ String inlongGroupId = header.getInlongGroupId();
+ String inlongStreamId = header.getInlongStreamId();
+ ProxyPackEvent packEvent = new ProxyPackEvent(inlongGroupId,
inlongStreamId, events, callback);
+ // put to channel
+ try {
+
sourceContext.getSource().getChannelProcessor().processEvent(packEvent);
+ events.forEach(event -> {
+ this.addMetric(true, event.getBody().length, event);
+ });
+ boolean awaitResult =
callback.getLatch().await(CommonPropertiesHolder.getMaxResponseTimeout(),
+ TimeUnit.MILLISECONDS);
+ if (!awaitResult) {
+ if (!callback.getHasResponsed().getAndSet(true)) {
+ this.responsePackage(ctx, ResultCode.ERR_REJECT,
packObject);
+ }
+ }
+ } catch (Throwable ex) {
+ LOG.error("Process Controller Event error can't write event to
channel.", ex);
+ events.forEach(event -> {
+ this.addMetric(false, event.getBody().length, event);
+ });
+ if (!callback.getHasResponsed().getAndSet(true)) {
+ this.responsePackage(ctx, ResultCode.ERR_REJECT, packObject);
+ }
+ }
+ }
+
+ /**
+ * processAndResponse
+ * @param ctx
+ * @param packObject
+ * @param events
+ * @throws Exception
+ */
+ private void processAndResponse(ChannelHandlerContext ctx, MessagePack
packObject, List<ProxyEvent> events)
+ throws Exception {
for (ProxyEvent event : events) {
String uid = event.getUid();
String topic = sourceContext.getIdHolder().getTopic(uid);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSourceCallback.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSourceCallback.java
new file mode 100644
index 000000000..eae335b5e
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSourceCallback.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.source.tcp;
+
+import org.apache.inlong.sdk.commons.protocol.SourceCallback;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePackHeader;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResponseInfo;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+/**
+ * InlongTcpEventCallback
+ *
+ */
+public class InlongTcpSourceCallback implements SourceCallback {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(InlongTcpSourceCallback.class);
+
+ private final ChannelHandlerContext ctx;
+ private final MessagePackHeader header;
+ private final CountDownLatch latch;
+ private final AtomicBoolean hasResponsed = new AtomicBoolean(false);
+
+ /**
+ * Constructor
+ * @param ctx
+ * @param header
+ */
+ public InlongTcpSourceCallback(ChannelHandlerContext ctx,
MessagePackHeader header) {
+ this.ctx = ctx;
+ this.header = header;
+ this.latch = new CountDownLatch(1);
+ }
+
+ /**
+ * callback
+ * @param resultCode
+ */
+ @Override
+ public void callback(ResultCode resultCode) {
+ // If DataProxy have sent timeout response to SDK, DataProxy do not
send success response to SDK again when
+ // event is success to save.
+ if (this.hasResponsed.getAndSet(true)) {
+ return;
+ }
+ // response
+ try {
+ ResponseInfo.Builder builder = ResponseInfo.newBuilder();
+ builder.setResult(resultCode);
+ builder.setPackId(header.getPackId());
+
+ // encode
+ byte[] responseBytes = builder.build().toByteArray();
+ //
+ ByteBuf buffer = Unpooled.wrappedBuffer(responseBytes);
+ Channel remoteChannel = ctx.channel();
+ if (remoteChannel.isWritable()) {
+ remoteChannel.write(buffer);
+ } else {
+ LOG.warn("the send buffer2 is full, so disconnect it!"
+ + "please check remote client; Connection info:{}",
+ remoteChannel);
+ buffer.release();
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ // notice TCP session
+ this.latch.countDown();
+ }
+ }
+
+ /**
+ * get hasResponsed
+ * @return the hasResponsed
+ */
+ public AtomicBoolean getHasResponsed() {
+ return hasResponsed;
+ }
+
+ /**
+ * get latch
+ * @return the latch
+ */
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+
+}
diff --git a/inlong-dataproxy/pom.xml b/inlong-dataproxy/pom.xml
index e299431a7..7ae2d16eb 100644
--- a/inlong-dataproxy/pom.xml
+++ b/inlong-dataproxy/pom.xml
@@ -145,6 +145,12 @@
<groupId>org.powermock</groupId>
<artifactId>powermock-module-testng</artifactId>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
diff --git
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/admin/AdminHttpSourceHandler.java
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/admin/AdminHttpSourceHandler.java
index 1ca5b7fde..1b5029d74 100644
---
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/admin/AdminHttpSourceHandler.java
+++
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/admin/AdminHttpSourceHandler.java
@@ -39,7 +39,7 @@ public interface AdminHttpSourceHandler extends Configurable {
* <p>
*
* @param request The request to be parsed into Flume
events.
- * @param response
+ * @param response The response to be parsed into Flume
events.
* @return List of Flume events generated from the
request.
* @throws HTTPBadRequestException If the was not parsed correctly into an
event because the request was not in the
* expected format.
diff --git
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
index a4d798b81..391ae9d5e 100644
---
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
+++
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
@@ -17,12 +17,11 @@
package org.apache.inlong.sdk.commons.protocol;
-import java.util.Map;
-
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
+import java.util.Map;
+
/**
- *
* ProxyEvent
*/
public class ProxyEvent extends SdkEvent {
diff --git
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyPackEvent.java
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyPackEvent.java
new file mode 100644
index 000000000..b3959684c
--- /dev/null
+++
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyPackEvent.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
+
+import java.util.List;
+
+/**
+ * ProxyPackEvent
+ */
+public class ProxyPackEvent extends SdkEvent {
+
+ protected long sourceTime;
+ private List<ProxyEvent> events;
+ private SourceCallback callback;
+
+ /**
+ * Constructor
+ * @param inlongGroupId
+ * @param inlongStreamId
+ * @param events
+ * @param callback
+ */
+ public ProxyPackEvent(String inlongGroupId, String inlongStreamId,
List<ProxyEvent> events,
+ SourceCallback callback) {
+ this.inlongGroupId = inlongGroupId;
+ this.inlongStreamId = inlongStreamId;
+ this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+ this.sourceTime = System.currentTimeMillis();
+ this.events = events;
+ this.callback = callback;
+ }
+
+ /**
+ * acknowledge
+ * @param resultCode
+ */
+ public void acknowledge(ResultCode resultCode) {
+ callback.callback(resultCode);
+ }
+
+ /**
+ * get sourceTime
+ * @return the sourceTime
+ */
+ public long getSourceTime() {
+ return sourceTime;
+ }
+
+ /**
+ * set sourceTime
+ * @param sourceTime the sourceTime to set
+ */
+ public void setSourceTime(long sourceTime) {
+ this.sourceTime = sourceTime;
+ }
+
+ /**
+ * get events
+ * @return the events
+ */
+ public List<ProxyEvent> getEvents() {
+ return events;
+ }
+
+ /**
+ * set events
+ * @param events the events to set
+ */
+ public void setEvents(List<ProxyEvent> events) {
+ this.events = events;
+ }
+
+ /**
+ * get callback
+ * @return the callback
+ */
+ public SourceCallback getCallback() {
+ return callback;
+ }
+
+ /**
+ * set callback
+ * @param callback the callback to set
+ */
+ public void setCallback(SourceCallback callback) {
+ this.callback = callback;
+ }
+
+}
diff --git
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SourceCallback.java
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SourceCallback.java
new file mode 100644
index 000000000..ecd5930d6
--- /dev/null
+++
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SourceCallback.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
+
+/**
+ * SourceCallback
+ */
+public interface SourceCallback {
+
+ void callback(ResultCode resultCode);
+}