This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 36a9017c1f [INLONG-11589][SDK] Optimize the implementation of proxy
configuration management (#11590)
36a9017c1f is described below
commit 36a9017c1f92e8a907b7654f2e997ce7287e259a
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Dec 9 11:36:06 2024 +0800
[INLONG-11589][SDK] Optimize the implementation of proxy configuration
management (#11590)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/ConfigConstants.java | 48 +-
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 16 +-
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 328 +++---
.../sdk/dataproxy/config/ProxyConfigEntry.java | 33 +-
.../sdk/dataproxy/config/ProxyConfigManager.java | 1111 +++++++++++---------
.../sdk/dataproxy/example/HttpClientExample.java | 5 +-
.../sdk/dataproxy/example/TcpClientExample.java | 4 +-
.../inlong/sdk/dataproxy/network/ClientMgr.java | 37 +-
.../sdk/dataproxy/network/HttpProxySender.java | 20 +-
.../inlong/sdk/dataproxy/network/Sender.java | 19 +-
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 8 +-
.../apache/inlong/sdk/dataproxy/utils/Tuple2.java | 71 ++
.../sdk/dataproxy/ProxyConfigManagerTest.java | 9 +-
.../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 2 +-
14 files changed, 995 insertions(+), 716 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index 26f8d131b4..81b9f0dad0 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -21,6 +21,42 @@ public class ConfigConstants {
public static final String PROXY_SDK_VERSION = "1.2.11";
+ public static String HTTP = "http://";
+ public static String HTTPS = "https://";
+
+ // dataproxy node config
+ public static final String MANAGER_DATAPROXY_API =
"/inlong/manager/openapi/dataproxy/getIpList/";
+ public static final String META_STORE_SUB_DIR = "/.inlong/";
+ public static final String LOCAL_DP_CONFIG_FILE_SUFFIX = ".local";
+ public static final String REMOTE_DP_CACHE_FILE_SUFFIX = ".proxyip";
+ public static final String REMOTE_ENCRYPT_CACHE_FILE_SUFFIX = ".pubKey";
+ // authorization key
+ public static final String BASIC_AUTH_HEADER = "authorization";
+
+ // config info sync interval in minutes
+ public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
+ public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
+ public static final int VAL_MAX_CONFIG_SYNC_INTERVAL_MIN = 30;
+ public static final long VAL_UNIT_MIN_TO_MS = 60 * 1000L;
+ // config info sync max retry if failure
+ public static final int VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL = 3;
+ public static final int VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL = 5;
+ // cache config expired time in ms
+ public static final long VAL_DEF_CACHE_CONFIG_EXPIRED_MS = 20 * 60 * 1000L;
+ // node force choose interval in ms
+ public static final long VAL_DEF_FORCE_CHOOSE_INR_MS = 10 * 60 * 1000L;
+ public static final long VAL_MIN_FORCE_CHOOSE_INR_MS = 30 * 1000L;
+
+ // connection timeout in milliseconds
+ public static final int VAL_DEF_CONNECT_TIMEOUT_MS = 10000;
+ public static final int VAL_MIN_CONNECT_TIMEOUT_MS = 2000;
+ public static final int VAL_MAX_CONNECT_TIMEOUT_MS = 60000;
+ public static final int VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500;
+ // socket timeout in milliseconds
+ public static final int VAL_DEF_SOCKET_TIMEOUT_MS = 20000;
+ public static final int VAL_MIN_SOCKET_TIMEOUT_MS = 2000;
+ public static final int VAL_MAX_SOCKET_TIMEOUT_MS = 60000;
+
public static final int ALIVE_CONNECTIONS = 3;
public static final int MAX_TIMEOUT_CNT = 3;
public static final int LOAD_THRESHOLD = 0;
@@ -43,17 +79,11 @@ public class ConfigConstants {
/* one hour interval */
public static final int PROXY_HTTP_UPDATE_INTERVAL_MINUTES = 60;
- public static final int PROXY_UPDATE_MAX_RETRY = 10;
-
public static final int MAX_LINE_CNT = 30;
- // connection timeout in milliseconds
- public static final long VAL_DEF_CONNECT_TIMEOUT_MS = 20000L;
- public static final long VAL_MIN_CONNECT_TIMEOUT_MS = 1L;
- public static final long VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500L;
// request timeout in milliseconds
public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L;
- public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L;
+ public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 500L;
public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
@@ -65,14 +95,10 @@ public class ConfigConstants {
public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
public static final int FLAG_ALLOW_COMPRESS = 1 << 5;
- public static final String MANAGER_DATAPROXY_API =
"/inlong/manager/openapi/dataproxy/getIpList/";
public static LoadBalance DEFAULT_LOAD_BALANCE = LoadBalance.ROBIN;
public static int DEFAULT_VIRTUAL_NODE = 1000;
public static int DEFAULT_RANDOM_MAX_RETRY = 1000;
- public static String HTTP = "http://";
- public static String HTTPS = "https://";
-
public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;
/* Reserved attribute data size(bytes). */
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index dd22e63cce..b99f3726db 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,17 +108,20 @@ public class DefaultMessageSender implements
MessageSender {
}
LOGGER.info("Initial tcp sender, configure is {}", configure);
// initial sender object
- ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(configure, null);
- proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
- ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
- DefaultMessageSender sender = CACHE_SENDER.get(entry.getClusterId());
+ ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(configure);
+ Tuple2<ProxyConfigEntry, String> result =
+ proxyConfigManager.getGroupIdConfigure(true);
+ if (result.getF0() == null) {
+ throw new Exception(result.getF1());
+ }
+ DefaultMessageSender sender =
CACHE_SENDER.get(result.getF0().getClusterId());
if (sender != null) {
return sender;
} else {
DefaultMessageSender tmpMessageSender =
new DefaultMessageSender(configure, selfDefineFactory);
- tmpMessageSender.setMaxPacketLength(entry.getMaxPacketLength());
- CACHE_SENDER.put(entry.getClusterId(), tmpMessageSender);
+
tmpMessageSender.setMaxPacketLength(result.getF0().getMaxPacketLength());
+ CACHE_SENDER.put(result.getF0().getClusterId(), tmpMessageSender);
return tmpMessageSender;
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index ce8a3b3b39..412b1950c5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -27,41 +27,40 @@ import org.apache.commons.lang3.StringUtils;
@Data
public class ProxyClientConfig {
+ private String managerIP = "";
+ private int managerPort = 8099;
+ private boolean visitManagerByHttp = true;
+ private boolean onlyUseLocalProxyConfig = false;
+ private int managerConnTimeoutMs =
ConfigConstants.VAL_DEF_CONNECT_TIMEOUT_MS;
+ private int managerSocketTimeoutMs =
ConfigConstants.VAL_DEF_SOCKET_TIMEOUT_MS;
+ private long managerConfigSyncInrMs =
+ ConfigConstants.VAL_DEF_CONFIG_SYNC_INTERVAL_MIN *
ConfigConstants.VAL_UNIT_MIN_TO_MS;
+ private int configSyncMaxRetryIfFail =
ConfigConstants.VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL;
+ private String configStoreBasePath = System.getProperty("user.dir");
+ // max expired time for config cache.
+ private long configCacheExpiredMs =
ConfigConstants.VAL_DEF_CACHE_CONFIG_EXPIRED_MS;
+ // nodes force choose interval ms
+ private long forceReChooseInrMs =
ConfigConstants.VAL_DEF_FORCE_CHOOSE_INR_MS;
+ private boolean enableAuthentication = false;
+ private String authSecretId = "";
+ private String authSecretKey = "";
+
private int aliveConnections;
private int syncThreadPoolSize;
private int asyncCallbackSize;
- private int managerPort = 8099;
- private String managerIP = "";
- private String managerAddress;
- private String managerUrl = "";
- private int proxyUpdateIntervalMinutes;
- private int proxyUpdateMaxRetry;
private String inlongGroupId;
- private boolean requestByHttp = true;
private boolean isNeedDataEncry = false;
- private boolean needAuthentication = false;
- private String userName = "";
- private String secretKey = "";
private String rsaPubKeyUrl = "";
- private String confStoreBasePath = System.getProperty("user.dir") +
"/.inlong/";
private String tlsServerCertFilePathAndName;
private String tlsServerKey;
private String tlsVersion = "TLSv1.2";
private int maxTimeoutCnt = ConfigConstants.MAX_TIMEOUT_CNT;
- private String authSecretId;
- private String authSecretKey;
private String protocolType;
// metric configure
private MetricConfig metricConfig = new MetricConfig();
- private int managerConnectionTimeout = 10000;
- // http socket timeout in milliseconds
- private int managerSocketTimeout = 30 * 1000;
-
- private boolean readProxyIPFromLocal = false;
-
// connect timeout in milliseconds
private long connectTimeoutMs = ConfigConstants.VAL_DEF_CONNECT_TIMEOUT_MS;
// request timeout in milliseconds
@@ -79,8 +78,6 @@ public class ProxyClientConfig {
// interval for async worker in microseconds.
private int asyncWorkerInterval = 500;
private boolean cleanHttpCacheWhenClosing = false;
- // max cache time for proxy config.
- private long maxProxyCacheTimeInMs = 30 * 60 * 1000;
private int ioThreadNum = Runtime.getRuntime().availableProcessors();
private boolean enableBusyWait = false;
@@ -93,32 +90,30 @@ public class ProxyClientConfig {
private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT;
/* pay attention to the last url parameter ip */
- public ProxyClientConfig(String localHost, boolean requestByHttp, String
managerIp,
+ public ProxyClientConfig(String localHost, boolean visitManagerByHttp,
String managerIp,
int managerPort, String inlongGroupId, String authSecretId, String
authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws
ProxysdkException {
if (StringUtils.isBlank(localHost)) {
throw new ProxysdkException("localHost is blank!");
}
if (StringUtils.isBlank(managerIp)) {
- throw new IllegalArgumentException("managerIp is Blank!");
+ throw new ProxysdkException("managerIp is Blank!");
+ }
+ if (managerPort <= 0) {
+ throw new ProxysdkException("managerPort <= 0!");
}
if (StringUtils.isBlank(inlongGroupId)) {
throw new ProxysdkException("groupId is blank!");
}
- this.inlongGroupId = inlongGroupId;
- this.requestByHttp = requestByHttp;
+ this.inlongGroupId = inlongGroupId.trim();
+ this.visitManagerByHttp = visitManagerByHttp;
this.managerPort = managerPort;
this.managerIP = managerIp;
- this.managerAddress = getManagerAddress(managerIp, managerPort,
requestByHttp);
- this.managerUrl =
- getManagerUrl(managerAddress, inlongGroupId);
IpUtils.validLocalIp(localHost);
this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS;
this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
- this.proxyUpdateIntervalMinutes =
ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES;
this.proxyHttpUpdateIntervalMinutes =
ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
- this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY;
this.authSecretId = authSecretId;
this.authSecretKey = authSecretKey;
this.loadBalance = loadBalance;
@@ -129,25 +124,15 @@ public class ProxyClientConfig {
/* pay attention to the last url parameter ip */
public ProxyClientConfig(String managerAddress, String inlongGroupId,
String authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws
ProxysdkException {
- if (StringUtils.isBlank(managerAddress) ||
(!managerAddress.startsWith(ConfigConstants.HTTP)
- && !managerAddress.startsWith(ConfigConstants.HTTPS))) {
- throw new ProxysdkException("managerAddress is blank or missing
http/https protocol ");
- }
+ checkAndParseAddress(managerAddress);
if (StringUtils.isBlank(inlongGroupId)) {
throw new ProxysdkException("groupId is blank!");
}
- if (managerAddress.startsWith(ConfigConstants.HTTPS)) {
- this.requestByHttp = false;
- }
- this.managerAddress = managerAddress;
- this.managerUrl = getManagerUrl(managerAddress, inlongGroupId);
- this.inlongGroupId = inlongGroupId;
+ this.inlongGroupId = inlongGroupId.trim();
this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS;
this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
- this.proxyUpdateIntervalMinutes =
ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES;
this.proxyHttpUpdateIntervalMinutes =
ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
- this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY;
this.authSecretId = authSecretId;
this.authSecretKey = authSecretKey;
this.loadBalance = loadBalance;
@@ -155,21 +140,9 @@ public class ProxyClientConfig {
this.maxRetry = maxRetry;
}
- private String getManagerUrl(String managerAddress, String inlongGroupId) {
- return managerAddress + ConfigConstants.MANAGER_DATAPROXY_API +
inlongGroupId;
- }
-
- private String getManagerAddress(String managerIp, int managerPort,
boolean requestByHttp) {
- String protocolType = ConfigConstants.HTTPS;
- if (requestByHttp) {
- protocolType = ConfigConstants.HTTP;
- }
- return protocolType + managerIp + ":" + managerPort;
- }
-
- public ProxyClientConfig(String localHost, boolean requestByHttp, String
managerIp, int managerPort,
+ public ProxyClientConfig(String localHost, boolean visitManagerByHttp,
String managerIp, int managerPort,
String inlongGroupId, String authSecretId, String authSecretKey)
throws ProxysdkException {
- this(localHost, requestByHttp, managerIp, managerPort, inlongGroupId,
authSecretId, authSecretKey,
+ this(localHost, visitManagerByHttp, managerIp, managerPort,
inlongGroupId, authSecretId, authSecretKey,
ConfigConstants.DEFAULT_LOAD_BALANCE,
ConfigConstants.DEFAULT_VIRTUAL_NODE,
ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
}
@@ -181,40 +154,131 @@ public class ProxyClientConfig {
ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
}
- public String getTlsServerCertFilePathAndName() {
- return tlsServerCertFilePathAndName;
+ public String getManagerIP() {
+ return managerIP;
}
- public String getTlsServerKey() {
- return tlsServerKey;
+ public int getManagerPort() {
+ return managerPort;
}
- public boolean isRequestByHttp() {
- return requestByHttp;
+ public boolean isVisitManagerByHttp() {
+ return visitManagerByHttp;
}
- public String getInlongGroupId() {
- return inlongGroupId;
+ public boolean isOnlyUseLocalProxyConfig() {
+ return onlyUseLocalProxyConfig;
}
- public void setInlongGroupId(String inlongGroupId) {
- this.inlongGroupId = inlongGroupId;
+ public void setOnlyUseLocalProxyConfig(boolean onlyUseLocalProxyConfig) {
+ this.onlyUseLocalProxyConfig = onlyUseLocalProxyConfig;
}
- public int getManagerPort() {
- return managerPort;
+ public boolean isEnableAuthentication() {
+ return this.enableAuthentication;
}
- public String getManagerIP() {
- return managerIP;
+ public String getAuthSecretId() {
+ return authSecretId;
}
- public String getConfStoreBasePath() {
- return confStoreBasePath;
+ public String getAuthSecretKey() {
+ return authSecretKey;
}
- public void setConfStoreBasePath(String confStoreBasePath) {
- this.confStoreBasePath = confStoreBasePath;
+ public void setAuthenticationInfo(boolean needAuthentication, String
secretId, String secretKey) {
+ this.enableAuthentication = needAuthentication;
+ if (!this.enableAuthentication) {
+ return;
+ }
+ if (StringUtils.isBlank(secretId)) {
+ throw new IllegalArgumentException("secretId is Blank!");
+ }
+ if (StringUtils.isBlank(secretKey)) {
+ throw new IllegalArgumentException("secretKey is Blank!");
+ }
+ this.authSecretId = secretId.trim();
+ this.authSecretKey = secretKey.trim();
+ }
+
+ public long getManagerConfigSyncInrMs() {
+ return managerConfigSyncInrMs;
+ }
+
+ public void setManagerConfigSyncInrMin(int managerConfigSyncInrMin) {
+ int tmpValue =
+ Math.min(ConfigConstants.VAL_MAX_CONFIG_SYNC_INTERVAL_MIN,
+
Math.max(ConfigConstants.VAL_MIN_CONFIG_SYNC_INTERVAL_MIN,
managerConfigSyncInrMin));
+ this.managerConfigSyncInrMs = tmpValue *
ConfigConstants.VAL_UNIT_MIN_TO_MS;
+ }
+
+ public int getManagerConnTimeoutMs() {
+ return managerConnTimeoutMs;
+ }
+
+ public void setManagerConnTimeoutMs(int managerConnTimeoutMs) {
+ this.managerConnTimeoutMs =
+ Math.min(ConfigConstants.VAL_MAX_CONNECT_TIMEOUT_MS,
+ Math.max(ConfigConstants.VAL_MIN_CONNECT_TIMEOUT_MS,
managerConnTimeoutMs));
+ }
+
+ public int getManagerSocketTimeoutMs() {
+ return managerSocketTimeoutMs;
+ }
+
+ public void setManagerSocketTimeoutMs(int managerSocketTimeoutMs) {
+ this.managerSocketTimeoutMs =
+ Math.min(ConfigConstants.VAL_MAX_SOCKET_TIMEOUT_MS,
+ Math.max(ConfigConstants.VAL_MIN_SOCKET_TIMEOUT_MS,
managerSocketTimeoutMs));
+ }
+
+ public int getConfigSyncMaxRetryIfFail() {
+ return configSyncMaxRetryIfFail;
+ }
+
+ public void setConfigSyncMaxRetryIfFail(int configSyncMaxRetryIfFail) {
+ this.configSyncMaxRetryIfFail =
+ Math.min(configSyncMaxRetryIfFail,
ConfigConstants.VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL);
+ }
+
+ public String getConfigStoreBasePath() {
+ return configStoreBasePath;
+ }
+
+ public void setConfigStoreBasePath(String configStoreBasePath) {
+ if (StringUtils.isBlank(configStoreBasePath)) {
+ return;
+ }
+ this.configStoreBasePath = configStoreBasePath.trim();
+ }
+
+ public long getConfigCacheExpiredMs() {
+ return configCacheExpiredMs;
+ }
+
+ public void setConfigCacheExpiredMs(long configCacheExpiredMs) {
+ this.configCacheExpiredMs = configCacheExpiredMs;
+ }
+
+ public long getForceReChooseInrMs() {
+ return forceReChooseInrMs;
+ }
+
+ public void setForceReChooseInrMs(long forceReChooseInrMs) {
+ this.forceReChooseInrMs =
+ Math.max(ConfigConstants.VAL_MIN_FORCE_CHOOSE_INR_MS,
forceReChooseInrMs);
+ }
+
+ public String getTlsServerCertFilePathAndName() {
+ return tlsServerCertFilePathAndName;
+ }
+
+ public String getTlsServerKey() {
+ return tlsServerKey;
+ }
+
+ public String getInlongGroupId() {
+ return inlongGroupId;
}
public int getAliveConnections() {
@@ -244,10 +308,6 @@ public class ProxyClientConfig {
this.asyncCallbackSize = asyncCallbackSize;
}
- public String getManagerUrl() {
- return managerUrl;
- }
-
public int getMaxTimeoutCnt() {
return maxTimeoutCnt;
}
@@ -259,22 +319,6 @@ public class ProxyClientConfig {
this.maxTimeoutCnt = maxTimeoutCnt;
}
- public int getProxyUpdateIntervalMinutes() {
- return proxyUpdateIntervalMinutes;
- }
-
- public void setProxyUpdateIntervalMinutes(int proxyUpdateIntervalMinutes) {
- this.proxyUpdateIntervalMinutes = proxyUpdateIntervalMinutes;
- }
-
- public int getProxyUpdateMaxRetry() {
- return proxyUpdateMaxRetry;
- }
-
- public void setProxyUpdateMaxRetry(int proxyUpdateMaxRetry) {
- this.proxyUpdateMaxRetry = proxyUpdateMaxRetry;
- }
-
public long getConnectTimeoutMs() {
return connectTimeoutMs;
}
@@ -315,26 +359,6 @@ public class ProxyClientConfig {
return isNeedDataEncry;
}
- public boolean isNeedAuthentication() {
- return this.needAuthentication;
- }
-
- public void setAuthenticationInfo(boolean needAuthentication, boolean
needDataEncry,
- final String userName, final String secretKey) {
- this.needAuthentication = needAuthentication;
- this.isNeedDataEncry = needDataEncry;
- if (this.needAuthentication || this.isNeedDataEncry) {
- if (StringUtils.isBlank(userName)) {
- throw new IllegalArgumentException("userName is Blank!");
- }
- if (StringUtils.isBlank(secretKey)) {
- throw new IllegalArgumentException("secretKey is Blank!");
- }
- }
- this.userName = userName.trim();
- this.secretKey = secretKey.trim();
- }
-
public void setHttpsInfo(String tlsServerCertFilePathAndName, String
tlsServerKey) {
if (StringUtils.isBlank(tlsServerCertFilePathAndName)) {
throw new IllegalArgumentException("tlsServerCertFilePathAndName
is Blank!");
@@ -354,22 +378,6 @@ public class ProxyClientConfig {
this.tlsVersion = tlsVersion;
}
- public String getUserName() {
- return userName;
- }
-
- public String getSecretKey() {
- return secretKey;
- }
-
- public boolean isReadProxyIPFromLocal() {
- return readProxyIPFromLocal;
- }
-
- public void setReadProxyIPFromLocal(boolean readProxyIPFromLocal) {
- this.readProxyIPFromLocal = readProxyIPFromLocal;
- }
-
public int getProxyHttpUpdateIntervalMinutes() {
return proxyHttpUpdateIntervalMinutes;
}
@@ -402,14 +410,6 @@ public class ProxyClientConfig {
this.asyncWorkerInterval = asyncWorkerInterval;
}
- public int getManagerSocketTimeout() {
- return managerSocketTimeout;
- }
-
- public void setManagerSocketTimeout(int managerSocketTimeout) {
- this.managerSocketTimeout = managerSocketTimeout;
- }
-
public boolean isCleanHttpCacheWhenClosing() {
return cleanHttpCacheWhenClosing;
}
@@ -418,22 +418,6 @@ public class ProxyClientConfig {
this.cleanHttpCacheWhenClosing = cleanHttpCacheWhenClosing;
}
- public long getMaxProxyCacheTimeInMs() {
- return maxProxyCacheTimeInMs;
- }
-
- public void setMaxProxyCacheTimeInMs(long maxProxyCacheTimeInMs) {
- this.maxProxyCacheTimeInMs = maxProxyCacheTimeInMs;
- }
-
- public int getManagerConnectionTimeout() {
- return managerConnectionTimeout;
- }
-
- public void setManagerConnectionTimeout(int managerConnectionTimeout) {
- this.managerConnectionTimeout = managerConnectionTimeout;
- }
-
public MetricConfig getMetricConfig() {
return metricConfig;
}
@@ -495,4 +479,40 @@ public class ProxyClientConfig {
public void setSenderAttempt(int senderMaxAttempt) {
this.senderMaxAttempt = senderMaxAttempt;
}
+
+ private void checkAndParseAddress(String managerAddress) throws
ProxysdkException {
+ if (StringUtils.isBlank(managerAddress)
+ || (!managerAddress.startsWith(ConfigConstants.HTTP)
+ && !managerAddress.startsWith(ConfigConstants.HTTPS)))
{
+ throw new ProxysdkException("managerAddress is blank or missing
http/https protocol");
+ }
+ String hostPortInfo;
+ if (managerAddress.startsWith(ConfigConstants.HTTPS)) {
+ this.visitManagerByHttp = false;
+ hostPortInfo =
managerAddress.substring(ConfigConstants.HTTPS.length() + 1);
+ } else {
+ hostPortInfo =
managerAddress.substring(ConfigConstants.HTTP.length() + 1);
+ }
+ if (StringUtils.isBlank(hostPortInfo)) {
+ throw new ProxysdkException("managerAddress must include host:port
info!");
+ }
+ String[] fields = hostPortInfo.split(":");
+ if (fields.length == 1) {
+ throw new ProxysdkException("managerAddress must include port
info!");
+ } else if (fields.length > 2) {
+ throw new ProxysdkException("managerAddress must only include
host:port info!");
+ }
+ if (StringUtils.isBlank(fields[0])) {
+ throw new ProxysdkException("managerAddress's host is blank!");
+ }
+ this.managerIP = fields[0].trim();
+ if (StringUtils.isBlank(fields[1])) {
+ throw new ProxysdkException("managerAddress's port is blank!");
+ }
+ try {
+ this.managerPort = Integer.parseInt(fields[1]);
+ } catch (Throwable ex) {
+ throw new ProxysdkException("managerAddress's port must be
number!");
+ }
+ }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index 441c62abe5..e37b9b2c0a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -17,13 +17,14 @@
package org.apache.inlong.sdk.dataproxy.config;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
import java.util.Map;
public class ProxyConfigEntry implements java.io.Serializable {
private int clusterId;
private String groupId;
- private int size;
private Map<String, HostInfo> hostMap;
private int load;
private int switchStat;
@@ -59,16 +60,14 @@ public class ProxyConfigEntry implements
java.io.Serializable {
}
public void setHostMap(Map<String, HostInfo> hostMap) {
- this.size = hostMap.size();
this.hostMap = hostMap;
}
-
- public int getSize() {
- return size;
+ public boolean isNodesEmpty() {
+ return this.hostMap.isEmpty();
}
- public void setSize(int size) {
- this.size = size;
+ public int getSize() {
+ return hostMap.size();
}
public String getGroupId() {
@@ -87,13 +86,6 @@ public class ProxyConfigEntry implements
java.io.Serializable {
isInterVisit = interVisit;
}
- @Override
- public String toString() {
- return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ",
size=" + size + ", isInterVisit="
- + isInterVisit + ", groupId=" + groupId + ", switch=" +
switchStat + ", maxPacketLength="
- + maxPacketLength + "]";
- }
-
public int getClusterId() {
return clusterId;
}
@@ -101,4 +93,17 @@ public class ProxyConfigEntry implements
java.io.Serializable {
public void setClusterId(int clusterId) {
this.clusterId = clusterId;
}
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("clusterId", clusterId)
+ .append("groupId", groupId)
+ .append("hostMap", hostMap)
+ .append("load", load)
+ .append("switchStat", switchStat)
+ .append("isInterVisit", isInterVisit)
+ .append("maxPacketLength", maxPacketLength)
+ .toString();
+ }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index d259cdff08..986c59457e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -21,20 +21,18 @@ import
org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.common.util.BasicAuth;
import org.apache.inlong.sdk.dataproxy.ConfigConstants;
-import org.apache.inlong.sdk.dataproxy.LoadBalance;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
-import org.apache.inlong.sdk.dataproxy.network.HashRing;
import org.apache.inlong.sdk.dataproxy.network.IpUtils;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import com.google.gson.Gson;
-import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonReader;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
@@ -44,7 +42,6 @@ import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClients;
@@ -67,7 +64,7 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyManagementException;
@@ -77,8 +74,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@@ -89,161 +87,199 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class ProxyConfigManager extends Thread {
- public static final String APPLICATION_JSON = "application/json";
private static final Logger logger =
LoggerFactory.getLogger(ProxyConfigManager.class);
- private final ProxyClientConfig clientConfig;
- private final ClientMgr clientManager;
- private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
+ private static final LogCounter exptCounter = new LogCounter(10, 100000,
60 * 1000L);
+ private static final LogCounter parseCounter = new LogCounter(10, 100000,
60 * 1000L);
+ private static final ReentrantReadWriteLock fileRw = new
ReentrantReadWriteLock();
+
+ private final String callerId;
+ private ProxyClientConfig clientConfig;
private final Gson gson = new Gson();
- private final HashRing hashRing = HashRing.getInstance();
- private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
- /* the status of the cluster.if this value is changed,we need rechoose
three proxy */
+ private final ClientMgr clientManager;
+ private final ThreadLocalRandom random = ThreadLocalRandom.current();
+ private final AtomicBoolean shutDown = new AtomicBoolean(false);
+ // proxy configure info
+ private String localProxyConfigStoreFile;
+ private String proxyConfigVisitUrl;
+ private String proxyConfigCacheFile;
+ private List<HostInfo> proxyInfoList = new ArrayList<>();
private int oldStat = 0;
- private String inlongGroupId;
private String localMd5;
- private boolean bShutDown = false;
- private long lstUpdatedTime = 0;
+ private long lstUpdateTime = 0;
+ // encrypt configure info
+ private String encryptConfigVisitUrl;
+ private String encryptConfigCacheFile;
private EncryptConfigEntry userEncryptConfigEntry;
- public ProxyConfigManager(final ProxyClientConfig configure, final
ClientMgr clientManager) {
- this.clientConfig = configure;
- this.clientManager = clientManager;
- this.hashRing.setVirtualNode(configure.getVirtualNode());
+ public ProxyConfigManager(ProxyClientConfig configure) {
+ this("MetaQuery", configure, null);
}
- public String getInlongGroupId() {
- return inlongGroupId;
+ public ProxyConfigManager(String callerId, ProxyClientConfig configure,
ClientMgr clientManager) {
+ this.callerId = callerId;
+ this.clientManager = clientManager;
+ this.storeAndBuildMetaConfigure(configure);
+ if (this.clientManager != null) {
+ this.setName("ConfigManager-" + this.callerId);
+ logger.info("ConfigManager({}) started, groupId={}",
+ this.callerId, clientConfig.getInlongGroupId());
+ }
}
- public void setInlongGroupId(String inlongGroupId) {
- this.inlongGroupId = inlongGroupId;
+ /**
+ * Update proxy client configure for query case
+ *
+ * @param configure proxy client configure
+ * @throws Exception exception
+ */
+ public void updProxyClientConfig(ProxyClientConfig configure) throws
Exception {
+ if (configure == null) {
+ throw new Exception("ProxyClientConfig is null");
+ }
+ if (this.clientManager != null) {
+ throw new Exception("Not allowed for non meta-query case!");
+ }
+ if (shutDown.get()) {
+ return;
+ }
+ this.storeAndBuildMetaConfigure(configure);
}
public void shutDown() {
- logger.info("Begin to shut down ProxyConfigManager!");
- bShutDown = true;
- }
-
- @Override
- public void run() {
- while (!bShutDown) {
- try {
- doProxyEntryQueryWork();
- updateEncryptConfigEntry();
- logger.info("ProxyConf update!");
- } catch (Throwable e) {
- logger.error("Refresh proxy ip list runs into exception {},
{}", e.toString(), e.getStackTrace());
- e.printStackTrace();
- }
-
- /* Sleep some time.240-360s */
- try {
- Random random = new Random();
- int proxyUpdateIntervalSec =
this.clientConfig.getProxyUpdateIntervalMinutes() * 60;
-
- int sleepTimeSec = proxyUpdateIntervalSec;
- if (proxyUpdateIntervalSec > 5) {
- sleepTimeSec = proxyUpdateIntervalSec + random.nextInt() %
(proxyUpdateIntervalSec / 5);
- }
- logger.info("sleep time {}", sleepTimeSec);
- Thread.sleep(sleepTimeSec * 1000);
- } catch (Throwable e2) {
- //
- }
+ if (clientManager == null) {
+ return;
+ }
+ if (shutDown.compareAndSet(false, true)) {
+ this.interrupt();
+ logger.info("ConfigManager({}) begin to shutdown, groupId={}!",
+ this.callerId, clientConfig.getInlongGroupId());
}
- logger.info("ProxyConfigManager worker existed!");
}
/**
- * try to read cache of proxy entry
+ * get groupId config
*
- * @return
+ * @return proxyConfigEntry
+ * @throws Exception ex
*/
- private ProxyConfigEntry tryToReadCacheProxyEntry(String configCachePath) {
- rw.readLock().lock();
- try {
- File file = new File(configCachePath);
- long diffTime = System.currentTimeMillis() - file.lastModified();
-
- if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) {
- JsonReader reader = new JsonReader(new
FileReader(configCachePath));
- ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader,
ProxyConfigEntry.class);
- logger.info("{} has a backup! {}", inlongGroupId,
proxyConfigEntry);
- return proxyConfigEntry;
- }
- } catch (Exception ex) {
- logger.warn("try to read local cache, caught {}", ex.getMessage());
- } finally {
- rw.readLock().unlock();
+ public Tuple2<ProxyConfigEntry, String> getGroupIdConfigure(boolean
needRetry) throws Exception {
+ if (shutDown.get()) {
+ return new Tuple2<>(null, "SDK has shutdown!");
}
- return null;
- }
-
- private void tryToWriteCacheProxyEntry(ProxyConfigEntry entry, String
configCachePath) {
- rw.writeLock().lock();
- try {
- File file = new File(configCachePath);
- if (!file.getParentFile().exists()) {
- // try to create parent
- file.getParentFile().mkdirs();
+ if (clientConfig.isOnlyUseLocalProxyConfig()) {
+ return getLocalProxyListFromFile(this.localProxyConfigStoreFile);
+ } else {
+ boolean readFromRmt = false;
+ Tuple2<ProxyConfigEntry, String> result;
+ result = tryToReadCacheProxyEntry();
+ if (result.getF0() == null) {
+ int retryCount = 0;
+ do {
+ result = requestProxyEntryQuietly();
+ if (result.getF0() != null || !needRetry ||
shutDown.get()) {
+ if (result.getF0() != null) {
+ readFromRmt = true;
+ }
+ break;
+ }
+ // sleep then retry
+ TimeUnit.MILLISECONDS.sleep(500);
+ } while (++retryCount <
clientConfig.getConfigSyncMaxRetryIfFail());
}
- logger.info("try to write {}} to local cache {}", entry,
configCachePath);
- FileWriter fileWriter = new FileWriter(configCachePath);
- gson.toJson(entry, fileWriter);
- fileWriter.flush();
- fileWriter.close();
- } catch (Exception ex) {
- logger.warn("try to write local cache, caught {}",
ex.getMessage());
- } finally {
- rw.writeLock().unlock();
- }
- }
-
- private ProxyConfigEntry requestProxyEntryQuietly() {
- try {
- return requestProxyList(this.clientConfig.getManagerUrl());
- } catch (Exception e) {
- logger.warn("try to request proxy list by http, caught {}",
e.getMessage());
+ if (shutDown.get()) {
+ return new Tuple2<>(null, "SDK has shutdown!");
+ }
+ if (result.getF0() == null) {
+ return new Tuple2<>(null, "Visit manager error:" +
result.getF1());
+ } else if (readFromRmt) {
+ tryToWriteCacheProxyEntry(result.getF0());
+ }
+ return result;
}
- return null;
}
/**
- * get groupId config
+ * get encrypt config
*
* @return proxyConfigEntry
- * @throws Exception
+ * @throws Exception ex
*/
- public ProxyConfigEntry getGroupIdConfigure() throws Exception {
- ProxyConfigEntry proxyEntry;
- String configAddr = clientConfig.getConfStoreBasePath() +
inlongGroupId;
- if (this.clientConfig.isReadProxyIPFromLocal()) {
- configAddr = configAddr + ".local";
- proxyEntry = getLocalProxyListFromFile(configAddr);
- } else {
- configAddr = configAddr + ".proxyip";
-
- proxyEntry = tryToReadCacheProxyEntry(configAddr);
- if (proxyEntry == null) {
- proxyEntry = requestProxyEntryQuietly();
- int requestCount = 0;
-
- while (requestCount < 3 && proxyEntry == null) {
- proxyEntry = requestProxyEntryQuietly();
- requestCount += 1;
- if (proxyEntry == null) {
- // sleep then retry
- TimeUnit.MILLISECONDS.sleep(500);
+ public Tuple2<EncryptConfigEntry, String> getEncryptConfigure(boolean
needRetry) throws Exception {
+ if (!clientConfig.isNeedDataEncry()) {
+ return new Tuple2<>(null, "Not need data encrypt!");
+ }
+ if (shutDown.get()) {
+ return new Tuple2<>(null, "SDK has shutdown!");
+ }
+ EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
+ if (encryptEntry != null) {
+ return new Tuple2<>(encryptEntry, "Ok");
+ }
+ boolean readFromRmt = false;
+ Tuple2<EncryptConfigEntry, String> result = readCachedPubKeyEntry();
+ if (result.getF0() == null) {
+ int retryCount = 0;
+ do {
+ result = requestPubKeyFromManager();
+ if (result.getF0() != null || !needRetry || shutDown.get()) {
+ if (result.getF0() != null) {
+ readFromRmt = true;
}
+ break;
+ }
+ // sleep then retry
+ TimeUnit.MILLISECONDS.sleep(500);
+ } while (++retryCount <
clientConfig.getConfigSyncMaxRetryIfFail());
+ }
+ if (shutDown.get()) {
+ return new Tuple2<>(null, "SDK has shutdown!");
+ }
+ if (result.getF0() == null) {
+ return new Tuple2<>(null, "Visit manager error:" + result.getF1());
+ } else if (readFromRmt) {
+ updateEncryptConfigEntry(result.getF0());
+ writeCachePubKeyEntryFile(result.getF0());
+ }
+ return result;
+ }
+
+ @Override
+ public void run() {
+ logger.info("ConfigManager({}) thread start, groupId={}",
+ this.callerId, clientConfig.getInlongGroupId());
+ while (!shutDown.get()) {
+ // update proxy nodes meta configures
+ try {
+ doProxyEntryQueryWork();
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) refresh proxy configure
exception, groupId={}",
+ this.callerId, clientConfig.getInlongGroupId(),
ex);
}
}
- if (proxyEntry == null) {
- throw new Exception("Visit manager error, please check log!");
- } else {
- tryToWriteCacheProxyEntry(proxyEntry, configAddr);
+ // update encrypt configure
+ if (clientConfig.isNeedDataEncry()) {
+ try {
+ doEncryptConfigEntryQueryWork();
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) refresh encrypt info
exception, groupId={}",
+ this.callerId,
clientConfig.getInlongGroupId(), ex);
+ }
+ }
+ }
+ if (shutDown.get()) {
+ break;
+ }
+ // sleep some time
+ try {
+ Thread.sleep(clientConfig.getManagerConfigSyncInrMs() +
random.nextInt(100) * 100);
+ } catch (Throwable e2) {
+ //
}
}
- return proxyEntry;
+ logger.info("ConfigManager({}) worker existed, groupId={}",
+ this.callerId, this.clientConfig.getInlongGroupId());
}
/**
@@ -252,55 +288,140 @@ public class ProxyConfigManager extends Thread {
* @throws Exception
*/
public void doProxyEntryQueryWork() throws Exception {
+ if (shutDown.get()) {
+ return;
+ }
/* Request the configuration from manager. */
if (localMd5 == null) {
localMd5 = calcHostInfoMd5(proxyInfoList);
}
- ProxyConfigEntry proxyEntry = null;
- String configAddr = clientConfig.getConfStoreBasePath() +
inlongGroupId;
- if (clientConfig.isReadProxyIPFromLocal()) {
- configAddr = configAddr + ".local";
- proxyEntry = getLocalProxyListFromFile(configAddr);
+ Tuple2<ProxyConfigEntry, String> result;
+ if (clientConfig.isOnlyUseLocalProxyConfig()) {
+ result = getLocalProxyListFromFile(this.localProxyConfigStoreFile);
} else {
- /* Do a compare and see if it needs to re-choose the channel. */
- configAddr = configAddr + ".managerip";
- int retryCount = 1;
- while (proxyEntry == null && retryCount <
this.clientConfig.getProxyUpdateMaxRetry()) {
- proxyEntry = requestProxyEntryQuietly();
- retryCount++;
- if (proxyEntry == null) {
- // sleep then retry.
- TimeUnit.SECONDS.sleep(1);
+ int retryCnt = 0;
+ do {
+ result = requestProxyEntryQuietly();
+ if (result.getF0() != null || shutDown.get()) {
+ break;
}
+ // sleep then retry.
+ TimeUnit.SECONDS.sleep(2);
+ } while (++retryCnt <
this.clientConfig.getConfigSyncMaxRetryIfFail() && !shutDown.get());
+ if (shutDown.get()) {
+ return;
}
- if (proxyEntry != null) {
- tryToWriteCacheProxyEntry(proxyEntry, configAddr);
+ if (result.getF0() != null) {
+ tryToWriteCacheProxyEntry(result.getF0());
}
- /* We should exit if no local IP list and can't request it from
manager. */
- if (localMd5 == null && proxyEntry == null) {
- logger.error("Can't connect manager at the start of proxy API
{}",
- this.clientConfig.getManagerUrl());
- proxyEntry = tryToReadCacheProxyEntry(configAddr);
+ /* We should exit if no local IP list and can't request it from
TDManager. */
+ if (localMd5 == null && result.getF0() == null) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) connect manager({})
failure, get cached configure, groupId={}",
+ this.callerId, this.proxyConfigVisitUrl,
this.clientConfig.getInlongGroupId());
+ }
+ result = tryToReadCacheProxyEntry();
}
- if (localMd5 != null && proxyEntry == null && proxyInfoList !=
null) {
- StringBuffer s = new StringBuffer();
- for (HostInfo tmp : proxyInfoList) {
-
s.append(tmp.getHostName()).append(";").append(tmp.getPortNumber())
- .append(",");
+ if (localMd5 != null && result.getF0() == null && proxyInfoList !=
null) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) connect manager({})
failure, using the last configure, groupId={}",
+ this.callerId, this.proxyConfigVisitUrl,
this.clientConfig.getInlongGroupId());
}
- logger.warn("Backup proxyEntry [{}]", s);
}
}
- if (localMd5 == null && proxyEntry == null && proxyInfoList == null) {
- if (clientConfig.isReadProxyIPFromLocal()) {
- throw new Exception("Local proxy address configure "
- + "read failure, please check first!");
+ if (localMd5 == null && result.getF0() == null && proxyInfoList ==
null) {
+ if (clientConfig.isOnlyUseLocalProxyConfig()) {
+ throw new Exception("Read local proxy configure failure,
please check first!");
} else {
throw new Exception("Connect Manager failure, please check
first!");
}
}
- compareProxyList(proxyEntry);
+ compareAndUpdateProxyList(result.getF0());
+ }
+
+ private void doEncryptConfigEntryQueryWork() throws Exception {
+ if (shutDown.get()) {
+ return;
+ }
+ int retryCount = 0;
+ Tuple2<EncryptConfigEntry, String> result;
+ do {
+ result = requestPubKeyFromManager();
+ if (result.getF0() != null || shutDown.get()) {
+ break;
+ }
+ // sleep then retry
+ TimeUnit.MILLISECONDS.sleep(500);
+ } while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail());
+ if (shutDown.get()) {
+ return;
+ }
+ if (result.getF0() == null) {
+ if (this.userEncryptConfigEntry != null) {
+ logger.warn("ConfigManager({}) connect manager({}) failure,
using the last pubKey, secretId={}",
+ this.callerId, this.encryptConfigVisitUrl,
this.clientConfig.getAuthSecretId());
+ return;
+ }
+ throw new Exception("Visit manager error:" + result.getF1());
+ }
+ updateEncryptConfigEntry(result.getF0());
+ writeCachePubKeyEntryFile(result.getF0());
+ }
+
+ public Tuple2<ProxyConfigEntry, String> getLocalProxyListFromFile(String
filePath) {
+ String strRet;
+ try {
+ byte[] fileBytes = Files.readAllBytes(Paths.get(filePath));
+ strRet = new String(fileBytes);
+ } catch (Throwable ex) {
+ return new Tuple2<>(null, "Read local configure failure from "
+ + filePath + ", reason is " + ex.getMessage());
+ }
+ if (StringUtils.isBlank(strRet)) {
+ return new Tuple2<>(null, "Blank configure local file from " +
filePath);
+ }
+ return getProxyConfigEntry(strRet);
+ }
+ private Tuple2<ProxyConfigEntry, String> requestProxyEntryQuietly() {
+ List<BasicNameValuePair> params = buildProxyNodeQueryParams();
+ // request meta info from manager
+ logger.debug("ConfigManager({}) request configure to manager({}),
param={}",
+ this.callerId, this.proxyConfigVisitUrl, params);
+ Tuple2<Boolean, String> queryResult =
requestConfiguration(this.proxyConfigVisitUrl, params);
+ if (!queryResult.getF0()) {
+ return new Tuple2<>(null, queryResult.getF1());
+ }
+ // parse result
+ logger.debug("ConfigManager({}) received configure, from manager({}),
groupId={}, result={}",
+ callerId, proxyConfigVisitUrl,
clientConfig.getInlongGroupId(), queryResult.getF1());
+ try {
+ return getProxyConfigEntry(queryResult.getF1());
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) parse failure, from
manager({}), groupId={}, result={}",
+ callerId, proxyConfigVisitUrl,
clientConfig.getInlongGroupId(), queryResult.getF1(), ex);
+ }
+ return new Tuple2<>(null, ex.getMessage());
+ }
+ }
+
+ private String calcHostInfoMd5(List<HostInfo> hostInfoList) {
+ if (hostInfoList == null || hostInfoList.isEmpty()) {
+ return null;
+ }
+ Collections.sort(hostInfoList);
+ StringBuilder hostInfoMd5 = new StringBuilder();
+ for (HostInfo hostInfo : hostInfoList) {
+ if (hostInfo == null) {
+ continue;
+ }
+ hostInfoMd5.append(hostInfo.getHostName());
+ hostInfoMd5.append(":");
+ hostInfoMd5.append(hostInfo.getPortNumber());
+ hostInfoMd5.append(";");
+ }
+ return DigestUtils.md5Hex(hostInfoMd5.toString());
}
/**
@@ -308,139 +429,206 @@ public class ProxyConfigManager extends Thread {
*
* @param proxyEntry
*/
- private void compareProxyList(ProxyConfigEntry proxyEntry) {
- if (proxyEntry != null) {
- logger.info("{}", proxyEntry.toString());
- if (proxyEntry.getSize() != 0) {
- /* Initialize the current proxy information list first. */
- clientManager.setLoadThreshold(proxyEntry.getLoad());
-
- List<HostInfo> newProxyInfoList = new ArrayList<HostInfo>();
- for (Map.Entry<String, HostInfo> entry :
proxyEntry.getHostMap().entrySet()) {
- newProxyInfoList.add(entry.getValue());
- }
-
- String newMd5 = calcHostInfoMd5(newProxyInfoList);
- String oldMd5 = calcHostInfoMd5(proxyInfoList);
- if (newMd5 != null && !newMd5.equals(oldMd5)) {
- /* Choose random alive connections to send messages. */
- logger.info("old md5 {} new md5 {}", oldMd5, newMd5);
- proxyInfoList.clear();
- proxyInfoList = newProxyInfoList;
- clientManager.setProxyInfoList(proxyInfoList);
- lstUpdatedTime = System.currentTimeMillis();
- } else if (proxyEntry.getSwitchStat() != oldStat) {
- /* judge cluster's switch state */
- oldStat = proxyEntry.getSwitchStat();
- if ((System.currentTimeMillis() - lstUpdatedTime) > 3 * 60
* 1000) {
- logger.info("switch the cluster!");
- proxyInfoList.clear();
- proxyInfoList = newProxyInfoList;
- clientManager.setProxyInfoList(proxyInfoList);
- } else {
- logger.info("only change oldStat ");
- }
- } else {
- newProxyInfoList.clear();
- logger.info("proxy IP list doesn't change, load {}",
proxyEntry.getLoad());
- }
- if (clientConfig.getLoadBalance() ==
LoadBalance.CONSISTENCY_HASH) {
- updateHashRing(proxyInfoList);
- }
- } else {
- logger.error("proxyEntry's size is zero");
+ private void compareAndUpdateProxyList(ProxyConfigEntry proxyEntry) {
+ if ((proxyEntry == null || proxyEntry.isNodesEmpty())
+ && (proxyInfoList.isEmpty()
+ || (System.currentTimeMillis() - lstUpdateTime) <
clientConfig.getForceReChooseInrMs())) {
+ return;
+ }
+ int newSwitchStat;
+ List<HostInfo> newBusInfoList;
+ if (proxyEntry == null || proxyEntry.isNodesEmpty()) {
+ newSwitchStat = oldStat;
+ newBusInfoList = new ArrayList<>(proxyInfoList.size());
+ newBusInfoList.addAll(proxyInfoList);
+ } else {
+ /* Initialize the current nodes information list first. */
+ clientManager.setLoadThreshold(proxyEntry.getLoad());
+ newSwitchStat = proxyEntry.getSwitchStat();
+ newBusInfoList = new ArrayList<>(proxyEntry.getSize());
+ for (Map.Entry<String, HostInfo> entry :
proxyEntry.getHostMap().entrySet()) {
+ newBusInfoList.add(entry.getValue());
}
}
+ String newMd5 = calcHostInfoMd5(newBusInfoList);
+ String oldMd5 = calcHostInfoMd5(proxyInfoList);
+ boolean nodeChanged = newMd5 != null && !newMd5.equals(oldMd5);
+ if (nodeChanged || newSwitchStat != oldStat
+ || (System.currentTimeMillis() - lstUpdateTime) >=
clientConfig.getForceReChooseInrMs()) {
+ proxyInfoList = newBusInfoList;
+ clientManager.setProxyInfoList(proxyInfoList);
+ lstUpdateTime = System.currentTimeMillis();
+ oldStat = newSwitchStat;
+ }
}
- public EncryptConfigEntry getEncryptConfigEntry(final String userName) {
- if (StringUtils.isBlank(userName)) {
- return null;
+ private void tryToWriteCacheProxyEntry(ProxyConfigEntry entry) {
+ logger.debug("ConfigManager({}) write {} to cache file ({})",
+ this.callerId, entry, this.proxyConfigCacheFile);
+ fileRw.writeLock().lock();
+ try {
+ File file = new File(this.proxyConfigCacheFile);
+ if (!file.getParentFile().exists()) {
+ // try to create parent
+ file.getParentFile().mkdirs();
+ }
+ FileWriter fileWriter = new FileWriter(this.proxyConfigCacheFile);
+ gson.toJson(entry, fileWriter);
+ fileWriter.flush();
+ fileWriter.close();
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) write cache file({}) exception,
groupId={}, data={}",
+ this.callerId, this.clientConfig.getInlongGroupId(),
+ this.proxyConfigCacheFile, entry.toString(), ex);
+ }
+ } finally {
+ fileRw.writeLock().unlock();
}
- EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
- if (encryptEntry == null) {
- int retryCount = 0;
- encryptEntry = requestPubKey(this.clientConfig.getRsaPubKeyUrl(),
userName, false);
- while (encryptEntry == null && retryCount <
this.clientConfig.getProxyUpdateMaxRetry()) {
- encryptEntry =
requestPubKey(this.clientConfig.getRsaPubKeyUrl(), userName, false);
- retryCount++;
- }
- if (encryptEntry == null) {
- encryptEntry = getStoredPubKeyEntry(userName);
- if (encryptEntry != null) {
- encryptEntry.getRsaEncryptedKey();
- synchronized (this) {
- if (this.userEncryptConfigEntry == null) {
- this.userEncryptConfigEntry = encryptEntry;
- } else {
- encryptEntry = this.userEncryptConfigEntry;
- }
- }
+ }
+
+ /**
+ * try to read cache of proxy entry
+ *
+ * @return read result
+ */
+ private Tuple2<ProxyConfigEntry, String> tryToReadCacheProxyEntry() {
+ fileRw.readLock().lock();
+ try {
+ File file = new File(this.proxyConfigCacheFile);
+ if (file.exists()) {
+ long diffTime = System.currentTimeMillis() -
file.lastModified();
+ if (clientConfig.getConfigCacheExpiredMs() > 0
+ && diffTime < clientConfig.getConfigCacheExpiredMs()) {
+ JsonReader reader = new JsonReader(new
FileReader(this.proxyConfigCacheFile));
+ ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader,
ProxyConfigEntry.class);
+ return new Tuple2<>(proxyConfigEntry, "Ok");
}
+ return new Tuple2<>(null, "cache configure expired!");
} else {
- synchronized (this) {
- if (this.userEncryptConfigEntry == null ||
this.userEncryptConfigEntry != encryptEntry) {
- storePubKeyEntry(encryptEntry);
- encryptEntry.getRsaEncryptedKey();
- this.userEncryptConfigEntry = encryptEntry;
- } else {
- encryptEntry = this.userEncryptConfigEntry;
- }
- }
+ return new Tuple2<>(null, "no cache configure!");
+ }
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) read cache file({}) exception,
groupId={}",
+ this.callerId, this.proxyConfigCacheFile,
this.clientConfig.getInlongGroupId(), ex);
}
+ return new Tuple2<>(null, "read cache configure failure:" +
ex.getMessage());
+ } finally {
+ fileRw.readLock().unlock();
}
- return encryptEntry;
}
- private void updateEncryptConfigEntry() {
- if (StringUtils.isBlank(this.clientConfig.getUserName())) {
- return;
+ private Tuple2<EncryptConfigEntry, String> requestPubKeyFromManager() {
+ List<BasicNameValuePair> params = buildPubKeyQueryParams();
+ // request meta info from manager
+ logger.debug("ConfigManager({}) request pubkey to manager({}),
param={}",
+ this.callerId, this.encryptConfigVisitUrl, params);
+ Tuple2<Boolean, String> queryResult =
requestConfiguration(this.encryptConfigVisitUrl, params);
+ if (!queryResult.getF0()) {
+ return new Tuple2<>(null, queryResult.getF1());
+ }
+ logger.debug("ConfigManager({}) received pubkey from manager({}),
result={}",
+ this.callerId, this.encryptConfigVisitUrl,
queryResult.getF1());
+ JsonObject pubKeyConf;
+ try {
+ pubKeyConf =
JsonParser.parseString(queryResult.getF1()).getAsJsonObject();
+ } catch (Throwable ex) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) parse failure, secretId={},
config={}!",
+ this.callerId, this.clientConfig.getAuthSecretId(),
queryResult.getF1());
+ }
+ return new Tuple2<>(null, "parse pubkey failure:" +
ex.getMessage());
}
- int retryCount = 0;
- EncryptConfigEntry encryptConfigEntry =
requestPubKey(this.clientConfig.getRsaPubKeyUrl(),
- this.clientConfig.getUserName(), false);
- while (encryptConfigEntry == null && retryCount <
this.clientConfig.getProxyUpdateMaxRetry()) {
- encryptConfigEntry =
requestPubKey(this.clientConfig.getRsaPubKeyUrl(),
- this.clientConfig.getUserName(), false);
- retryCount++;
- }
- if (encryptConfigEntry == null) {
- return;
+ if (pubKeyConf == null) {
+ return new Tuple2<>(null, "No public key information");
+ }
+ if (!pubKeyConf.has("resultCode")) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) config failure: resultCode
field not exist, secretId={}, config={}!",
+ this.callerId, this.clientConfig.getAuthSecretId(),
queryResult.getF1());
+ }
+ return new Tuple2<>(null, "resultCode field not exist");
+ }
+ int resultCode = pubKeyConf.get("resultCode").getAsInt();
+ if (resultCode != 0) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) config failure: resultCode !=
0, secretId={}, config={}!",
+ this.callerId, this.clientConfig.getAuthSecretId(),
queryResult.getF1());
+ }
+ return new Tuple2<>(null, "resultCode != 0!");
}
- synchronized (this) {
- if (this.userEncryptConfigEntry == null ||
this.userEncryptConfigEntry != encryptConfigEntry) {
- storePubKeyEntry(encryptConfigEntry);
- encryptConfigEntry.getRsaEncryptedKey();
- this.userEncryptConfigEntry = encryptConfigEntry;
+ if (!pubKeyConf.has("resultData")) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) config failure: resultData
field not exist, secretId={}, config={}!",
+ this.callerId, this.clientConfig.getAuthSecretId(),
queryResult.getF1());
}
+ return new Tuple2<>(null, "resultData field not exist");
}
- return;
+ JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
+ if (resultData != null) {
+ String publicKey = resultData.get("publicKey").getAsString();
+ if (StringUtils.isBlank(publicKey)) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) config failure: publicKey
is blank, secretId={}, config={}!",
+ this.callerId,
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+ }
+ return new Tuple2<>(null, "publicKey is blank!");
+ }
+ String username = resultData.get("username").getAsString();
+ if (StringUtils.isBlank(username)) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) config failure: username is
blank, secretId={}, config={}!",
+ this.callerId,
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+ }
+ return new Tuple2<>(null, "username is blank!");
+ }
+ String versionStr = resultData.get("version").getAsString();
+ if (StringUtils.isBlank(versionStr)) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) config failure: version is
blank, secretId={}, config={}!",
+ this.callerId,
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+ }
+ return new Tuple2<>(null, "version is blank!");
+ }
+ return new Tuple2<>(new EncryptConfigEntry(username, versionStr,
publicKey), "Ok");
+ }
+ return new Tuple2<>(null, "resultData value is null!");
}
- private EncryptConfigEntry getStoredPubKeyEntry(String userName) {
- if (StringUtils.isBlank(userName)) {
- logger.warn(" userName(" + userName + ") is not available");
- return null;
- }
- EncryptConfigEntry entry;
+ private void updateEncryptConfigEntry(EncryptConfigEntry newEncryptEntry) {
+ newEncryptEntry.getRsaEncryptedKey();
+ this.userEncryptConfigEntry = newEncryptEntry;
+ }
+
+ private Tuple2<EncryptConfigEntry, String> readCachedPubKeyEntry() {
+ ObjectInputStream is;
FileInputStream fis = null;
- ObjectInputStream is = null;
- rw.readLock().lock();
+ EncryptConfigEntry entry;
+ fileRw.readLock().lock();
try {
- File file = new File(clientConfig.getConfStoreBasePath() +
userName + ".pubKey");
+ File file = new File(this.encryptConfigCacheFile);
if (file.exists()) {
- fis = new FileInputStream(file);
- is = new ObjectInputStream(fis);
- entry = (EncryptConfigEntry) is.readObject();
- // is.close();
- fis.close();
- return entry;
+ long diffTime = System.currentTimeMillis() -
file.lastModified();
+ if (clientConfig.getConfigCacheExpiredMs() > 0
+ && diffTime < clientConfig.getConfigCacheExpiredMs()) {
+ fis = new FileInputStream(file);
+ is = new ObjectInputStream(fis);
+ entry = (EncryptConfigEntry) is.readObject();
+ // is.close();
+ fis.close();
+ return new Tuple2<>(entry, "Ok");
+ }
+ return new Tuple2<>(null, "cache PubKeyEntry expired!");
} else {
- return null;
+ return new Tuple2<>(null, "no PubKeyEntry file!");
}
- } catch (Throwable e1) {
- logger.error("Read " + userName + " stored PubKeyEntry error ",
e1);
- return null;
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) read({}) file exception,
secretId={}",
+ callerId, encryptConfigCacheFile,
clientConfig.getAuthSecretId(), ex);
+ }
+ return new Tuple2<>(null, "read PubKeyEntry file failure:" +
ex.getMessage());
} finally {
if (fis != null) {
try {
@@ -449,16 +637,16 @@ public class ProxyConfigManager extends Thread {
//
}
}
- rw.readLock().unlock();
+ fileRw.readLock().unlock();
}
}
- private void storePubKeyEntry(EncryptConfigEntry entry) {
+ private void writeCachePubKeyEntryFile(EncryptConfigEntry entry) {
+ ObjectOutputStream p;
FileOutputStream fos = null;
- ObjectOutputStream p = null;
- rw.writeLock().lock();
+ fileRw.writeLock().lock();
try {
- File file = new File(clientConfig.getConfStoreBasePath() +
entry.getUserName() + ".pubKey");
+ File file = new File(this.encryptConfigCacheFile);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdir();
}
@@ -470,9 +658,11 @@ public class ProxyConfigManager extends Thread {
p.writeObject(entry);
p.flush();
// p.close();
- } catch (Throwable e) {
- logger.error("store EncryptConfigEntry " + entry.toString() + "
exception ", e);
- e.printStackTrace();
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) write file({}) exception,
secretId={}, content={}",
+ callerId, encryptConfigCacheFile,
clientConfig.getAuthSecretId(), entry.toString(), ex);
+ }
} finally {
if (fos != null) {
try {
@@ -481,153 +671,192 @@ public class ProxyConfigManager extends Thread {
//
}
}
- rw.writeLock().unlock();
+ fileRw.writeLock().unlock();
}
}
- private String calcHostInfoMd5(List<HostInfo> hostInfoList) {
- if (hostInfoList == null || hostInfoList.isEmpty()) {
- return null;
- }
- Collections.sort(hostInfoList);
- StringBuffer hostInfoMd5 = new StringBuffer();
- for (HostInfo hostInfo : hostInfoList) {
- if (hostInfo == null) {
- continue;
+ /* Request new configurations from Manager. */
+ private Tuple2<Boolean, String> requestConfiguration(String url,
List<BasicNameValuePair> params) {
+ HttpParams myParams = new BasicHttpParams();
+ HttpConnectionParams.setConnectionTimeout(myParams,
clientConfig.getManagerConnTimeoutMs());
+ HttpConnectionParams.setSoTimeout(myParams,
clientConfig.getManagerSocketTimeoutMs());
+ CloseableHttpClient httpClient;
+ // build http(s) client
+ try {
+ if (this.clientConfig.isVisitManagerByHttp()) {
+ httpClient = new DefaultHttpClient(myParams);
+ } else {
+ httpClient = getCloseableHttpClient(params);
}
- hostInfoMd5.append(hostInfo.getHostName());
- hostInfoMd5.append(";");
- hostInfoMd5.append(hostInfo.getPortNumber());
- hostInfoMd5.append(";");
- }
-
- return DigestUtils.md5Hex(hostInfoMd5.toString());
- }
-
- private EncryptConfigEntry requestPubKey(String pubKeyUrl, String
userName, boolean needGet) {
- if (StringUtils.isBlank(userName)) {
- logger.error("Queried userName is null!");
- return null;
- }
- List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
- params.add(new BasicNameValuePair("operation", "query"));
- params.add(new BasicNameValuePair("username", userName));
- String returnStr = requestConfiguration(pubKeyUrl, params);
- if (StringUtils.isBlank(returnStr)) {
- logger.info("No public key information returned from manager");
- return null;
- }
- JsonObject pubKeyConf =
JsonParser.parseString(returnStr).getAsJsonObject();
- if (pubKeyConf == null) {
- logger.info("No public key information returned from manager");
- return null;
- }
- if (!pubKeyConf.has("resultCode")) {
- logger.info("Parse pubKeyConf failure: No resultCode key
information returned from manager");
- return null;
- }
- int resultCode = pubKeyConf.get("resultCode").getAsInt();
- if (resultCode != 0) {
- logger.info("query pubKeyConf failure, error code is " +
resultCode + ", errInfo is "
- + pubKeyConf.get("message").getAsString());
- return null;
- }
- if (!pubKeyConf.has("resultData")) {
- logger.info("Parse pubKeyConf failure: No resultData key
information returned from manager");
- return null;
+ } catch (Throwable eHttp) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) create Http(s) client failure,
url={}, params={}",
+ this.callerId, url, params, eHttp);
+ }
+ return new Tuple2<>(false, eHttp.getMessage());
}
- JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
- if (resultData != null) {
- String publicKey = resultData.get("publicKey").getAsString();
- if (StringUtils.isBlank(publicKey)) {
- return null;
+ // post request and get response
+ HttpPost httpPost = null;
+ try {
+ httpPost = new HttpPost(url);
+ this.addAuthorizationInfo(httpPost);
+ UrlEncodedFormEntity urlEncodedFormEntity =
+ new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
+ httpPost.setEntity(urlEncodedFormEntity);
+ HttpResponse response = httpClient.execute(httpPost);
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ return new Tuple2<>(false,
response.getStatusLine().getStatusCode()
+ + ":" + response.getStatusLine().getStatusCode());
}
- String username = resultData.get("username").getAsString();
- if (StringUtils.isBlank(username)) {
- return null;
+ String returnStr = EntityUtils.toString(response.getEntity());
+ if (StringUtils.isBlank(returnStr)) {
+ return new Tuple2<>(false, "query result is blank!");
}
- String versionStr = resultData.get("version").getAsString();
- if (StringUtils.isBlank(versionStr)) {
- return null;
+ return new Tuple2<>(true, returnStr);
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) connect manager({}) exception,
params={}",
+ this.callerId, url, params, ex);
+ }
+ return new Tuple2<>(false, ex.getMessage());
+ } finally {
+ if (httpPost != null) {
+ httpPost.releaseConnection();
+ }
+ if (httpClient != null) {
+ httpClient.getConnectionManager().shutdown();
}
- return new EncryptConfigEntry(username, versionStr, publicKey);
}
- return null;
}
- public ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws
Exception {
- DataProxyNodeResponse proxyCluster;
- try {
- byte[] fileBytes = Files.readAllBytes(Paths.get(filePath));
- proxyCluster = gson.fromJson(new String(fileBytes),
DataProxyNodeResponse.class);
- } catch (Throwable e) {
- throw new Exception("Read local proxyList File failure by " +
filePath + ", reason is " + e.getCause());
- }
- if (ObjectUtils.isEmpty(proxyCluster)) {
- logger.warn("no proxyCluster configure from local file");
- return null;
+ private CloseableHttpClient
getCloseableHttpClient(List<BasicNameValuePair> params)
+ throws NoSuchAlgorithmException, KeyManagementException {
+ CloseableHttpClient httpClient;
+ ArrayList<Header> headers = new ArrayList<>();
+ for (BasicNameValuePair paramItem : params) {
+ headers.add(new BasicHeader(paramItem.getName(),
paramItem.getValue()));
}
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectTimeout(clientConfig.getManagerConnTimeoutMs())
+
.setSocketTimeout(clientConfig.getManagerSocketTimeoutMs()).build();
+ SSLContext sslContext = SSLContexts.custom().build();
+ SSLConnectionSocketFactory sslSf = new
SSLConnectionSocketFactory(sslContext,
+ new String[]{clientConfig.getTlsVersion()}, null,
+ SSLConnectionSocketFactory.getDefaultHostnameVerifier());
+ httpClient =
HttpClients.custom().setDefaultHeaders(headers).setDefaultRequestConfig(requestConfig)
+ .setSSLSocketFactory(sslSf).build();
+ return httpClient;
+ }
- return getProxyConfigEntry(proxyCluster);
+ private void storeAndBuildMetaConfigure(ProxyClientConfig config) {
+ this.clientConfig = config;
+ StringBuilder strBuff = new StringBuilder(512);
+ this.proxyConfigVisitUrl = strBuff
+ .append(clientConfig.isVisitManagerByHttp() ?
ConfigConstants.HTTP : ConfigConstants.HTTPS)
+
.append(clientConfig.getManagerIP()).append(":").append(clientConfig.getManagerPort())
+
.append(ConfigConstants.MANAGER_DATAPROXY_API).append(clientConfig.getInlongGroupId())
+ .toString();
+ strBuff.delete(0, strBuff.length());
+ this.localProxyConfigStoreFile = strBuff
+ .append(clientConfig.getConfigStoreBasePath())
+ .append(ConfigConstants.META_STORE_SUB_DIR)
+ .append(clientConfig.getInlongGroupId())
+ .append(ConfigConstants.LOCAL_DP_CONFIG_FILE_SUFFIX)
+ .toString();
+ strBuff.delete(0, strBuff.length());
+ this.proxyConfigCacheFile = strBuff
+ .append(clientConfig.getConfigStoreBasePath())
+ .append(ConfigConstants.META_STORE_SUB_DIR)
+ .append(clientConfig.getInlongGroupId())
+ .append(ConfigConstants.REMOTE_DP_CACHE_FILE_SUFFIX)
+ .toString();
+ strBuff.delete(0, strBuff.length());
+ this.encryptConfigVisitUrl = clientConfig.getRsaPubKeyUrl();
+ this.encryptConfigCacheFile = strBuff
+ .append(clientConfig.getConfigStoreBasePath())
+ .append(ConfigConstants.META_STORE_SUB_DIR)
+ .append(clientConfig.getAuthSecretId())
+ .append(ConfigConstants.REMOTE_ENCRYPT_CACHE_FILE_SUFFIX)
+ .toString();
+ strBuff.delete(0, strBuff.length());
}
- private Map<String, Integer> getStreamIdMap(JsonObject localProxyAddrJson)
{
- Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
- if (localProxyAddrJson.has("tsn")) {
- JsonArray jsonStreamId = localProxyAddrJson.getAsJsonArray("tsn");
- for (int i = 0; i < jsonStreamId.size(); i++) {
- JsonObject jsonItem = jsonStreamId.get(i).getAsJsonObject();
- if (jsonItem != null && jsonItem.has("streamId") &&
jsonItem.has("sn")) {
- streamIdMap.put(jsonItem.get("streamId").getAsString(),
jsonItem.get("sn").getAsInt());
- }
- }
- }
- return streamIdMap;
+ private void addAuthorizationInfo(HttpPost httpPost) {
+ httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
+
BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(),
+ clientConfig.getAuthSecretKey()));
}
- public ProxyConfigEntry requestProxyList(String url) {
- ArrayList<BasicNameValuePair> params = new
ArrayList<BasicNameValuePair>();
+ private List<BasicNameValuePair> buildProxyNodeQueryParams() {
+ ArrayList<BasicNameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp()));
params.add(new BasicNameValuePair("protocolType",
clientConfig.getProtocolType()));
- logger.info("Begin to get configure from manager {}, param is {}",
url, params);
-
- String resultStr = requestConfiguration(url, params);
- ProxyClusterConfig clusterConfig = gson.fromJson(resultStr,
ProxyClusterConfig.class);
- if (clusterConfig == null || !clusterConfig.isSuccess() ||
clusterConfig.getData() == null) {
- return null;
- }
+ return params;
+ }
- DataProxyNodeResponse proxyCluster = clusterConfig.getData();
- return getProxyConfigEntry(proxyCluster);
+ private List<BasicNameValuePair> buildPubKeyQueryParams() {
+ List<BasicNameValuePair> params = new ArrayList<>();
+ params.add(new BasicNameValuePair("operation", "query"));
+ params.add(new BasicNameValuePair("username",
clientConfig.getAuthSecretId()));
+ return params;
}
- private ProxyConfigEntry getProxyConfigEntry(DataProxyNodeResponse
proxyCluster) {
+ private Tuple2<ProxyConfigEntry, String> getProxyConfigEntry(String
strRet) {
+ DataProxyNodeResponse proxyCluster;
+ try {
+ proxyCluster = gson.fromJson(strRet, DataProxyNodeResponse.class);
+ } catch (Throwable ex) {
+ if (parseCounter.shouldPrint()) {
+ logger.warn("ConfigManager({}) parse exception, groupId={},
config={}",
+ this.callerId, clientConfig.getInlongGroupId(),
strRet, ex);
+ }
+ return new Tuple2<>(null, "parse failure:" + ex.getMessage());
+ }
+ // parse nodeList
List<DataProxyNodeInfo> nodeList = proxyCluster.getNodeList();
if (CollectionUtils.isEmpty(nodeList)) {
- logger.error("dataproxy nodeList is empty in
DataProxyNodeResponse!");
- return null;
+ return new Tuple2<>(null, "nodeList is empty!");
}
- Map<String, HostInfo> hostMap = formatHostInfoMap(nodeList);
- if (MapUtils.isEmpty(hostMap)) {
- return null;
+ HostInfo tmpHostInfo;
+ Map<String, HostInfo> hostMap = new HashMap<>();
+ for (DataProxyNodeInfo proxy : nodeList) {
+ if (ObjectUtils.isEmpty(proxy.getId())
+ || StringUtils.isEmpty(proxy.getIp())
+ || ObjectUtils.isEmpty(proxy.getPort())
+ || proxy.getPort() < 0) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("Invalid proxy node: groupId={}, id={}, ip={},
port={}",
+ clientConfig.getInlongGroupId(), proxy.getId(),
proxy.getIp(), proxy.getPort());
+ }
+ continue;
+ }
+ tmpHostInfo = new HostInfo(proxy.getIp(), proxy.getPort());
+ hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo);
}
-
+ if (hostMap.isEmpty()) {
+ return new Tuple2<>(null, "no valid nodeList records!");
+ }
+ // parse clusterId
int clusterId = -1;
if (ObjectUtils.isNotEmpty(proxyCluster.getClusterId())) {
clusterId = proxyCluster.getClusterId();
}
+ // parse load
int load = ConfigConstants.LOAD_THRESHOLD;
if (ObjectUtils.isNotEmpty(proxyCluster.getLoad())) {
load = proxyCluster.getLoad() > 200 ? 200 :
(Math.max(proxyCluster.getLoad(), 0));
}
+ // parse isIntranet
boolean isIntranet = true;
- if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
- isIntranet = proxyCluster.getIsIntranet() == 1 ? true : false;
+ if (ObjectUtils.isNotEmpty(proxyCluster.getIsIntranet())) {
+ isIntranet = proxyCluster.getIsIntranet() == 1;
}
+ // parse isSwitch
int isSwitch = 0;
if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
isSwitch = proxyCluster.getIsSwitch();
}
+ // build ProxyConfigEntry
ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
proxyEntry.setClusterId(clusterId);
proxyEntry.setGroupId(clientConfig.getInlongGroupId());
@@ -635,114 +864,8 @@ public class ProxyConfigManager extends Thread {
proxyEntry.setHostMap(hostMap);
proxyEntry.setSwitchStat(isSwitch);
proxyEntry.setLoad(load);
- proxyEntry.setSize(nodeList.size());
proxyEntry.setMaxPacketLength(
proxyCluster.getMaxPacketLength() != null ?
proxyCluster.getMaxPacketLength() : -1);
- return proxyEntry;
- }
-
- private Map<String, HostInfo> formatHostInfoMap(List<DataProxyNodeInfo>
nodeList) {
- HostInfo tmpHostInfo;
- Map<String, HostInfo> hostMap = new HashMap<>();
- for (DataProxyNodeInfo proxy : nodeList) {
- if (ObjectUtils.isEmpty(proxy.getId()) ||
StringUtils.isEmpty(proxy.getIp()) || ObjectUtils
- .isEmpty(proxy.getPort()) || proxy.getPort() < 0) {
- logger.error("invalid proxy node, id:{}, ip:{}, port:{}",
proxy.getId(), proxy.getIp(),
- proxy.getPort());
- continue;
- }
- tmpHostInfo = new HostInfo(proxy.getIp(), proxy.getPort());
- hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo);
-
- }
- if (hostMap.isEmpty()) {
- logger.error("Parse proxyList failure: address is empty for
response from manager!");
- return null;
- }
- return hostMap;
- }
-
- /* Request new configurations from Manager. */
- private String requestConfiguration(String url, List<BasicNameValuePair>
params) {
- if (StringUtils.isBlank(url)) {
- logger.error("request url is null");
- return null;
- }
- HttpPost httpPost = null;
- HttpParams myParams = new BasicHttpParams();
- HttpConnectionParams.setConnectionTimeout(myParams, 10000);
- HttpConnectionParams.setSoTimeout(myParams,
clientConfig.getManagerSocketTimeout());
- CloseableHttpClient httpClient;
- if (this.clientConfig.isRequestByHttp()) {
- httpClient = new DefaultHttpClient(myParams);
- } else {
- try {
- httpClient = getCloseableHttpClient(params);
- } catch (Throwable eHttps) {
- logger.error("Create Https cliet failure, error 1 is ",
eHttps);
- eHttps.printStackTrace();
- return null;
- }
- }
- logger.info("Request url : {}, params : {}", url, params);
- try {
- httpPost = new HttpPost(url);
- httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
-
BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(),
- clientConfig.getAuthSecretKey()));
- UrlEncodedFormEntity urlEncodedFormEntity = new
UrlEncodedFormEntity(params, "UTF-8");
- httpPost.setEntity(urlEncodedFormEntity);
- HttpResponse response = httpClient.execute(httpPost);
- String returnStr = EntityUtils.toString(response.getEntity());
- if (StringUtils.isNotBlank(returnStr)
- && response.getStatusLine().getStatusCode() ==
HttpStatus.SC_OK) {
- logger.info("Get configure from manager is {}", returnStr);
- return returnStr;
- }
- return null;
- } catch (Throwable e) {
- logger.error("Connect Manager error, message: {}, url is {}",
e.getMessage(), url);
- return null;
- } finally {
- if (httpPost != null) {
- httpPost.releaseConnection();
- }
- if (httpClient != null) {
- httpClient.getConnectionManager().shutdown();
- }
- }
- }
-
- private StringEntity getEntity(List<BasicNameValuePair> params) throws
UnsupportedEncodingException {
- JsonObject jsonObject = new JsonObject();
- for (BasicNameValuePair pair : params) {
- jsonObject.addProperty(pair.getName(), pair.getValue());
- }
- StringEntity se = new StringEntity(jsonObject.toString());
- se.setContentType(APPLICATION_JSON);
- return se;
- }
-
- private CloseableHttpClient
getCloseableHttpClient(List<BasicNameValuePair> params)
- throws NoSuchAlgorithmException, KeyManagementException {
- CloseableHttpClient httpClient;
- ArrayList<Header> headers = new ArrayList<Header>();
- for (BasicNameValuePair paramItem : params) {
- headers.add(new BasicHeader(paramItem.getName(),
paramItem.getValue()));
- }
- RequestConfig requestConfig =
RequestConfig.custom().setConnectTimeout(10000)
-
.setSocketTimeout(clientConfig.getManagerSocketTimeout()).build();
- SSLContext sslContext = SSLContexts.custom().build();
- SSLConnectionSocketFactory sslsf = new
SSLConnectionSocketFactory(sslContext,
- new String[]{clientConfig.getTlsVersion()}, null,
- SSLConnectionSocketFactory.getDefaultHostnameVerifier());
- httpClient =
HttpClients.custom().setDefaultHeaders(headers).setDefaultRequestConfig(requestConfig)
- .setSSLSocketFactory(sslsf).build();
- return httpClient;
- }
-
- public void updateHashRing(List<HostInfo> newHosts) {
- this.hashRing.updateNode(newHosts);
- logger.debug("update hash ring {}",
hashRing.getVirtualNode2RealNode());
+ return new Tuple2<>(proxyEntry, "ok");
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index 3999390f9b..2ba1938409 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -54,9 +54,8 @@ public class HttpClientExample {
proxyConfig = new ProxyClientConfig(localIP, requestByHttp,
inLongManagerAddr,
Integer.valueOf(inLongManagerPort),
inlongGroupId, "admin", "inlong");// user and password of
manager
- proxyConfig.setInlongGroupId(inlongGroupId);
- proxyConfig.setConfStoreBasePath(configBasePath);
- proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
+ proxyConfig.setConfigStoreBasePath(configBasePath);
+ proxyConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal);
proxyConfig.setDiscardOldMessage(true);
proxyConfig.setProtocolType(ProtocolType.HTTP);
sender = new HttpProxySender(proxyConfig);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index eda90bdbca..85012af172 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -70,9 +70,9 @@ public class TcpClientExample {
dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp,
inLongManagerAddr,
Integer.valueOf(inLongManagerPort), inlongGroupId,
"admin", "inlong");
if (StringUtils.isNotEmpty(configBasePath)) {
- dataProxyConfig.setConfStoreBasePath(configBasePath);
+ dataProxyConfig.setConfigStoreBasePath(configBasePath);
}
- dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
+ dataProxyConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal);
dataProxyConfig.setProtocolType(ProtocolType.TCP);
dataProxyConfig.setRequestTimeoutMs(20000L);
messageSender =
DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index 45c95e042d..55a26918a0 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -27,6 +27,7 @@ import
org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.utils.ConsistencyHashUtil;
import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@@ -84,7 +85,7 @@ public class ClientMgr {
private int aliveConnections;
private int realSize;
private SendHBThread sendHBThread;
- private ProxyConfigManager ipManager;
+ private ProxyConfigManager configManager;
private int groupIdNum = 0;
private String groupId = "";
private Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
@@ -115,10 +116,9 @@ public class ClientMgr {
bootstrap.option(ChannelOption.SO_SNDBUF,
ConfigConstants.DEFAULT_SEND_BUFFER_SIZE);
bootstrap.handler(new ClientPipelineFactory(this, sender));
/* ready to Start the thread which refreshes the proxy list. */
- ipManager = new ProxyConfigManager(configure, this);
- ipManager.setName("proxyConfigManager");
+ configManager = new ProxyConfigManager(sender.getInstanceId(),
configure, this);
+ configManager.setName("proxyConfigManager");
if (configure.getInlongGroupId() != null) {
- ipManager.setInlongGroupId(configure.getInlongGroupId());
groupId = configure.getInlongGroupId();
}
@@ -131,13 +131,13 @@ public class ClientMgr {
this.loadBalance = configure.getLoadBalance();
try {
- ipManager.doProxyEntryQueryWork();
+ configManager.doProxyEntryQueryWork();
} catch (IOException e) {
e.printStackTrace();
logger.info(e.getMessage());
}
- ipManager.setDaemon(true);
- ipManager.start();
+ configManager.setDaemon(true);
+ configManager.start();
this.sendHBThread = new SendHBThread();
this.sendHBThread.setName("SendHBThread");
@@ -181,7 +181,13 @@ public class ClientMgr {
}
public EncryptConfigEntry getEncryptConfigEntry() {
- return this.ipManager.getEncryptConfigEntry(configure.getUserName());
+ Tuple2<EncryptConfigEntry, String> result;
+ try {
+ result = configManager.getEncryptConfigure(false);
+ return result.getF0();
+ } catch (Throwable ex) {
+ return null;
+ }
}
public List<HostInfo> getProxyInfoList() {
@@ -273,7 +279,12 @@ public class ClientMgr {
}
public ProxyConfigEntry getGroupIdConfigureInfo() throws Exception {
- return ipManager.getGroupIdConfigure();
+ Tuple2<ProxyConfigEntry, String> result =
+ configManager.getGroupIdConfigure(true);
+ if (result.getF0() == null) {
+ throw new Exception(result.getF1());
+ }
+ return result.getF0();
}
/**
@@ -531,7 +542,7 @@ public class ClientMgr {
public void shutDown() {
bootstrap.config().group().shutdownGracefully();
- ipManager.shutDown();
+ configManager.shutDown();
// connectionCheckThread.shutDown();
sendHBThread.shutDown();
@@ -851,9 +862,9 @@ public class ClientMgr {
Collections.singletonList(hbMsg.getBytes(StandardCharsets.UTF_8)),
8, false, false, false, System.currentTimeMillis() / 1000, 1,
"", "", "");
try {
- if (configure.isNeedAuthentication()) {
- encodeObject.setAuth(configure.isNeedAuthentication(),
- configure.getUserName(), configure.getSecretKey());
+ if (configure.isEnableAuthentication()) {
+ encodeObject.setAuth(configure.isEnableAuthentication(),
+ configure.getAuthSecretId(),
configure.getAuthSecretKey());
}
client.write(encodeObject);
} catch (Throwable e) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index 5d31f05f8a..1cf9939ff9 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender;
import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,15 +76,14 @@ public class HttpProxySender extends Thread {
private void initTDMClientAndRequest(ProxyClientConfig configure) throws
Exception {
try {
- proxyConfigManager = new ProxyConfigManager(configure, null);
- proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
+ proxyConfigManager = new ProxyConfigManager(configure);
ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig();
hostList.addAll(proxyConfigEntry.getHostMap().values());
this.setDaemon(true);
this.start();
} catch (Throwable e) {
- if (configure.isReadProxyIPFromLocal()) {
+ if (configure.isOnlyUseLocalProxyConfig()) {
throw new Exception("Get local proxy configure failure! e =
{}", e);
} else {
throw new Exception("Visit TDManager error! e = {}", e);
@@ -98,7 +98,9 @@ public class HttpProxySender extends Thread {
* @return proxy config entry.
*/
private ProxyConfigEntry retryGettingProxyConfig() throws Exception {
- return proxyConfigManager.getGroupIdConfigure();
+ Tuple2<ProxyConfigEntry, String> result =
+ proxyConfigManager.getGroupIdConfigure(true);
+ return result.getF0();
}
/**
@@ -112,9 +114,13 @@ public class HttpProxySender extends Thread {
int randSleepTime =
proxyClientConfig.getProxyHttpUpdateIntervalMinutes() * 60 + rand;
TimeUnit.MILLISECONDS.sleep(randSleepTime * 1000);
if (proxyConfigManager != null) {
- ProxyConfigEntry proxyConfigEntry =
proxyConfigManager.getGroupIdConfigure();
- hostList.addAll(proxyConfigEntry.getHostMap().values());
- hostList.retainAll(proxyConfigEntry.getHostMap().values());
+ Tuple2<ProxyConfigEntry, String> result =
+ proxyConfigManager.getGroupIdConfigure(false);
+ if (result.getF0() == null) {
+ throw new Exception(result.getF1());
+ }
+ hostList.addAll(result.getF0().getHostMap().values());
+ hostList.retainAll(result.getF0().getHostMap().values());
} else {
logger.error("manager is null, please check it!");
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 9f001205e3..e4f820e3ed 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -43,12 +43,13 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
public class Sender {
private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
-
+ private static final AtomicLong senderIdGen = new AtomicLong(0L);
/* Store the callback used by asynchronously message sending. */
private final ConcurrentHashMap<Channel, ConcurrentHashMap<String,
QueueObject>> callbacks =
new ConcurrentHashMap<>();
@@ -61,6 +62,7 @@ public class Sender {
private final AtomicInteger currentBufferSize = new AtomicInteger(0);
private final TimeoutScanThread scanThread;
private final ClientMgr clientMgr;
+ private final String instanceId;
private final ProxyClientConfig configure;
private MetricWorkerThread metricWorker = null;
private int clusterId = -1;
@@ -74,6 +76,7 @@ public class Sender {
*/
public Sender(ProxyClientConfig configure, ThreadFactory
selfDefineFactory) throws Exception {
this.configure = configure;
+ this.instanceId = "sender-" + senderIdGen.incrementAndGet();
this.asyncCallbackMaxSize = configure.getTotalAsyncCallbackSize();
this.threadPool = Executors.newCachedThreadPool();
this.clientMgr = new ClientMgr(configure, this, selfDefineFactory);
@@ -82,14 +85,14 @@ public class Sender {
proxyConfigEntry = this.clientMgr.getGroupIdConfigureInfo();
setClusterId(proxyConfigEntry.getClusterId());
} catch (Throwable e) {
- if (configure.isReadProxyIPFromLocal()) {
+ if (configure.isOnlyUseLocalProxyConfig()) {
throw new Exception("Get local proxy configure failure!",
e.getCause());
} else {
throw new Exception("Visit manager error!", e.getCause());
}
}
if (!proxyConfigEntry.isInterVisit()) {
- if (!configure.isNeedAuthentication()) {
+ if (!configure.isEnableAuthentication()) {
throw new Exception("In OutNetwork isNeedAuthentication must
be true!");
}
if (!configure.isNeedDataEncry()) {
@@ -200,7 +203,8 @@ public class Sender {
}
}
if (this.configure.isNeedDataEncry()) {
- encodeObject.setEncryptEntry(true, configure.getUserName(),
clientMgr.getEncryptConfigEntry());
+ encodeObject.setEncryptEntry(true,
+ configure.getAuthSecretId(),
clientMgr.getEncryptConfigEntry());
} else {
encodeObject.setEncryptEntry(false, null, null);
}
@@ -371,7 +375,8 @@ public class Sender {
}
}
if (this.configure.isNeedDataEncry()) {
- encodeObject.setEncryptEntry(true, configure.getUserName(),
clientMgr.getEncryptConfigEntry());
+ encodeObject.setEncryptEntry(true,
+ configure.getAuthSecretId(),
clientMgr.getEncryptConfigEntry());
} else {
encodeObject.setEncryptEntry(false, null, null);
}
@@ -498,6 +503,10 @@ public class Sender {
this.clusterId = clusterId;
}
+ public String getInstanceId() {
+ return instanceId;
+ }
+
/**
* check whether clientChannel is idle; if idle, need send hb to keep alive
*
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 371dcf661c..b7bd42ab2b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -150,11 +150,11 @@ public class ProxyUtils {
* @param clientConfig
*/
public static void validClientConfig(ProxyClientConfig clientConfig) {
- if (clientConfig.isNeedAuthentication()) {
- if (StringUtils.isBlank(clientConfig.getUserName())) {
- throw new IllegalArgumentException("Authentication require
userName not Blank!");
+ if (clientConfig.isEnableAuthentication()) {
+ if (StringUtils.isBlank(clientConfig.getAuthSecretId())) {
+ throw new IllegalArgumentException("Authentication require
secretId not Blank!");
}
- if (StringUtils.isBlank(clientConfig.getSecretKey())) {
+ if (StringUtils.isBlank(clientConfig.getAuthSecretKey())) {
throw new IllegalArgumentException("Authentication require
secretKey not Blank!");
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/Tuple2.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/Tuple2.java
new file mode 100644
index 0000000000..e5ba8c2bc2
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/Tuple2.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+public class Tuple2<T0, T1> {
+
+ /** Field 0 of the tuple. */
+ private T0 f0 = null;
+ /** Field 1 of the tuple. */
+ private T1 f1 = null;
+
+ /**
+ * Creates a new tuple where all fields are null.
+ */
+ public Tuple2() {
+
+ }
+
+ /**
+ * Creates a new tuple with field 0 specified.
+ *
+ * @param value0 The value for field 0
+ */
+ public Tuple2(T0 value0) {
+ this.f0 = value0;
+ }
+
+ /**
+ * Creates a new tuple and assigns the given values to the tuple's fields.
+ *
+ * @param value0 The value for field 0
+ * @param value1 The value for field 1
+ */
+ public Tuple2(T0 value0, T1 value1) {
+ setF0AndF1(value0, value1);
+ }
+
+ public T0 getF0() {
+ return f0;
+ }
+
+ public T1 getF1() {
+ return f1;
+ }
+
+ /**
+ * Set all field values
+ *
+ * @param value0 The value for field 0
+ * @param value1 The value for field 1
+ */
+ public void setF0AndF1(T0 value0, T1 value1) {
+ this.f0 = value0;
+ this.f1 = value1;
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
index 5f3ba92c75..40fec3b57d 100644
---
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import org.junit.Assert;
import org.junit.Test;
@@ -36,14 +37,18 @@ public class ProxyConfigManagerTest {
.toString();
private final ProxyClientConfig clientConfig =
PowerMockito.mock(ProxyClientConfig.class);
private final ClientMgr clientMgr = PowerMockito.mock(ClientMgr.class);
- private final ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(clientConfig, clientMgr);
+ private final ProxyConfigManager proxyConfigManager;
public ProxyConfigManagerTest() throws URISyntaxException {
+ clientConfig.setConfigStoreBasePath(localFile);
+ proxyConfigManager =
+ new ProxyConfigManager("test", clientConfig, clientMgr);
}
@Test
public void testProxyConfigParse() throws Exception {
- ProxyConfigEntry proxyEntry =
proxyConfigManager.getLocalProxyListFromFile(localFile);
+ Tuple2<ProxyConfigEntry, String> result =
proxyConfigManager.getLocalProxyListFromFile(localFile);
+ ProxyConfigEntry proxyEntry = result.getF0();
Assert.assertEquals(proxyEntry.isInterVisit(), false);
Assert.assertEquals(proxyEntry.getLoad(), 12);
Assert.assertEquals(proxyEntry.getClusterId(), 1);
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 74cfcffa21..1965ef37e3 100644
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -62,7 +62,7 @@ public class InlongSdkDirtySender {
ProxyClientConfig proxyClientConfig =
new
ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true,
inlongManagerAddr, inlongManagerPort, inlongGroupId,
authId, authKey);
- proxyClientConfig.setReadProxyIPFromLocal(false);
+ proxyClientConfig.setOnlyUseLocalProxyConfig(false);
proxyClientConfig.setAsyncCallbackSize(maxCallbackSize);
this.sender =
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
this.sender.setMsgtype(7);