This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1c9dd7df9d [INLONG-11599][SDK] Optimize the configuration related
content in the ProxyClientConfig class (#11600)
1c9dd7df9d is described below
commit 1c9dd7df9db2e5679457c9cee6acca7171f0bf45
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Dec 16 10:06:18 2024 +0800
[INLONG-11599][SDK] Optimize the configuration related content in the
ProxyClientConfig class (#11600)
Co-authored-by: gosonzhang <[email protected]>
---
.../plugin/sinks/filecollect/SenderManager.java | 7 +-
.../sinks/filecollect/TestSenderManager.java | 1 -
.../inlong/sdk/dataproxy/ConfigConstants.java | 7 +-
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 117 +++-----
.../apache/inlong/sdk/dataproxy/MessageSender.java | 3 +-
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 63 ++++-
.../sdk/dataproxy/config/ProxyConfigEntry.java | 1 +
.../sdk/dataproxy/config/ProxyConfigManager.java | 301 +++++++++++++++------
.../inlong/sdk/dataproxy/network/ClientMgr.java | 2 +-
.../inlong/sdk/dataproxy/network/QueueObject.java | 5 +-
.../inlong/sdk/dataproxy/network/Sender.java | 39 +--
.../sdk/dataproxy/network/SyncMessageCallable.java | 11 +-
.../sdk/dataproxy/threads/MetricWorkerThread.java | 3 +-
.../sdk/dataproxy/threads/TimeoutScanThread.java | 10 +-
14 files changed, 356 insertions(+), 214 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index ec4502a7fb..9ac9083ad8 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -203,6 +203,7 @@ public class SenderManager {
authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
+ proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);
proxyClientConfig.setIoThreadNum(ioThreadNum);
proxyClientConfig.setEnableBusyWait(enableBusyWait);
@@ -242,7 +243,7 @@ public class SenderManager {
message.getTotalSize(), auditVersion);
asyncSendByMessageSender(cb, message.getDataList(),
message.getGroupId(),
message.getStreamId(), message.getDataTime(),
SEQUENTIAL_ID.getNextUuid(),
- maxSenderTimeout, TimeUnit.SECONDS,
message.getExtraMap(), proxySend);
+ message.getExtraMap(), proxySend);
getMetricItem(message.getGroupId(),
message.getStreamId()).pluginSendCount.addAndGet(
message.getMsgCnt());
suc = true;
@@ -270,11 +271,9 @@ public class SenderManager {
private void asyncSendByMessageSender(SendMessageCallback cb,
List<byte[]> bodyList, String groupId, String streamId, long
dataTime, String msgUUID,
- long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap, boolean isProxySend) throws
ProxysdkException {
sender.asyncSendMessage(cb, bodyList, groupId,
- streamId, dataTime, msgUUID,
- timeout, timeUnit, extraAttrMap, isProxySend);
+ streamId, dataTime, msgUUID, extraAttrMap, isProxySend);
}
/**
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 9655e757ef..508e21588f 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -96,7 +96,6 @@ public class TestSenderManager {
return null;
}).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.anyLong(), Mockito.any(),
- Mockito.anyLong(), Mockito.any(),
Mockito.any(), Mockito.anyBoolean());
senderManager.Start();
Long offset = 0L;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index 7adc5087af..0216b77c2c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -32,7 +32,8 @@ public class ConfigConstants {
public static final String REMOTE_ENCRYPT_CACHE_FILE_SUFFIX = ".pubKey";
// authorization key
public static final String BASIC_AUTH_HEADER = "authorization";
-
+ // default region name
+ public static final String VAL_DEF_REGION_NAME = "";
// config info sync interval in minutes
public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
@@ -43,6 +44,10 @@ public class ConfigConstants {
public static final int VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL = 5;
// cache config expired time in ms
public static final long VAL_DEF_CACHE_CONFIG_EXPIRED_MS = 20 * 60 * 1000L;
+ // cache config fail status expired time in ms
+ public static final long VAL_DEF_CONFIG_FAIL_STATUS_EXPIRED_MS = 1000L;
+ public static final long VAL_MAX_CONFIG_FAIL_STATUS_EXPIRED_MS = 3 * 60 *
1000L;
+
// node force choose interval in ms
public static final long VAL_DEF_FORCE_CHOOSE_INR_MS = 10 * 60 * 1000L;
public static final long VAL_MIN_FORCE_CHOOSE_INR_MS = 30 * 1000L;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 61a68e0d9c..02157fe95a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -40,15 +40,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
public class DefaultMessageSender implements MessageSender {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultMessageSender.class);
- private static final long DEFAULT_SEND_TIMEOUT = 100;
- private static final TimeUnit DEFAULT_SEND_TIMEUNIT =
TimeUnit.MILLISECONDS;
private static final ConcurrentHashMap<Integer, DefaultMessageSender>
CACHE_SENDER =
new ConcurrentHashMap<>();
private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new
AtomicBoolean(false);
@@ -137,6 +134,20 @@ public class DefaultMessageSender implements MessageSender
{
CACHE_SENDER.clear();
}
+ @Override
+ public void close() {
+ LOGGER.info("ready to close resources, may need five minutes !");
+ if (sender.getClusterId() != -1) {
+ CACHE_SENDER.remove(sender.getClusterId());
+ }
+ sender.close();
+ shutdownInternalThreads();
+ }
+
+ public ProxyClientConfig getProxyClientConfig() {
+ return sender.getConfigure();
+ }
+
public boolean isSupportLF() {
return isSupportLF;
}
@@ -433,25 +444,25 @@ public class DefaultMessageSender implements
MessageSender {
@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
String msgUUID, Map<String, String> extraAttrMap) throws
ProxysdkException {
-
+ asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
extraAttrMap, false);
}
@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
String msgUUID) throws ProxysdkException {
-
+ asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
false);
}
@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
long dt, String msgUUID) throws ProxysdkException {
-
+ asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID,
false);
}
@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
long dt, String msgUUID, Map<String, String> extraAttrMap) throws
ProxysdkException {
-
+ asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID,
extraAttrMap, false);
}
/**
@@ -508,11 +519,6 @@ public class DefaultMessageSender implements MessageSender
{
return null;
}
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
- String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
- asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
timeout, timeUnit, false);
- }
-
/**
* async send single message
*
@@ -522,13 +528,11 @@ public class DefaultMessageSender implements
MessageSender {
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
- * @param timeout
- * @param timeUnit
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
* @throws ProxysdkException
*/
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
- String msgUUID, long timeout, TimeUnit timeUnit, boolean
isProxySend) throws ProxysdkException {
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId,
+ String streamId, long dt, String msgUUID, boolean isProxySend)
throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -549,7 +553,7 @@ public class DefaultMessageSender implements MessageSender {
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(),
groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
- sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout,
timeUnit);
+ sender.asyncSendMessage(encodeObject, callback, msgUUID);
} else if (msgtype == 3 || msgtype == 5) {
if (isCompressEnd) {
if (isProxySend) {
@@ -558,23 +562,15 @@ public class DefaultMessageSender implements
MessageSender {
sender.asyncSendMessage(new
EncodeObject(Collections.singletonList(body), "groupId="
+ groupId + "&streamId=" + streamId + "&dt=" + dt +
"&cp=snappy" + proxySend,
idGenerator.getNextId(), this.getMsgtype(), true,
groupId),
- callback, msgUUID, timeout, timeUnit);
+ callback, msgUUID);
} else {
sender.asyncSendMessage(
new EncodeObject(Collections.singletonList(body),
"groupId=" + groupId + "&streamId="
+ streamId + "&dt=" + dt + proxySend,
idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
- callback,
- msgUUID, timeout, timeUnit);
+ callback, msgUUID);
}
}
-
- }
-
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
- String msgUUID, long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap)
- throws ProxysdkException {
- asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
timeout, timeUnit, extraAttrMap, false);
}
/**
@@ -586,15 +582,12 @@ public class DefaultMessageSender implements
MessageSender {
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
- * @param timeout
- * @param timeUnit
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
* @throws ProxysdkException
*/
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
- String msgUUID, long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap, boolean isProxySend)
- throws ProxysdkException {
+ String msgUUID, Map<String, String> extraAttrMap, boolean
isProxySend) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -615,28 +608,23 @@ public class DefaultMessageSender implements
MessageSender {
isReport, isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(),
groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
- sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout,
timeUnit);
+ sender.asyncSendMessage(encodeObject, callback, msgUUID);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
if (isCompressEnd) {
attrs.append("&cp=snappy");
sender.asyncSendMessage(new
EncodeObject(Collections.singletonList(body), attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true,
groupId),
- callback, msgUUID, timeout, timeUnit);
+ callback, msgUUID);
} else {
sender.asyncSendMessage(
new EncodeObject(Collections.singletonList(body),
attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
- callback, msgUUID, timeout, timeUnit);
+ callback, msgUUID);
}
}
}
- public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
- long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
- asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID,
timeout, timeUnit, false);
- }
-
/**
* async send a batch of messages
*
@@ -646,14 +634,11 @@ public class DefaultMessageSender implements
MessageSender {
* @param streamId streamId
* @param dt data report time
* @param msgUUID msg uuid
- * @param timeout
- * @param timeUnit
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
* @throws ProxysdkException
*/
public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList,
- String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, boolean isProxySend) throws
ProxysdkException {
+ String groupId, String streamId, long dt, String msgUUID, boolean
isProxySend) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -671,7 +656,7 @@ public class DefaultMessageSender implements MessageSender {
isReport, isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(),
groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
- sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout,
timeUnit);
+ sender.asyncSendMessage(encodeObject, callback, msgUUID);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
proxySend = "&" + proxySend;
@@ -682,24 +667,18 @@ public class DefaultMessageSender implements
MessageSender {
+ "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size() + proxySend,
idGenerator.getNextId(),
this.getMsgtype(), true, groupId),
- callback, msgUUID, timeout, timeUnit);
+ callback, msgUUID);
} else {
sender.asyncSendMessage(
new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + dt + "&cnt=" + bodyList.size()
+ proxySend,
idGenerator.getNextId(), this.getMsgtype(),
false, groupId),
- callback, msgUUID, timeout, timeUnit);
+ callback, msgUUID);
}
}
}
- public void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
throws ProxysdkException {
- asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID,
timeout, timeUnit, extraAttrMap, false);
- }
-
/**
* async send a batch of messages
*
@@ -709,15 +688,12 @@ public class DefaultMessageSender implements
MessageSender {
* @param streamId streamId
* @param dt data report time
* @param msgUUID msg uuid
- * @param timeout
- * @param timeUnit
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
* @throws ProxysdkException
*/
public void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
- long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap, boolean isProxySend) throws
ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(
@@ -739,17 +715,17 @@ public class DefaultMessageSender implements
MessageSender {
isCompress, isReport, isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(),
groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
- sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout,
timeUnit);
+ sender.asyncSendMessage(encodeObject, callback, msgUUID);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
.append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
if (isCompress) {
attrs.append("&cp=snappy");
sender.asyncSendMessage(new EncodeObject(bodyList,
attrs.toString(), idGenerator.getNextId(),
- this.getMsgtype(), true, groupId), callback, msgUUID,
timeout, timeUnit);
+ this.getMsgtype(), true, groupId), callback, msgUUID);
} else {
sender.asyncSendMessage(new EncodeObject(bodyList,
attrs.toString(), idGenerator.getNextId(),
- this.getMsgtype(), false, groupId), callback, msgUUID,
timeout, timeUnit);
+ this.getMsgtype(), false, groupId), callback, msgUUID);
}
}
@@ -767,8 +743,8 @@ public class DefaultMessageSender implements MessageSender {
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body, SendMessageCallback callback)
throws ProxysdkException {
- this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId,
System.currentTimeMillis(),
- idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT);
+ this.asyncSendMessage(callback, body, inlongGroupId,
+ inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId());
}
/**
@@ -783,8 +759,8 @@ public class DefaultMessageSender implements MessageSender {
*/
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body, SendMessageCallback callback,
boolean isProxySend) throws ProxysdkException {
- this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId,
System.currentTimeMillis(),
- idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT, isProxySend);
+ this.asyncSendMessage(callback, body, inlongGroupId,
+ inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId(), isProxySend);
}
/**
@@ -799,8 +775,8 @@ public class DefaultMessageSender implements MessageSender {
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
SendMessageCallback callback) throws ProxysdkException {
- this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(),
- idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT);
+ this.asyncSendMessage(callback, bodyList, inlongGroupId,
+ inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId());
}
/**
@@ -815,8 +791,8 @@ public class DefaultMessageSender implements MessageSender {
*/
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
SendMessageCallback callback, boolean isProxySend) throws
ProxysdkException {
- this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(),
- idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT, isProxySend);
+ this.asyncSendMessage(callback, bodyList, inlongGroupId,
+ inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId(), isProxySend);
}
private void addIndexCnt(String groupId, String streamId, long cnt) {
@@ -837,13 +813,4 @@ public class DefaultMessageSender implements MessageSender
{
indexCol.shutDown();
MANAGER_FETCHER_THREAD_STARTED.set(false);
}
-
- public void close() {
- LOGGER.info("ready to close resources, may need five minutes !");
- if (sender.getClusterId() != -1) {
- CACHE_SENDER.remove(sender.getClusterId());
- }
- sender.close();
- shutdownInternalThreads();
- }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index e980e65974..2a4ae6313a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -26,6 +26,8 @@ import java.util.Map;
public interface MessageSender {
+ void close();
+
/**
* This method provides a synchronized function which you want to send
data without packing
*
@@ -137,5 +139,4 @@ public interface MessageSender {
void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
SendMessageCallback callback) throws ProxysdkException;
- void close();
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 7af91dba34..6a80fbf21c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -39,19 +39,24 @@ public class ProxyClientConfig {
private String configStoreBasePath = System.getProperty("user.dir");
// max expired time for config cache.
private long configCacheExpiredMs =
ConfigConstants.VAL_DEF_CACHE_CONFIG_EXPIRED_MS;
+ // max expired time for config query failure status
+ private long configFailStatusExpiredMs =
ConfigConstants.VAL_DEF_CONFIG_FAIL_STATUS_EXPIRED_MS;
// nodes force choose interval ms
private long forceReChooseInrMs =
ConfigConstants.VAL_DEF_FORCE_CHOOSE_INR_MS;
private boolean enableAuthentication = false;
private String authSecretId = "";
private String authSecretKey = "";
private String inlongGroupId;
+ private String regionName = ConfigConstants.VAL_DEF_REGION_NAME;
private int aliveConnections = ConfigConstants.VAL_DEF_ALIVE_CONNECTIONS;
+ // data encrypt info
+ private boolean enableDataEncrypt = false;
+ private String rsaPubKeyUrl = "";
+ private String userName = "";
private int syncThreadPoolSize;
private int asyncCallbackSize;
- private boolean isNeedDataEncry = false;
- private String rsaPubKeyUrl = "";
private String tlsServerCertFilePathAndName;
private String tlsServerKey;
private String tlsVersion = "TLSv1.2";
@@ -240,6 +245,15 @@ public class ProxyClientConfig {
this.configCacheExpiredMs = configCacheExpiredMs;
}
+ public long getConfigFailStatusExpiredMs() {
+ return configFailStatusExpiredMs;
+ }
+
+ public void setConfigFailStatusExpiredMs(long configFailStatusExpiredMs) {
+ this.configFailStatusExpiredMs =
+ Math.min(configFailStatusExpiredMs,
ConfigConstants.VAL_MAX_CONFIG_FAIL_STATUS_EXPIRED_MS);
+ }
+
public long getForceReChooseInrMs() {
return forceReChooseInrMs;
}
@@ -253,6 +267,16 @@ public class ProxyClientConfig {
return inlongGroupId;
}
+ public String getRegionName() {
+ return regionName;
+ }
+
+ public void setRegionName(String regionName) {
+ if (StringUtils.isNotBlank(regionName)) {
+ this.regionName = regionName.trim();
+ }
+ }
+
public int getAliveConnections() {
return this.aliveConnections;
}
@@ -262,6 +286,33 @@ public class ProxyClientConfig {
Math.max(ConfigConstants.VAL_MIN_ALIVE_CONNECTIONS,
aliveConnections);
}
+ public boolean isEnableDataEncrypt() {
+ return enableDataEncrypt;
+ }
+
+ public String getRsaPubKeyUrl() {
+ return rsaPubKeyUrl;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void enableDataEncrypt(boolean needDataEncrypt, String userName,
String rsaPubKeyUrl) {
+ this.enableDataEncrypt = needDataEncrypt;
+ if (!this.enableDataEncrypt) {
+ return;
+ }
+ if (StringUtils.isBlank(userName)) {
+ throw new IllegalArgumentException("userName is Blank!");
+ }
+ if (StringUtils.isBlank(rsaPubKeyUrl)) {
+ throw new IllegalArgumentException("rsaPubKeyUrl is Blank!");
+ }
+ this.userName = userName.trim();
+ this.rsaPubKeyUrl = rsaPubKeyUrl.trim();
+ }
+
public String getTlsServerCertFilePathAndName() {
return tlsServerCertFilePathAndName;
}
@@ -370,14 +421,6 @@ public class ProxyClientConfig {
this.maxMsgInFlightPerConn = maxMsgInFlightPerConn;
}
- public String getRsaPubKeyUrl() {
- return rsaPubKeyUrl;
- }
-
- public boolean isNeedDataEncry() {
- return isNeedDataEncry;
- }
-
public void setHttpsInfo(String tlsServerCertFilePathAndName, String
tlsServerKey) {
if (StringUtils.isBlank(tlsServerCertFilePathAndName)) {
throw new IllegalArgumentException("tlsServerCertFilePathAndName
is Blank!");
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index e37b9b2c0a..ebf76464ea 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -62,6 +62,7 @@ public class ProxyConfigEntry implements java.io.Serializable
{
public void setHostMap(Map<String, HostInfo> hostMap) {
this.hostMap = hostMap;
}
+
public boolean isNodesEmpty() {
return this.hostMap.isEmpty();
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 033395667b..97952eb1ff 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -74,9 +74,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@@ -90,17 +91,22 @@ public class ProxyConfigManager extends Thread {
private static final Logger logger =
LoggerFactory.getLogger(ProxyConfigManager.class);
private static final LogCounter exptCounter = new LogCounter(10, 100000,
60 * 1000L);
private static final LogCounter parseCounter = new LogCounter(10, 100000,
60 * 1000L);
+ private static final Map<String, Tuple2<AtomicLong, String>>
fetchFailProxyMap =
+ new ConcurrentHashMap<>();
+ private static final Map<String, Tuple2<AtomicLong, String>>
fetchFailEncryptMap =
+ new ConcurrentHashMap<>();
private static final ReentrantReadWriteLock fileRw = new
ReentrantReadWriteLock();
private final String callerId;
- private ProxyClientConfig clientConfig;
private final Gson gson = new Gson();
private final ClientMgr clientManager;
private final ThreadLocalRandom random = ThreadLocalRandom.current();
private final AtomicBoolean shutDown = new AtomicBoolean(false);
// proxy configure info
+ private ProxyClientConfig clientConfig = null;
private String localProxyConfigStoreFile;
private String proxyConfigVisitUrl;
+ private String proxyQueryFailKey;
private String proxyConfigCacheFile;
private List<HostInfo> proxyInfoList = new ArrayList<>();
private int oldStat = 0;
@@ -108,6 +114,7 @@ public class ProxyConfigManager extends Thread {
private long lstUpdateTime = 0;
// encrypt configure info
private String encryptConfigVisitUrl;
+ private String encryptQueryFailKey;
private String encryptConfigCacheFile;
private EncryptConfigEntry userEncryptConfigEntry;
@@ -118,7 +125,9 @@ public class ProxyConfigManager extends Thread {
public ProxyConfigManager(String callerId, ProxyClientConfig configure,
ClientMgr clientManager) {
this.callerId = callerId;
this.clientManager = clientManager;
- this.storeAndBuildMetaConfigure(configure);
+ if (configure != null) {
+ this.storeAndBuildMetaConfigure(configure);
+ }
if (this.clientManager != null) {
this.setName("ConfigManager-" + this.callerId);
logger.info("ConfigManager({}) started, groupId={}",
@@ -130,19 +139,20 @@ public class ProxyConfigManager extends Thread {
* Update proxy client configure for query case
*
* @param configure proxy client configure
- * @throws Exception exception
+ * @return process result
*/
- public void updProxyClientConfig(ProxyClientConfig configure) throws
Exception {
+ public Tuple2<Boolean, String> updProxyClientConfig(ProxyClientConfig
configure) {
+ if (this.shutDown.get()) {
+ return new Tuple2<>(false, "SDK has shutdown!");
+ }
if (configure == null) {
- throw new Exception("ProxyClientConfig is null");
+ return new Tuple2<>(false, "ProxyClientConfig is null");
}
if (this.clientManager != null) {
- throw new Exception("Not allowed for non meta-query case!");
- }
- if (shutDown.get()) {
- return;
+ return new Tuple2<>(false, "Not allowed for non meta-query case!");
}
this.storeAndBuildMetaConfigure(configure);
+ return new Tuple2<>(true, "OK");
}
public void shutDown() {
@@ -166,6 +176,9 @@ public class ProxyConfigManager extends Thread {
if (shutDown.get()) {
return new Tuple2<>(null, "SDK has shutdown!");
}
+ if (clientConfig == null) {
+ return new Tuple2<>(null, "Configure not initialized!");
+ }
if (clientConfig.isOnlyUseLocalProxyConfig()) {
return getLocalProxyListFromFile(this.localProxyConfigStoreFile);
} else {
@@ -183,7 +196,7 @@ public class ProxyConfigManager extends Thread {
break;
}
// sleep then retry
- TimeUnit.MILLISECONDS.sleep(500);
+ Thread.sleep(500L);
} while (++retryCount <
clientConfig.getConfigSyncMaxRetryIfFail());
}
if (shutDown.get()) {
@@ -205,12 +218,15 @@ public class ProxyConfigManager extends Thread {
* @throws Exception ex
*/
public Tuple2<EncryptConfigEntry, String> getEncryptConfigure(boolean
needRetry) throws Exception {
- if (!clientConfig.isNeedDataEncry()) {
+ if (!clientConfig.isEnableDataEncrypt()) {
return new Tuple2<>(null, "Not need data encrypt!");
}
if (shutDown.get()) {
return new Tuple2<>(null, "SDK has shutdown!");
}
+ if (clientConfig == null) {
+ return new Tuple2<>(null, "Configure not initialized!");
+ }
EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
if (encryptEntry != null) {
return new Tuple2<>(encryptEntry, "Ok");
@@ -228,7 +244,7 @@ public class ProxyConfigManager extends Thread {
break;
}
// sleep then retry
- TimeUnit.MILLISECONDS.sleep(500);
+ Thread.sleep(500L);
} while (++retryCount <
clientConfig.getConfigSyncMaxRetryIfFail());
}
if (shutDown.get()) {
@@ -258,7 +274,7 @@ public class ProxyConfigManager extends Thread {
}
}
// update encrypt configure
- if (clientConfig.isNeedDataEncry()) {
+ if (clientConfig.isEnableDataEncrypt()) {
try {
doEncryptConfigEntryQueryWork();
} catch (Throwable ex) {
@@ -288,7 +304,7 @@ public class ProxyConfigManager extends Thread {
* @throws Exception
*/
public void doProxyEntryQueryWork() throws Exception {
- if (shutDown.get()) {
+ if (shutDown.get() || this.clientManager == null) {
return;
}
/* Request the configuration from manager. */
@@ -306,7 +322,7 @@ public class ProxyConfigManager extends Thread {
break;
}
// sleep then retry.
- TimeUnit.SECONDS.sleep(2);
+ Thread.sleep(2000L);
} while (++retryCnt <
this.clientConfig.getConfigSyncMaxRetryIfFail() && !shutDown.get());
if (shutDown.get()) {
return;
@@ -340,7 +356,7 @@ public class ProxyConfigManager extends Thread {
}
private void doEncryptConfigEntryQueryWork() throws Exception {
- if (shutDown.get()) {
+ if (shutDown.get() || this.clientManager == null) {
return;
}
int retryCount = 0;
@@ -351,7 +367,7 @@ public class ProxyConfigManager extends Thread {
break;
}
// sleep then retry
- TimeUnit.MILLISECONDS.sleep(500);
+ Thread.sleep(500L);
} while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail());
if (shutDown.get()) {
return;
@@ -380,15 +396,21 @@ public class ProxyConfigManager extends Thread {
if (StringUtils.isBlank(strRet)) {
return new Tuple2<>(null, "Blank configure local file from " +
filePath);
}
- return getProxyConfigEntry(strRet);
+ return getProxyConfigEntry(false, strRet);
}
private Tuple2<ProxyConfigEntry, String> requestProxyEntryQuietly() {
- List<BasicNameValuePair> params = buildProxyNodeQueryParams();
+ // check cache failure
+ String qryResult = getManagerQryResultInFailStatus(true);
+ if (qryResult != null) {
+ return new Tuple2<>(null, "Query fail(" + qryResult + ") just now,
please retry later!");
+ }
// request meta info from manager
+ List<BasicNameValuePair> params = buildProxyNodeQueryParams();
logger.debug("ConfigManager({}) request configure to manager({}),
param={}",
this.callerId, this.proxyConfigVisitUrl, params);
- Tuple2<Boolean, String> queryResult =
requestConfiguration(this.proxyConfigVisitUrl, params);
+ Tuple2<Boolean, String> queryResult =
+ requestConfiguration(true, this.proxyConfigVisitUrl, params);
if (!queryResult.getF0()) {
return new Tuple2<>(null, queryResult.getF1());
}
@@ -396,12 +418,20 @@ public class ProxyConfigManager extends Thread {
logger.debug("ConfigManager({}) received configure, from manager({}),
groupId={}, result={}",
callerId, proxyConfigVisitUrl,
clientConfig.getInlongGroupId(), queryResult.getF1());
try {
- return getProxyConfigEntry(queryResult.getF1());
+ Tuple2<ProxyConfigEntry, String> parseResult =
+ getProxyConfigEntry(true, queryResult.getF1());
+ if (parseResult.getF0() == null) {
+ bookManagerQryFailStatus(true, parseResult.getF1());
+ } else {
+ rmvManagerQryFailStatus(true);
+ }
+ return parseResult;
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("ConfigManager({}) parse failure, from
manager({}), groupId={}, result={}",
callerId, proxyConfigVisitUrl,
clientConfig.getInlongGroupId(), queryResult.getF1(), ex);
}
+ bookManagerQryFailStatus(true, ex.getMessage());
return new Tuple2<>(null, ex.getMessage());
}
}
@@ -518,16 +548,23 @@ public class ProxyConfigManager extends Thread {
}
private Tuple2<EncryptConfigEntry, String> requestPubKeyFromManager() {
- List<BasicNameValuePair> params = buildPubKeyQueryParams();
+ // check cache failure
+ String qryResult = getManagerQryResultInFailStatus(false);
+ if (qryResult != null) {
+ return new Tuple2<>(null, "Query fail(" + qryResult + ") just now,
please retry later!");
+ }
// request meta info from manager
+ List<BasicNameValuePair> params = buildPubKeyQueryParams();
logger.debug("ConfigManager({}) request pubkey to manager({}),
param={}",
this.callerId, this.encryptConfigVisitUrl, params);
- Tuple2<Boolean, String> queryResult =
requestConfiguration(this.encryptConfigVisitUrl, params);
+ Tuple2<Boolean, String> queryResult =
+ requestConfiguration(false, this.encryptConfigVisitUrl,
params);
if (!queryResult.getF0()) {
return new Tuple2<>(null, queryResult.getF1());
}
logger.debug("ConfigManager({}) received pubkey from manager({}),
result={}",
this.callerId, this.encryptConfigVisitUrl,
queryResult.getF1());
+ String errorMsg;
JsonObject pubKeyConf;
try {
pubKeyConf =
JsonParser.parseString(queryResult.getF1()).getAsJsonObject();
@@ -536,62 +573,72 @@ public class ProxyConfigManager extends Thread {
logger.warn("ConfigManager({}) parse failure, secretId={},
config={}!",
this.callerId, this.clientConfig.getAuthSecretId(),
queryResult.getF1());
}
- return new Tuple2<>(null, "parse pubkey failure:" +
ex.getMessage());
+ errorMsg = "parse pubkey failure:" + ex.getMessage();
+ bookManagerQryFailStatus(false, errorMsg);
+ return new Tuple2<>(null, errorMsg);
}
if (pubKeyConf == null) {
- return new Tuple2<>(null, "No public key information");
+ errorMsg = "No public key information";
+ bookManagerQryFailStatus(false, errorMsg);
+ return new Tuple2<>(null, errorMsg);
}
- if (!pubKeyConf.has("resultCode")) {
- if (parseCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) config failure: resultCode
field not exist, secretId={}, config={}!",
- this.callerId, this.clientConfig.getAuthSecretId(),
queryResult.getF1());
- }
- return new Tuple2<>(null, "resultCode field not exist");
- }
- int resultCode = pubKeyConf.get("resultCode").getAsInt();
- if (resultCode != 0) {
- if (parseCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) config failure: resultCode !=
0, secretId={}, config={}!",
- this.callerId, this.clientConfig.getAuthSecretId(),
queryResult.getF1());
- }
- return new Tuple2<>(null, "resultCode != 0!");
- }
- if (!pubKeyConf.has("resultData")) {
- if (parseCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) config failure: resultData
field not exist, secretId={}, config={}!",
- this.callerId, this.clientConfig.getAuthSecretId(),
queryResult.getF1());
- }
- return new Tuple2<>(null, "resultData field not exist");
- }
- JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
- if (resultData != null) {
- String publicKey = resultData.get("publicKey").getAsString();
- if (StringUtils.isBlank(publicKey)) {
+ try {
+ if (!pubKeyConf.has("resultCode")) {
if (parseCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) config failure: publicKey
is blank, secretId={}, config={}!",
+ logger.warn("ConfigManager({}) config failure: resultCode
field not exist, secretId={}, config={}!",
this.callerId,
this.clientConfig.getAuthSecretId(), queryResult.getF1());
}
- return new Tuple2<>(null, "publicKey is blank!");
+ throw new Exception("resultCode field not exist");
}
- String username = resultData.get("username").getAsString();
- if (StringUtils.isBlank(username)) {
+ int resultCode = pubKeyConf.get("resultCode").getAsInt();
+ if (resultCode != 0) {
if (parseCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) config failure: username is
blank, secretId={}, config={}!",
+ logger.warn("ConfigManager({}) config failure: resultCode
!= 0, secretId={}, config={}!",
this.callerId,
this.clientConfig.getAuthSecretId(), queryResult.getF1());
}
- return new Tuple2<>(null, "username is blank!");
+ throw new Exception("resultCode != 0!");
}
- String versionStr = resultData.get("version").getAsString();
- if (StringUtils.isBlank(versionStr)) {
+ if (!pubKeyConf.has("resultData")) {
if (parseCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) config failure: version is
blank, secretId={}, config={}!",
+ logger.warn("ConfigManager({}) config failure: resultData
field not exist, secretId={}, config={}!",
this.callerId,
this.clientConfig.getAuthSecretId(), queryResult.getF1());
}
- return new Tuple2<>(null, "version is blank!");
+ throw new Exception("resultData field not exist");
+ }
+ JsonObject resultData =
pubKeyConf.get("resultData").getAsJsonObject();
+ if (resultData != null) {
+ String publicKey = resultData.get("publicKey").getAsString();
+ if (StringUtils.isBlank(publicKey)) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) config failure:
publicKey is blank, secretId={}, config={}!",
+ this.callerId,
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+ }
+ throw new Exception("publicKey is blank!");
+ }
+ String username = resultData.get("username").getAsString();
+ if (StringUtils.isBlank(username)) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) config failure:
username is blank, secretId={}, config={}!",
+ this.callerId,
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+ }
+ throw new Exception("username is blank!");
+ }
+ String versionStr = resultData.get("version").getAsString();
+ if (StringUtils.isBlank(versionStr)) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) config failure: version
is blank, secretId={}, config={}!",
+ this.callerId,
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+ }
+ throw new Exception("version is blank!");
+ }
+ rmvManagerQryFailStatus(false);
+ return new Tuple2<>(new EncryptConfigEntry(username,
versionStr, publicKey), "Ok");
}
- return new Tuple2<>(new EncryptConfigEntry(username, versionStr,
publicKey), "Ok");
+ throw new Exception("resultData value is null!");
+ } catch (Throwable ex) {
+ bookManagerQryFailStatus(false, ex.getMessage());
+ return new Tuple2<>(null, ex.getMessage());
}
- return new Tuple2<>(null, "resultData value is null!");
}
private void updateEncryptConfigEntry(EncryptConfigEntry newEncryptEntry) {
@@ -674,7 +721,8 @@ public class ProxyConfigManager extends Thread {
}
/* Request new configurations from Manager. */
- private Tuple2<Boolean, String> requestConfiguration(String url,
List<BasicNameValuePair> params) {
+ private Tuple2<Boolean, String> requestConfiguration(
+ boolean queryProxyInfo, String url, List<BasicNameValuePair>
params) {
HttpParams myParams = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(myParams,
clientConfig.getManagerConnTimeoutMs());
HttpConnectionParams.setSoTimeout(myParams,
clientConfig.getManagerSocketTimeoutMs());
@@ -702,13 +750,20 @@ public class ProxyConfigManager extends Thread {
new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
httpPost.setEntity(urlEncodedFormEntity);
HttpResponse response = httpClient.execute(httpPost);
+ String errMsg;
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- return new Tuple2<>(false,
response.getStatusLine().getStatusCode()
- + ":" + response.getStatusLine().getStatusCode());
+ errMsg = response.getStatusLine().getStatusCode()
+ + ":" + response.getStatusLine().getReasonPhrase();
+ if (response.getStatusLine().getStatusCode() >= 500) {
+ bookManagerQryFailStatus(queryProxyInfo, errMsg);
+ }
+ return new Tuple2<>(false, errMsg);
}
String returnStr = EntityUtils.toString(response.getEntity());
if (StringUtils.isBlank(returnStr)) {
- return new Tuple2<>(false, "query result is blank!");
+ errMsg = "server return blank entity!";
+ bookManagerQryFailStatus(queryProxyInfo, errMsg);
+ return new Tuple2<>(false, errMsg);
}
return new Tuple2<>(true, returnStr);
} catch (Throwable ex) {
@@ -755,6 +810,11 @@ public class ProxyConfigManager extends Thread {
.append(ConfigConstants.MANAGER_DATAPROXY_API).append(clientConfig.getInlongGroupId())
.toString();
strBuff.delete(0, strBuff.length());
+ this.proxyQueryFailKey = strBuff
+ .append("proxy:").append(clientConfig.getInlongGroupId())
+ .append("#").append(clientConfig.getRegionName())
+ .append("#").append(clientConfig.getProtocolType()).toString();
+ strBuff.delete(0, strBuff.length());
this.localProxyConfigStoreFile = strBuff
.append(clientConfig.getConfigStoreBasePath())
.append(ConfigConstants.META_STORE_SUB_DIR)
@@ -770,6 +830,9 @@ public class ProxyConfigManager extends Thread {
.toString();
strBuff.delete(0, strBuff.length());
this.encryptConfigVisitUrl = clientConfig.getRsaPubKeyUrl();
+ this.encryptQueryFailKey = strBuff
+
.append("encrypt:").append(clientConfig.getUserName()).toString();
+ strBuff.delete(0, strBuff.length());
this.encryptConfigCacheFile = strBuff
.append(clientConfig.getConfigStoreBasePath())
.append(ConfigConstants.META_STORE_SUB_DIR)
@@ -795,23 +858,85 @@ public class ProxyConfigManager extends Thread {
private List<BasicNameValuePair> buildPubKeyQueryParams() {
List<BasicNameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("operation", "query"));
- params.add(new BasicNameValuePair("username",
clientConfig.getAuthSecretId()));
+ params.add(new BasicNameValuePair("username",
clientConfig.getUserName()));
return params;
}
- private Tuple2<ProxyConfigEntry, String> getProxyConfigEntry(String
strRet) {
- DataProxyNodeResponse proxyCluster;
- try {
- proxyCluster = gson.fromJson(strRet, DataProxyNodeResponse.class);
- } catch (Throwable ex) {
- if (parseCounter.shouldPrint()) {
- logger.warn("ConfigManager({}) parse exception, groupId={},
config={}",
- this.callerId, clientConfig.getInlongGroupId(),
strRet, ex);
+ private void bookManagerQryFailStatus(boolean proxyQry, String errMsg) {
+ if (proxyQry) {
+ fetchFailProxyMap.put(proxyQueryFailKey,
+ new Tuple2<>(new AtomicLong(System.currentTimeMillis()),
errMsg));
+ } else {
+ fetchFailEncryptMap.put(encryptQueryFailKey,
+ new Tuple2<>(new AtomicLong(System.currentTimeMillis()),
errMsg));
+ }
+ }
+
+ private void rmvManagerQryFailStatus(boolean proxyQry) {
+ if (proxyQry) {
+ fetchFailProxyMap.remove(proxyQueryFailKey);
+ } else {
+ fetchFailEncryptMap.remove(encryptQueryFailKey);
+ }
+ }
+
+ private String getManagerQryResultInFailStatus(boolean proxyQry) {
+ if (clientConfig.getConfigFailStatusExpiredMs() <= 0) {
+ return null;
+ }
+ Tuple2<AtomicLong, String> queryResult;
+ if (proxyQry) {
+ queryResult = fetchFailProxyMap.get(proxyQueryFailKey);
+ } else {
+ queryResult = fetchFailEncryptMap.get(encryptQueryFailKey);
+ }
+ if (queryResult != null
+ && (System.currentTimeMillis() - queryResult.getF0().get() <
clientConfig
+ .getConfigFailStatusExpiredMs())) {
+ return queryResult.getF1();
+ }
+ return null;
+ }
+
+ private Tuple2<ProxyConfigEntry, String> getProxyConfigEntry(boolean
fromManager, String strRet) {
+ DataProxyNodeResponse proxyNodeConfig;
+ if (fromManager) {
+ ProxyClusterConfig clusterConfig;
+ try {
+ clusterConfig = gson.fromJson(strRet,
ProxyClusterConfig.class);
+ } catch (Throwable ex) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) parse exception,
groupId={}, config={}",
+ this.callerId, clientConfig.getInlongGroupId(),
strRet, ex);
+ }
+ return new Tuple2<>(null, "parse failure:" + ex.getMessage());
+ }
+ if (clusterConfig == null) {
+ return new Tuple2<>(null, "content parse result is null!");
+ }
+ if (!clusterConfig.isSuccess()) {
+ return new Tuple2<>(null, clusterConfig.getErrMsg());
+ }
+ if (clusterConfig.getData() == null) {
+ return new Tuple2<>(null, "return data content is null!");
+ }
+ proxyNodeConfig = clusterConfig.getData();
+ } else {
+ try {
+ proxyNodeConfig = gson.fromJson(strRet,
DataProxyNodeResponse.class);
+ } catch (Throwable ex) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) parse local file exception,
groupId={}, config={}",
+ this.callerId, clientConfig.getInlongGroupId(),
strRet, ex);
+ }
+ return new Tuple2<>(null, "parse file failure:" +
ex.getMessage());
+ }
+ if (proxyNodeConfig == null) {
+ return new Tuple2<>(null, "file content parse result is
null!");
}
- return new Tuple2<>(null, "parse failure:" + ex.getMessage());
}
// parse nodeList
- List<DataProxyNodeInfo> nodeList = proxyCluster.getNodeList();
+ List<DataProxyNodeInfo> nodeList = proxyNodeConfig.getNodeList();
if (CollectionUtils.isEmpty(nodeList)) {
return new Tuple2<>(null, "nodeList is empty!");
}
@@ -836,23 +961,23 @@ public class ProxyConfigManager extends Thread {
}
// parse clusterId
int clusterId = -1;
- if (ObjectUtils.isNotEmpty(proxyCluster.getClusterId())) {
- clusterId = proxyCluster.getClusterId();
+ if (ObjectUtils.isNotEmpty(proxyNodeConfig.getClusterId())) {
+ clusterId = proxyNodeConfig.getClusterId();
}
// parse load
int load = ConfigConstants.LOAD_THRESHOLD;
- if (ObjectUtils.isNotEmpty(proxyCluster.getLoad())) {
- load = proxyCluster.getLoad() > 200 ? 200 :
(Math.max(proxyCluster.getLoad(), 0));
+ if (ObjectUtils.isNotEmpty(proxyNodeConfig.getLoad())) {
+ load = proxyNodeConfig.getLoad() > 200 ? 200 :
(Math.max(proxyNodeConfig.getLoad(), 0));
}
// parse isIntranet
boolean isIntranet = true;
- if (ObjectUtils.isNotEmpty(proxyCluster.getIsIntranet())) {
- isIntranet = proxyCluster.getIsIntranet() == 1;
+ if (ObjectUtils.isNotEmpty(proxyNodeConfig.getIsIntranet())) {
+ isIntranet = proxyNodeConfig.getIsIntranet() == 1;
}
// parse isSwitch
int isSwitch = 0;
- if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
- isSwitch = proxyCluster.getIsSwitch();
+ if (ObjectUtils.isNotEmpty(proxyNodeConfig.getIsSwitch())) {
+ isSwitch = proxyNodeConfig.getIsSwitch();
}
// build ProxyConfigEntry
ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
@@ -863,7 +988,7 @@ public class ProxyConfigManager extends Thread {
proxyEntry.setSwitchStat(isSwitch);
proxyEntry.setLoad(load);
proxyEntry.setMaxPacketLength(
- proxyCluster.getMaxPacketLength() != null ?
proxyCluster.getMaxPacketLength() : -1);
+ proxyNodeConfig.getMaxPacketLength() != null ?
proxyNodeConfig.getMaxPacketLength() : -1);
return new Tuple2<>(proxyEntry, "ok");
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index 0444edbfd5..ea9fc15a51 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -345,7 +345,7 @@ public class ClientMgr {
if (!realHosts.isEmpty()) {
break;
}
- Thread.sleep(1000);
+ Thread.sleep(1000L);
} while (--maxCycleCnt > 0);
// update active nodes
if (realHosts.isEmpty()) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
index 3699213176..009d513bdb 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
@@ -19,7 +19,6 @@ package org.apache.inlong.sdk.dataproxy.network;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class QueueObject {
@@ -32,11 +31,11 @@ public class QueueObject {
private final int size;
public QueueObject(NettyClient client, long sendTimeInMillis,
- SendMessageCallback callback, int size, long timeout, TimeUnit
timeUnit) {
+ SendMessageCallback callback, int size, long timeoutMs) {
this.client = client;
this.sendTimeInMillis = sendTimeInMillis;
this.callback = callback;
- this.timeoutInMillis = TimeUnit.MILLISECONDS.convert(timeout,
timeUnit);
+ this.timeoutInMillis = timeoutMs;
this.size = size;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 13920fb7a6..b56781c0ce 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -97,7 +97,7 @@ public class Sender {
if (!configure.isEnableAuthentication()) {
throw new Exception("In OutNetwork isNeedAuthentication must
be true!");
}
- if (!configure.isNeedDataEncry()) {
+ if (!configure.isEnableDataEncrypt()) {
throw new Exception("In OutNetwork isNeedDataEncry must be
true!");
}
}
@@ -201,15 +201,13 @@ public class Sender {
clientMgr.getStreamIdNum(encodeObject.getStreamId()));
}
}
- if (this.configure.isNeedDataEncry()) {
+ if (this.configure.isEnableDataEncrypt()) {
encodeObject.setEncryptEntry(true,
configure.getAuthSecretId(),
clientMgr.getEncryptConfigureInfo());
- } else {
- encodeObject.setEncryptEntry(false, null, null);
}
encodeObject.setMsgUUID(msgUUID);
- SyncMessageCallable callable = new
SyncMessageCallable(clientResult.getF1(),
- encodeObject, configure.getRequestTimeoutMs(),
TimeUnit.MILLISECONDS);
+ SyncMessageCallable callable = new SyncMessageCallable(
+ clientResult.getF1(), encodeObject,
configure.getRequestTimeoutMs());
syncCallables.put(encodeObject.getMessageId(), callable);
Future<SendResult> future = threadPool.submit(callable);
message = future.get(configure.getRequestTimeoutMs(),
TimeUnit.MILLISECONDS);
@@ -309,8 +307,8 @@ public class Sender {
/**
* Following methods used by asynchronously message sending.
*/
- public void asyncSendMessage(EncodeObject encodeObject,
SendMessageCallback callback, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ public void asyncSendMessage(EncodeObject encodeObject,
+ SendMessageCallback callback, String msgUUID) throws
ProxysdkException {
if (!started.get()) {
if (callback != null) {
callback.onMessageAck(SendResult.SENDER_CLOSED);
@@ -381,8 +379,8 @@ public class Sender {
}
int size = 1;
if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) {
- currentBufferSize.decrementAndGet();
clientResult.getF1().decMsgInFlight();
+ currentBufferSize.decrementAndGet();
if (callback != null) {
callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL);
return;
@@ -393,7 +391,8 @@ public class Sender {
ConcurrentHashMap<String, QueueObject> msgQueueMap =
callbacks.computeIfAbsent(clientResult.getF1().getChannel(),
(k) -> new ConcurrentHashMap<>());
QueueObject queueObject =
msgQueueMap.putIfAbsent(encodeObject.getMessageId(),
- new QueueObject(clientResult.getF1(),
System.currentTimeMillis(), callback, size, timeout, timeUnit));
+ new QueueObject(clientResult.getF1(),
System.currentTimeMillis(), callback,
+ size, configure.getRequestTimeoutMs()));
if (queueObject != null) {
if (reqChkLoggCount.shouldPrint()) {
logger.warn("Sender({}) found message id {} has existed.",
@@ -407,11 +406,9 @@ public class Sender {
clientMgr.getStreamIdNum(encodeObject.getStreamId()));
}
}
- if (this.configure.isNeedDataEncry()) {
+ if (this.configure.isEnableDataEncrypt()) {
encodeObject.setEncryptEntry(true,
configure.getAuthSecretId(),
clientMgr.getEncryptConfigureInfo());
- } else {
- encodeObject.setEncryptEntry(false, null, null);
}
encodeObject.setMsgUUID(msgUUID);
clientResult.getF1().write(encodeObject);
@@ -501,7 +498,7 @@ public class Sender {
}
}
- /* Deal with unexpected exception. only used for asyc send */
+ /* Deal with unexpected exception. only used for async send */
public void waitForAckForChannel(Channel channel) {
if (channel == null) {
return;
@@ -513,14 +510,14 @@ public class Sender {
}
try {
while (!queueObjMap.isEmpty()) {
+ if (System.currentTimeMillis() - startTime >=
configure.getConCloseWaitPeriodMs()) {
+ break;
+ }
try {
- Thread.sleep(100);
+ Thread.sleep(100L);
} catch (InterruptedException ex1) {
//
}
- if (System.currentTimeMillis() - startTime >=
configure.getConCloseWaitPeriodMs()) {
- break;
- }
}
} catch (Throwable ex) {
if (exptCnt.shouldPrint()) {
@@ -559,13 +556,17 @@ public class Sender {
return clientMgr;
}
+ public ProxyClientConfig getConfigure() {
+ return configure;
+ }
+
private void checkCallbackList() {
// max wait for 1 min
try {
long startTime = System.currentTimeMillis();
while (currentBufferSize.get() > 0
&& System.currentTimeMillis() - startTime <
configure.getConCloseWaitPeriodMs()) {
- TimeUnit.MILLISECONDS.sleep(300);
+ Thread.sleep(300L);
}
if (currentBufferSize.get() > 0) {
logger.warn("Sender({}) callback size({}) not empty, force
quit!",
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
index bce2ad4468..fa3f2bf14e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
@@ -37,17 +37,14 @@ public class SyncMessageCallable implements
Callable<SendResult> {
private final NettyClient client;
private final CountDownLatch awaitLatch = new CountDownLatch(1);
private final EncodeObject encodeObject;
- private final long timeout;
- private final TimeUnit timeUnit;
+ private final long timeoutMs;
private SendResult message;
- public SyncMessageCallable(NettyClient client, EncodeObject encodeObject,
- long timeout, TimeUnit timeUnit) {
+ public SyncMessageCallable(NettyClient client, EncodeObject encodeObject,
long timeoutMs) {
this.client = client;
this.encodeObject = encodeObject;
- this.timeout = timeout;
- this.timeUnit = timeUnit;
+ this.timeoutMs = timeoutMs;
}
public void update(SendResult message) {
@@ -61,7 +58,7 @@ public class SyncMessageCallable implements
Callable<SendResult> {
return SendResult.WRITE_OVER_WATERMARK;
}
ChannelFuture channelFuture = client.write(encodeObject);
- awaitLatch.await(timeout, timeUnit);
+ awaitLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
} catch (Throwable ex) {
if (exptCnt.shouldPrint()) {
logger.warn("SyncMessageCallable write data throw exception",
ex);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index 39f8562b52..4d5459b8e1 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -193,8 +193,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
callBack.increaseRetry();
try {
if (callBack.getRetryCount() < 4) {
- sender.asyncSendMessage(encodeObject, callBack,
- String.valueOf(System.currentTimeMillis()), 20,
TimeUnit.SECONDS);
+ sender.asyncSendMessage(encodeObject, callBack,
String.valueOf(System.currentTimeMillis()));
} else {
logger.error("Send metric failure: {}",
encodeObject.getBodylist());
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
index f53af9b7aa..5211ba9907 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
/**
* Daemon threads to check timeout for asynchronous callback.
@@ -162,7 +161,6 @@ public class TimeoutScanThread extends Thread {
checkMessageIdBasedCallbacks(entry.getKey(),
entry.getValue());
}
checkTimeoutChannel();
- TimeUnit.SECONDS.sleep(1);
} catch (Throwable ex) {
if (exptCnt.shouldPrint()) {
logger.warn("TimeoutScanThread({}) throw exception",
sender.getInstanceId(), ex);
@@ -172,6 +170,14 @@ public class TimeoutScanThread extends Thread {
logger.info("TimeoutScanThread({}) scan, currentBufferSize={}",
sender.getInstanceId(),
sender.getCurrentBufferSize().get());
}
+ if (bShutDown) {
+ break;
+ }
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ //
+ }
}
logger.info("TimeoutScanThread({}) thread existed !",
sender.getInstanceId());
}