This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dae7074d9e [INLONG-11564][SDK] DataProxy SDK Implementation
Optimization (#11581)
dae7074d9e is described below
commit dae7074d9ee219135138e8a9a8a3db8948366bfd
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Dec 6 08:57:13 2024 +0800
[INLONG-11564][SDK] DataProxy SDK Implementation Optimization (#11581)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 2 -
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 24 --
.../sdk/dataproxy/codec/ProtocolDecoder.java | 53 ++--
.../sdk/dataproxy/codec/ProtocolEncoder.java | 32 +--
.../inlong/sdk/dataproxy/common/SendResult.java | 2 +-
.../sdk/dataproxy/config/EncryptConfigEntry.java | 5 +-
.../inlong/sdk/dataproxy/config/HostInfo.java | 4 +-
.../sdk/dataproxy/config/ProxyConfigManager.java | 270 +++++++-------------
.../sdk/dataproxy/network/SyncMessageCallable.java | 13 +-
.../sdk/dataproxy/network/TimeScanObject.java | 4 +-
.../dataproxy/threads/ManagerFetcherThread.java | 68 -----
.../inlong/sdk/dataproxy/utils/EventLoopUtil.java | 3 +-
.../sdk/dataproxy/utils/ServiceDiscoveryUtils.java | 279 ---------------------
13 files changed, 153 insertions(+), 606 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 6fa6f6ff9a..dd22e63cce 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -29,7 +29,6 @@ import
org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
-import org.apache.inlong.sdk.dataproxy.threads.ManagerFetcherThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.slf4j.Logger;
@@ -52,7 +51,6 @@ public class DefaultMessageSender implements MessageSender {
private static final ConcurrentHashMap<Integer, DefaultMessageSender>
CACHE_SENDER =
new ConcurrentHashMap<>();
private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new
AtomicBoolean(false);
- private static ManagerFetcherThread managerFetcherThread;
private static final SequentialID idGenerator = new SequentialID();
private final Sender sender;
private final IndexCollectThread indexCol;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index e7a0f6a3e6..ce8a3b3b39 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -34,7 +34,6 @@ public class ProxyClientConfig {
private String managerIP = "";
private String managerAddress;
- private String managerIpLocalPath = System.getProperty("user.dir") +
"/.inlong/.managerIps";
private String managerUrl = "";
private int proxyUpdateIntervalMinutes;
private int proxyUpdateMaxRetry;
@@ -54,7 +53,6 @@ public class ProxyClientConfig {
private String authSecretKey;
private String protocolType;
- private boolean enableSaveManagerVIps = false;
// metric configure
private MetricConfig metricConfig = new MetricConfig();
@@ -211,28 +209,6 @@ public class ProxyClientConfig {
return managerIP;
}
- public String getManagerIpLocalPath() {
- return managerIpLocalPath;
- }
-
- public void setManagerIpLocalPath(String managerIpLocalPath) throws
ProxysdkException {
- if (StringUtils.isEmpty(managerIpLocalPath)) {
- throw new ProxysdkException("managerIpLocalPath is empty.");
- }
- if (managerIpLocalPath.charAt(managerIpLocalPath.length() - 1) == '/')
{
- managerIpLocalPath = managerIpLocalPath.substring(0,
managerIpLocalPath.length() - 1);
- }
- this.managerIpLocalPath = managerIpLocalPath + "/.managerIps";
- }
-
- public boolean isEnableSaveManagerVIps() {
- return enableSaveManagerVIps;
- }
-
- public void setEnableSaveManagerVIps(boolean enable) {
- this.enableSaveManagerVIps = enable;
- }
-
public String getConfStoreBasePath() {
return confStoreBasePath;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index 2038a8b8d6..b7c31cba79 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.dataproxy.codec;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
@@ -29,18 +31,19 @@ import java.util.List;
public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProtocolDecoder.class);
-
+ private static final Logger logger =
LoggerFactory.getLogger(ProtocolDecoder.class);
+ private static final LogCounter decExptCounter = new LogCounter(10,
200000, 60 * 1000L);
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buffer, List<Object> out) throws Exception {
buffer.markReaderIndex();
// totallen
int totalLen = buffer.readInt();
- LOGGER.debug("decode totalLen : {}", totalLen);
if (totalLen != buffer.readableBytes()) {
- LOGGER.error("totalLen is not equal readableBytes.total:" +
totalLen
- + ";readableBytes:" + buffer.readableBytes());
+ if (decExptCounter.shouldPrint()) {
+ logger.error("Length not equal,
totalLen={},readableBytes={},from={}",
+ totalLen, buffer.readableBytes(), ctx.channel());
+ }
buffer.resetReaderIndex();
throw new Exception("totalLen is not equal readableBytes.total");
}
@@ -48,14 +51,17 @@ public class ProtocolDecoder extends
MessageToMessageDecoder<ByteBuf> {
int msgType = buffer.readByte() & 0x1f;
if (msgType == 4) {
- LOGGER.info("debug decode");
- }
- if (msgType == 3 | msgType == 5) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("debug decode");
+ }
+ } else if (msgType == 3 | msgType == 5) {
// bodylen
int bodyLength = buffer.readInt();
if (bodyLength >= totalLen) {
- LOGGER.error("bodyLen is greater than totalLen.totalLen:" +
totalLen
- + ";bodyLen:" + bodyLength);
+ if (decExptCounter.shouldPrint()) {
+ logger.error("bodyLen greater than totalLen,
totalLen={},bodyLen={},from={}",
+ totalLen, bodyLength, ctx.channel());
+ }
buffer.resetReaderIndex();
throw new Exception("bodyLen is greater than
totalLen.totalLen");
}
@@ -64,20 +70,19 @@ public class ProtocolDecoder extends
MessageToMessageDecoder<ByteBuf> {
bodyBytes = new byte[bodyLength];
buffer.readBytes(bodyBytes);
}
-
// attrlen
+ String attrInfo = "";
int attrLength = buffer.readInt();
- byte[] attrBytes = null;
if (attrLength > 0) {
- attrBytes = new byte[attrLength];
+ byte[] attrBytes = new byte[attrLength];
buffer.readBytes(attrBytes);
+ attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
}
EncodeObject object;
if (bodyBytes == null) {
- object = new EncodeObject(new String(attrBytes,
StandardCharsets.UTF_8));
+ object = new EncodeObject(attrInfo);
} else {
- object = new EncodeObject(Collections.singletonList(bodyBytes),
- new String(attrBytes, StandardCharsets.UTF_8));
+ object = new
EncodeObject(Collections.singletonList(bodyBytes), attrInfo);
}
object.setMsgtype(5);
out.add(object);
@@ -85,12 +90,13 @@ public class ProtocolDecoder extends
MessageToMessageDecoder<ByteBuf> {
int seqId = buffer.readInt();
int attrLen = buffer.readShort();
- byte[] attrBytes = null;
+ String attrInfo = "";
if (attrLen > 0) {
- attrBytes = new byte[attrLen];
+ byte[] attrBytes = new byte[attrLen];
buffer.readBytes(attrBytes);
+ attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
}
- EncodeObject object = new EncodeObject(new String(attrBytes,
StandardCharsets.UTF_8));
+ EncodeObject object = new EncodeObject(attrInfo);
object.setMessageId(String.valueOf(seqId));
buffer.readShort();
@@ -103,15 +109,14 @@ public class ProtocolDecoder extends
MessageToMessageDecoder<ByteBuf> {
buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and
body_len
final short load = buffer.readShort(); // read from body
int attrLen = buffer.readShort();
- byte[] attrBytes = null;
+ String attrInfo = "";
if (attrLen > 0) {
- attrBytes = new byte[attrLen];
+ byte[] attrBytes = new byte[attrLen];
buffer.readBytes(attrBytes);
+ attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
}
buffer.skipBytes(2); // skip magic
-
- String attrs = (attrBytes == null ? "" : new String(attrBytes,
StandardCharsets.UTF_8));
- EncodeObject object = new EncodeObject(attrs);
+ EncodeObject object = new EncodeObject(attrInfo);
object.setMsgtype(8);
object.setLoad(load);
out.add(object);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
index c3d463987b..1a20766e55 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
@@ -99,16 +99,17 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
if (object.isAuth()) {
msgType |= FLAG_ALLOW_AUTH;
}
- int totalLength = 1 + 4 + 1 + 4 + 2 +
endAttr.getBytes("utf8").length + 2;
+ byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8);
+ int totalLength = 1 + 4 + 1 + 4 + 2 + attrData.length + 2;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
buf.writeInt((int) object.getDt());
buf.writeByte(1);
buf.writeInt(0);
- buf.writeShort(endAttr.getBytes("utf8").length);
- if (endAttr.getBytes("utf8").length > 0) {
- buf.writeBytes(endAttr.getBytes("utf8"));
+ buf.writeShort(attrData.length);
+ if (attrData.length > 0) {
+ buf.writeBytes(attrData);
}
buf.writeShort(0xee01);
} catch (Throwable ex) {
@@ -160,7 +161,8 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
if (object.isCompress()) {
msgType |= FLAG_ALLOW_COMPRESS;
}
- totalLength = totalLength + body.length +
endAttr.getBytes("utf8").length;
+ byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8);
+ totalLength = totalLength + body.length + attrData.length;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
@@ -181,8 +183,8 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
buf.writeInt(body.length);
buf.writeBytes(body);
- buf.writeShort(endAttr.getBytes("utf8").length);
- buf.writeBytes(endAttr.getBytes("utf8"));
+ buf.writeShort(attrData.length);
+ buf.writeBytes(attrData);
buf.writeShort(0xee01);
}
return buf;
@@ -207,7 +209,7 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
ByteArrayOutputStream data = new ByteArrayOutputStream();
for (byte[] entry : object.getBodylist()) {
if (totalCnt++ > 0) {
- data.write("\n".getBytes("utf8"));
+
data.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
}
data.write(entry);
}
@@ -280,14 +282,15 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
if (object.isEncrypt()) {
msgType |= FLAG_ALLOW_ENCRYPT;
}
- totalLength = totalLength + body.length +
msgAttrs.getBytes("utf8").length;
+ byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8);
+ totalLength = totalLength + body.length + attrData.length;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
buf.writeInt(body.length);
buf.writeBytes(body);
- buf.writeInt(msgAttrs.getBytes("utf8").length);
- buf.writeBytes(msgAttrs.getBytes("utf8"));
+ buf.writeInt(attrData.length);
+ buf.writeBytes(attrData);
}
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
@@ -344,14 +347,15 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
if (object.isEncrypt()) {
msgType |= FLAG_ALLOW_ENCRYPT;
}
- totalLength = totalLength + body.length +
msgAttrs.getBytes("utf8").length;
+ byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8);
+ totalLength = totalLength + body.length + attrData.length;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
buf.writeInt(body.length);
buf.writeBytes(body);
- buf.writeInt(msgAttrs.getBytes("utf8").length);
- buf.writeBytes(msgAttrs.getBytes("utf8"));
+ buf.writeInt(attrData.length);
+ buf.writeBytes(attrData);
}
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
index adab601e0a..f336702ee4 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
@@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy.common;
public enum SendResult {
- INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
OK,
+ INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
TIMEOUT,
CONNECTION_BREAK,
THREAD_INTERRUPT,
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
index 6acfe09d8a..47f6cd1ed7 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import java.net.URLEncoder;
import java.security.interfaces.RSAPublicKey;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
public class EncryptConfigEntry implements java.io.Serializable {
@@ -122,7 +123,7 @@ public class EncryptConfigEntry implements
java.io.Serializable {
@Override
public boolean equals(Object other) {
- if (other == null || !(other instanceof EncryptConfigEntry)) {
+ if (!(other instanceof EncryptConfigEntry)) {
return false;
}
if (other == this) {
@@ -131,7 +132,7 @@ public class EncryptConfigEntry implements
java.io.Serializable {
EncryptConfigEntry info = (EncryptConfigEntry) other;
return (this.userName.equals(info.getUserName()))
&& (this.version.equals(info.getVersion()))
- && (this.pubKey == info.getPubKey());
+ && (Objects.equals(this.pubKey, info.getPubKey()));
}
public String toString() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java
index c93c872ea5..071cf4b6eb 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java
@@ -25,10 +25,10 @@ public class HostInfo implements Comparable<HostInfo>,
java.io.Serializable {
private final String hostName;
private final int portNumber;
- public HostInfo(String referenceName, String hostName, int portNumber) {
- this.referenceName = referenceName;
+ public HostInfo(String hostName, int portNumber) {
this.hostName = hostName;
this.portNumber = portNumber;
+ this.referenceName = hostName + ":" + portNumber;
}
public String getReferenceName() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index b10e533037..d259cdff08 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -35,7 +35,6 @@ import com.google.gson.stream.JsonReader;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
@@ -91,11 +90,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ProxyConfigManager extends Thread {
public static final String APPLICATION_JSON = "application/json";
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProxyConfigManager.class);
+ private static final Logger logger =
LoggerFactory.getLogger(ProxyConfigManager.class);
private final ProxyClientConfig clientConfig;
private final ClientMgr clientManager;
private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
- private final JsonParser jsonParser = new JsonParser();
private final Gson gson = new Gson();
private final HashRing hashRing = HashRing.getInstance();
private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
@@ -104,8 +102,8 @@ public class ProxyConfigManager extends Thread {
private String inlongGroupId;
private String localMd5;
private boolean bShutDown = false;
- private long doworkTime = 0;
- private EncryptConfigEntry userEncryConfigEntry;
+ private long lstUpdatedTime = 0;
+ private EncryptConfigEntry userEncryptConfigEntry;
public ProxyConfigManager(final ProxyClientConfig configure, final
ClientMgr clientManager) {
this.clientConfig = configure;
@@ -122,7 +120,7 @@ public class ProxyConfigManager extends Thread {
}
public void shutDown() {
- LOGGER.info("Begin to shut down ProxyConfigManager!");
+ logger.info("Begin to shut down ProxyConfigManager!");
bShutDown = true;
}
@@ -132,9 +130,9 @@ public class ProxyConfigManager extends Thread {
try {
doProxyEntryQueryWork();
updateEncryptConfigEntry();
- LOGGER.info("ProxyConf update!");
+ logger.info("ProxyConf update!");
} catch (Throwable e) {
- LOGGER.error("Refresh proxy ip list runs into exception {},
{}", e.toString(), e.getStackTrace());
+ logger.error("Refresh proxy ip list runs into exception {},
{}", e.toString(), e.getStackTrace());
e.printStackTrace();
}
@@ -147,13 +145,13 @@ public class ProxyConfigManager extends Thread {
if (proxyUpdateIntervalSec > 5) {
sleepTimeSec = proxyUpdateIntervalSec + random.nextInt() %
(proxyUpdateIntervalSec / 5);
}
- LOGGER.info("sleep time {}", sleepTimeSec);
+ logger.info("sleep time {}", sleepTimeSec);
Thread.sleep(sleepTimeSec * 1000);
} catch (Throwable e2) {
//
}
}
- LOGGER.info("ProxyConfigManager worker existed!");
+ logger.info("ProxyConfigManager worker existed!");
}
/**
@@ -170,11 +168,11 @@ public class ProxyConfigManager extends Thread {
if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) {
JsonReader reader = new JsonReader(new
FileReader(configCachePath));
ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader,
ProxyConfigEntry.class);
- LOGGER.info("{} has a backup! {}", inlongGroupId,
proxyConfigEntry);
+ logger.info("{} has a backup! {}", inlongGroupId,
proxyConfigEntry);
return proxyConfigEntry;
}
} catch (Exception ex) {
- LOGGER.warn("try to read local cache, caught {}", ex.getMessage());
+ logger.warn("try to read local cache, caught {}", ex.getMessage());
} finally {
rw.readLock().unlock();
}
@@ -189,13 +187,13 @@ public class ProxyConfigManager extends Thread {
// try to create parent
file.getParentFile().mkdirs();
}
- LOGGER.info("try to write {}} to local cache {}", entry,
configCachePath);
+ logger.info("try to write {}} to local cache {}", entry,
configCachePath);
FileWriter fileWriter = new FileWriter(configCachePath);
gson.toJson(entry, fileWriter);
fileWriter.flush();
fileWriter.close();
} catch (Exception ex) {
- LOGGER.warn("try to write local cache, caught {}",
ex.getMessage());
+ logger.warn("try to write local cache, caught {}",
ex.getMessage());
} finally {
rw.writeLock().unlock();
}
@@ -205,7 +203,7 @@ public class ProxyConfigManager extends Thread {
try {
return requestProxyList(this.clientConfig.getManagerUrl());
} catch (Exception e) {
- LOGGER.warn("try to request proxy list by http, caught {}",
e.getMessage());
+ logger.warn("try to request proxy list by http, caught {}",
e.getMessage());
}
return null;
}
@@ -280,7 +278,7 @@ public class ProxyConfigManager extends Thread {
}
/* We should exit if no local IP list and can't request it from
manager. */
if (localMd5 == null && proxyEntry == null) {
- LOGGER.error("Can't connect manager at the start of proxy API
{}",
+ logger.error("Can't connect manager at the start of proxy API
{}",
this.clientConfig.getManagerUrl());
proxyEntry = tryToReadCacheProxyEntry(configAddr);
}
@@ -290,7 +288,7 @@ public class ProxyConfigManager extends Thread {
s.append(tmp.getHostName()).append(";").append(tmp.getPortNumber())
.append(",");
}
- LOGGER.warn("Backup proxyEntry [{}]", s);
+ logger.warn("Backup proxyEntry [{}]", s);
}
}
if (localMd5 == null && proxyEntry == null && proxyInfoList == null) {
@@ -312,7 +310,7 @@ public class ProxyConfigManager extends Thread {
*/
private void compareProxyList(ProxyConfigEntry proxyEntry) {
if (proxyEntry != null) {
- LOGGER.info("{}", proxyEntry.toString());
+ logger.info("{}", proxyEntry.toString());
if (proxyEntry.getSize() != 0) {
/* Initialize the current proxy information list first. */
clientManager.setLoadThreshold(proxyEntry.getLoad());
@@ -326,31 +324,31 @@ public class ProxyConfigManager extends Thread {
String oldMd5 = calcHostInfoMd5(proxyInfoList);
if (newMd5 != null && !newMd5.equals(oldMd5)) {
/* Choose random alive connections to send messages. */
- LOGGER.info("old md5 {} new md5 {}", oldMd5, newMd5);
+ logger.info("old md5 {} new md5 {}", oldMd5, newMd5);
proxyInfoList.clear();
proxyInfoList = newProxyInfoList;
clientManager.setProxyInfoList(proxyInfoList);
- doworkTime = System.currentTimeMillis();
+ lstUpdatedTime = System.currentTimeMillis();
} else if (proxyEntry.getSwitchStat() != oldStat) {
/* judge cluster's switch state */
oldStat = proxyEntry.getSwitchStat();
- if ((System.currentTimeMillis() - doworkTime) > 3 * 60 *
1000) {
- LOGGER.info("switch the cluster!");
+ if ((System.currentTimeMillis() - lstUpdatedTime) > 3 * 60
* 1000) {
+ logger.info("switch the cluster!");
proxyInfoList.clear();
proxyInfoList = newProxyInfoList;
clientManager.setProxyInfoList(proxyInfoList);
} else {
- LOGGER.info("only change oldStat ");
+ logger.info("only change oldStat ");
}
} else {
newProxyInfoList.clear();
- LOGGER.info("proxy IP list doesn't change, load {}",
proxyEntry.getLoad());
+ logger.info("proxy IP list doesn't change, load {}",
proxyEntry.getLoad());
}
if (clientConfig.getLoadBalance() ==
LoadBalance.CONSISTENCY_HASH) {
updateHashRing(proxyInfoList);
}
} else {
- LOGGER.error("proxyEntry's size is zero");
+ logger.error("proxyEntry's size is zero");
}
}
}
@@ -359,7 +357,7 @@ public class ProxyConfigManager extends Thread {
if (StringUtils.isBlank(userName)) {
return null;
}
- EncryptConfigEntry encryptEntry = this.userEncryConfigEntry;
+ EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
if (encryptEntry == null) {
int retryCount = 0;
encryptEntry = requestPubKey(this.clientConfig.getRsaPubKeyUrl(),
userName, false);
@@ -372,21 +370,21 @@ public class ProxyConfigManager extends Thread {
if (encryptEntry != null) {
encryptEntry.getRsaEncryptedKey();
synchronized (this) {
- if (this.userEncryConfigEntry == null) {
- this.userEncryConfigEntry = encryptEntry;
+ if (this.userEncryptConfigEntry == null) {
+ this.userEncryptConfigEntry = encryptEntry;
} else {
- encryptEntry = this.userEncryConfigEntry;
+ encryptEntry = this.userEncryptConfigEntry;
}
}
}
} else {
synchronized (this) {
- if (this.userEncryConfigEntry == null ||
this.userEncryConfigEntry != encryptEntry) {
+ if (this.userEncryptConfigEntry == null ||
this.userEncryptConfigEntry != encryptEntry) {
storePubKeyEntry(encryptEntry);
encryptEntry.getRsaEncryptedKey();
- this.userEncryConfigEntry = encryptEntry;
+ this.userEncryptConfigEntry = encryptEntry;
} else {
- encryptEntry = this.userEncryConfigEntry;
+ encryptEntry = this.userEncryptConfigEntry;
}
}
}
@@ -410,10 +408,10 @@ public class ProxyConfigManager extends Thread {
return;
}
synchronized (this) {
- if (this.userEncryConfigEntry == null || this.userEncryConfigEntry
!= encryptConfigEntry) {
+ if (this.userEncryptConfigEntry == null ||
this.userEncryptConfigEntry != encryptConfigEntry) {
storePubKeyEntry(encryptConfigEntry);
encryptConfigEntry.getRsaEncryptedKey();
- this.userEncryConfigEntry = encryptConfigEntry;
+ this.userEncryptConfigEntry = encryptConfigEntry;
}
}
return;
@@ -421,7 +419,7 @@ public class ProxyConfigManager extends Thread {
private EncryptConfigEntry getStoredPubKeyEntry(String userName) {
if (StringUtils.isBlank(userName)) {
- LOGGER.warn(" userName(" + userName + ") is not available");
+ logger.warn(" userName(" + userName + ") is not available");
return null;
}
EncryptConfigEntry entry;
@@ -441,7 +439,7 @@ public class ProxyConfigManager extends Thread {
return null;
}
} catch (Throwable e1) {
- LOGGER.error("Read " + userName + " stored PubKeyEntry error ",
e1);
+ logger.error("Read " + userName + " stored PubKeyEntry error ",
e1);
return null;
} finally {
if (fis != null) {
@@ -473,7 +471,7 @@ public class ProxyConfigManager extends Thread {
p.flush();
// p.close();
} catch (Throwable e) {
- LOGGER.error("store EncryptConfigEntry " + entry.toString() + "
exception ", e);
+ logger.error("store EncryptConfigEntry " + entry.toString() + "
exception ", e);
e.printStackTrace();
} finally {
if (fos != null) {
@@ -508,7 +506,7 @@ public class ProxyConfigManager extends Thread {
private EncryptConfigEntry requestPubKey(String pubKeyUrl, String
userName, boolean needGet) {
if (StringUtils.isBlank(userName)) {
- LOGGER.error("Queried userName is null!");
+ logger.error("Queried userName is null!");
return null;
}
List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
@@ -516,26 +514,26 @@ public class ProxyConfigManager extends Thread {
params.add(new BasicNameValuePair("username", userName));
String returnStr = requestConfiguration(pubKeyUrl, params);
if (StringUtils.isBlank(returnStr)) {
- LOGGER.info("No public key information returned from manager");
+ logger.info("No public key information returned from manager");
return null;
}
- JsonObject pubKeyConf = jsonParser.parse(returnStr).getAsJsonObject();
+ JsonObject pubKeyConf =
JsonParser.parseString(returnStr).getAsJsonObject();
if (pubKeyConf == null) {
- LOGGER.info("No public key information returned from manager");
+ logger.info("No public key information returned from manager");
return null;
}
if (!pubKeyConf.has("resultCode")) {
- LOGGER.info("Parse pubKeyConf failure: No resultCode key
information returned from manager");
+ logger.info("Parse pubKeyConf failure: No resultCode key
information returned from manager");
return null;
}
int resultCode = pubKeyConf.get("resultCode").getAsInt();
if (resultCode != 0) {
- LOGGER.info("query pubKeyConf failure, error code is " +
resultCode + ", errInfo is "
+ logger.info("query pubKeyConf failure, error code is " +
resultCode + ", errInfo is "
+ pubKeyConf.get("message").getAsString());
return null;
}
if (!pubKeyConf.has("resultData")) {
- LOGGER.info("Parse pubKeyConf failure: No resultData key
information returned from manager");
+ logger.info("Parse pubKeyConf failure: No resultData key
information returned from manager");
return null;
}
JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
@@ -566,7 +564,7 @@ public class ProxyConfigManager extends Thread {
throw new Exception("Read local proxyList File failure by " +
filePath + ", reason is " + e.getCause());
}
if (ObjectUtils.isEmpty(proxyCluster)) {
- LOGGER.warn("no proxyCluster configure from local file");
+ logger.warn("no proxyCluster configure from local file");
return null;
}
@@ -591,7 +589,7 @@ public class ProxyConfigManager extends Thread {
ArrayList<BasicNameValuePair> params = new
ArrayList<BasicNameValuePair>();
params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp()));
params.add(new BasicNameValuePair("protocolType",
clientConfig.getProtocolType()));
- LOGGER.info("Begin to get configure from manager {}, param is {}",
url, params);
+ logger.info("Begin to get configure from manager {}, param is {}",
url, params);
String resultStr = requestConfiguration(url, params);
ProxyClusterConfig clusterConfig = gson.fromJson(resultStr,
ProxyClusterConfig.class);
@@ -606,7 +604,7 @@ public class ProxyConfigManager extends Thread {
private ProxyConfigEntry getProxyConfigEntry(DataProxyNodeResponse
proxyCluster) {
List<DataProxyNodeInfo> nodeList = proxyCluster.getNodeList();
if (CollectionUtils.isEmpty(nodeList)) {
- LOGGER.error("dataproxy nodeList is empty in
DataProxyNodeResponse!");
+ logger.error("dataproxy nodeList is empty in
DataProxyNodeResponse!");
return null;
}
Map<String, HostInfo> hostMap = formatHostInfoMap(nodeList);
@@ -644,133 +642,73 @@ public class ProxyConfigManager extends Thread {
}
private Map<String, HostInfo> formatHostInfoMap(List<DataProxyNodeInfo>
nodeList) {
+ HostInfo tmpHostInfo;
Map<String, HostInfo> hostMap = new HashMap<>();
for (DataProxyNodeInfo proxy : nodeList) {
if (ObjectUtils.isEmpty(proxy.getId()) ||
StringUtils.isEmpty(proxy.getIp()) || ObjectUtils
.isEmpty(proxy.getPort()) || proxy.getPort() < 0) {
- LOGGER.error("invalid proxy node, id:{}, ip:{}, port:{}",
proxy.getId(), proxy.getIp(),
+ logger.error("invalid proxy node, id:{}, ip:{}, port:{}",
proxy.getId(), proxy.getIp(),
proxy.getPort());
continue;
}
- String refId = proxy.getIp() + ":" + proxy.getPort();
- hostMap.put(refId, new HostInfo(refId, proxy.getIp(),
proxy.getPort()));
+ tmpHostInfo = new HostInfo(proxy.getIp(), proxy.getPort());
+ hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo);
}
if (hostMap.isEmpty()) {
- LOGGER.error("Parse proxyList failure: address is empty for
response from manager!");
+ logger.error("Parse proxyList failure: address is empty for
response from manager!");
return null;
}
return hostMap;
}
- private String updateUrl(String url, int tryIdx, String
localManagerIpList) {
- if (tryIdx == 0) {
- return url;
- }
-
- int headerIdx = url.indexOf("://");
- if (headerIdx == -1) {
- return null;
- }
- String header = "";
- header = url.substring(0, headerIdx + 3);
- String tmpUrl = url.substring(headerIdx + 3);
- int tailerIdx = tmpUrl.indexOf("/");
- if (tailerIdx == -1) {
- return null;
- }
- String tailer = "";
- tailer = tmpUrl.substring(tailerIdx);
- String[] managerIps = localManagerIpList.split(",");
- String currentManagerIp = "";
- int idx = 1;
- for (String managerIp : managerIps) {
- if (idx++ == tryIdx) {
- currentManagerIp = managerIp;
- break;
- }
- }
- if (!currentManagerIp.equals("")) {
- return header + currentManagerIp + ":" +
clientConfig.getManagerPort() + tailer;
- }
- return null;
- }
-
/* Request new configurations from Manager. */
private String requestConfiguration(String url, List<BasicNameValuePair>
params) {
if (StringUtils.isBlank(url)) {
- LOGGER.error("request url is null");
+ logger.error("request url is null");
return null;
}
- // get local managerIpList
- String localManagerIps = "";
- int tryIdx = 0;
- while (true) {
- HttpPost httpPost = null;
- String returnStr = null;
- HttpParams myParams = new BasicHttpParams();
- HttpConnectionParams.setConnectionTimeout(myParams, 10000);
- HttpConnectionParams.setSoTimeout(myParams,
clientConfig.getManagerSocketTimeout());
- CloseableHttpClient httpClient = null;
- if (this.clientConfig.isRequestByHttp()) {
- httpClient = new DefaultHttpClient(myParams);
- } else {
- try {
- httpClient = getCloseableHttpClient(params);
- } catch (Throwable eHttps) {
- LOGGER.error("Create Https cliet failure, error 1 is ",
eHttps);
- eHttps.printStackTrace();
- return null;
- }
- }
-
- if (!clientConfig.isEnableSaveManagerVIps() && tryIdx > 0) {
+ HttpPost httpPost = null;
+ HttpParams myParams = new BasicHttpParams();
+ HttpConnectionParams.setConnectionTimeout(myParams, 10000);
+ HttpConnectionParams.setSoTimeout(myParams,
clientConfig.getManagerSocketTimeout());
+ CloseableHttpClient httpClient;
+ if (this.clientConfig.isRequestByHttp()) {
+ httpClient = new DefaultHttpClient(myParams);
+ } else {
+ try {
+ httpClient = getCloseableHttpClient(params);
+ } catch (Throwable eHttps) {
+ logger.error("Create Https cliet failure, error 1 is ",
eHttps);
+ eHttps.printStackTrace();
return null;
}
- // change url's manager host port when occur error
- url = updateUrl(url, tryIdx, localManagerIps);
- if (url == null) {
- return null;
+ }
+ logger.info("Request url : {}, params : {}", url, params);
+ try {
+ httpPost = new HttpPost(url);
+ httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
+
BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(),
+ clientConfig.getAuthSecretKey()));
+ UrlEncodedFormEntity urlEncodedFormEntity = new
UrlEncodedFormEntity(params, "UTF-8");
+ httpPost.setEntity(urlEncodedFormEntity);
+ HttpResponse response = httpClient.execute(httpPost);
+ String returnStr = EntityUtils.toString(response.getEntity());
+ if (StringUtils.isNotBlank(returnStr)
+ && response.getStatusLine().getStatusCode() ==
HttpStatus.SC_OK) {
+ logger.info("Get configure from manager is {}", returnStr);
+ return returnStr;
}
- tryIdx++;
-
- LOGGER.info("Request url : " + url + ", localManagerIps : " +
localManagerIps);
- try {
- httpPost = new HttpPost(url);
- httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
-
BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(),
- clientConfig.getAuthSecretKey()));
- UrlEncodedFormEntity urlEncodedFormEntity = new
UrlEncodedFormEntity(params, "UTF-8");
- httpPost.setEntity(urlEncodedFormEntity);
- HttpResponse response = httpClient.execute(httpPost);
- returnStr = EntityUtils.toString(response.getEntity());
- if (StringUtils.isNotBlank(returnStr)
- && response.getStatusLine().getStatusCode() ==
HttpStatus.SC_OK) {
- LOGGER.info("Get configure from manager is " + returnStr);
- return returnStr;
- }
-
- if (!clientConfig.isRequestByHttp()) {
- return null;
- }
- } catch (Throwable e) {
- LOGGER.error("Connect Manager error, message: {}, url is {}",
e.getMessage(), url);
-
- if (!clientConfig.isRequestByHttp()) {
- return null;
- }
- // get localManagerIps
- localManagerIps = getLocalManagerIps();
- if (localManagerIps == null) {
- return null;
- }
- } finally {
- if (httpPost != null) {
- httpPost.releaseConnection();
- }
- if (httpClient != null) {
- httpClient.getConnectionManager().shutdown();
- }
+ return null;
+ } catch (Throwable e) {
+ logger.error("Connect Manager error, message: {}, url is {}",
e.getMessage(), url);
+ return null;
+ } finally {
+ if (httpPost != null) {
+ httpPost.releaseConnection();
+ }
+ if (httpClient != null) {
+ httpClient.getConnectionManager().shutdown();
}
}
}
@@ -803,36 +741,8 @@ public class ProxyConfigManager extends Thread {
return httpClient;
}
- private String getLocalManagerIps() {
- String localManagerIps;
- try {
- File localManagerIpsFile = new
File(clientConfig.getManagerIpLocalPath());
- if (localManagerIpsFile.exists()) {
- byte[] serialized;
- serialized =
FileUtils.readFileToByteArray(localManagerIpsFile);
- if (serialized == null) {
- LOGGER.error("Local managerIp file is empty, file path : "
- + clientConfig.getManagerIpLocalPath());
- return null;
- }
- localManagerIps = new String(serialized, "UTF-8");
- } else {
- if (!localManagerIpsFile.getParentFile().exists()) {
- localManagerIpsFile.getParentFile().mkdirs();
- }
- localManagerIps = "";
- LOGGER.error("Get local managerIpList not exist, file path : "
- + clientConfig.getManagerIpLocalPath());
- }
- } catch (Throwable t) {
- localManagerIps = "";
- LOGGER.error("Get local managerIpList occur exception,", t);
- }
- return localManagerIps;
- }
-
public void updateHashRing(List<HostInfo> newHosts) {
this.hashRing.updateNode(newHosts);
- LOGGER.debug("update hash ring {}",
hashRing.getVirtualNode2RealNode());
+ logger.debug("update hash ring {}",
hashRing.getVirtualNode2RealNode());
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
index 8e00ff8c3e..2f75226925 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.dataproxy.network;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
@@ -30,8 +31,8 @@ import java.util.concurrent.TimeUnit;
public class SyncMessageCallable implements Callable<SendResult> {
- private static final Logger logger = LoggerFactory
- .getLogger(SyncMessageCallable.class);
+ private static final Logger logger =
LoggerFactory.getLogger(SyncMessageCallable.class);
+ private static final LogCounter exptCnt = new LogCounter(10, 100000, 60 *
1000L);
private final NettyClient client;
private final CountDownLatch awaitLatch = new CountDownLatch(1);
@@ -55,13 +56,13 @@ public class SyncMessageCallable implements
Callable<SendResult> {
}
public SendResult call() throws Exception {
- // TODO Auto-generated method stub
try {
ChannelFuture channelFuture = client.write(encodeObject);
awaitLatch.await(timeout, timeUnit);
- } catch (Exception e) {
- logger.error("SendResult call", e);
- e.printStackTrace();
+ } catch (Throwable ex) {
+ if (exptCnt.shouldPrint()) {
+ logger.warn("SyncMessageCallable write data throw exception",
ex);
+ }
return SendResult.UNKOWN_ERROR;
}
return message;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java
index 01d36c23fb..19291e4336 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java
@@ -22,8 +22,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class TimeScanObject {
- private AtomicInteger count = new AtomicInteger(0);
- private AtomicLong time = new AtomicLong(0);
+ private final AtomicInteger count = new AtomicInteger(0);
+ private final AtomicLong time = new AtomicLong(0);
public TimeScanObject() {
this.count.set(0);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java
deleted file mode 100644
index 1424c3cb8f..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.threads;
-
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.utils.ServiceDiscoveryUtils;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * manager fetch thread
- */
-public class ManagerFetcherThread extends Thread {
-
- private final Logger logger =
LoggerFactory.getLogger(ManagerFetcherThread.class);
- private volatile boolean isShutdown;
- private final ProxyClientConfig proxyClientConfig;
-
- public ManagerFetcherThread(ProxyClientConfig proxyClientConfig) {
- isShutdown = false;
- this.proxyClientConfig = proxyClientConfig;
- this.setDaemon(true);
- this.setName("ManagerFetcherThread");
- }
-
- public void shutdown() {
- logger.info("Begin to shutdown ManagerFetcherThread.");
- isShutdown = true;
- }
-
- @Override
- public void run() {
- logger.info("ManagerFetcherThread Thread=" +
Thread.currentThread().getId() + " started !");
- while (!isShutdown) {
- try {
- String managerIpList =
ServiceDiscoveryUtils.getManagerIpList(proxyClientConfig);
- if (StringUtils.isBlank(managerIpList)) {
- logger.error("ManagerFetcher get managerIpList is blank.");
- } else {
-
ServiceDiscoveryUtils.updateManagerInfo2Local(managerIpList,
- proxyClientConfig.getManagerIpLocalPath());
- }
- TimeUnit.MILLISECONDS.sleep((long)
proxyClientConfig.getProxyUpdateIntervalMinutes() * 60 * 1000);
- } catch (Throwable e) {
- logger.error("ManagerFetcher get or save managerIpList occur
error,", e);
- }
- }
- }
-}
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java
index bf26d3fc59..eb99c43b9e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java
@@ -53,12 +53,11 @@ public class EventLoopUtil {
} else if (!enableBusyWait) {
return new EpollEventLoopGroup(nThreads, threadFactory);
} else {
- EpollEventLoopGroup eventLoopGroup = new
EpollEventLoopGroup(nThreads, threadFactory, () -> {
+ return new EpollEventLoopGroup(nThreads, threadFactory, () -> {
return (selectSupplier, hasTasks) -> {
return -3;
};
});
- return eventLoopGroup;
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
deleted file mode 100644
index 6352e25308..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.utils;
-
-import org.apache.inlong.common.util.BasicAuth;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.network.IpUtils;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.Header;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
-import org.apache.http.ssl.SSLContexts;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Utils for service discovery
- */
-public class ServiceDiscoveryUtils {
-
- private static final Logger log =
LoggerFactory.getLogger(ServiceDiscoveryUtils.class);
-
- private static final String GET_MANAGER_IP_LIST_API =
"/inlong/manager/openapi/agent/getManagerIpList";
- private static String latestManagerIPList = "";
- private static String arraySed = ",";
-
- /**
- * Get Inlong-Manager IP list from the given proxy client config
- */
- public static String getManagerIpList(ProxyClientConfig clientConfig) {
- String managerAddress = clientConfig.getManagerAddress();
- if (StringUtils.isBlank(managerAddress)) {
- log.error("ServiceDiscovery get managerIpList but managerAddress
is blank, just return");
- return null;
- }
-
- String managerIpList = getManagerIpListByHttp(managerAddress,
clientConfig);
- if (StringUtils.isNotBlank(managerIpList)) {
- latestManagerIPList = managerIpList;
- return managerIpList;
- }
-
- log.error("ServiceDiscovery get managerIpList from {} occur error, try
to get from latestManagerIPList",
- managerAddress);
-
- String[] managerIps = latestManagerIPList.split(arraySed);
- if (managerIps.length > 0) {
- for (String managerIp : managerIps) {
- if (StringUtils.isBlank(managerIp)) {
- log.error("ServiceDiscovery managerIp is null,
latestManagerIPList is {}", latestManagerIPList);
- continue;
- }
-
- String currentAddress = managerIp + ":" +
clientConfig.getManagerPort();
- managerIpList = getManagerIpListByHttp(currentAddress,
clientConfig);
- if (StringUtils.isBlank(managerIpList)) {
- log.error("ServiceDiscovery get latestManagerIPList from
{} but got nothing, will try next ip",
- managerIp);
- continue;
- }
- latestManagerIPList = managerIpList;
- return managerIpList;
- }
- } else {
- log.error("ServiceDiscovery latestManagerIpList {} format error,
or not contain ip", latestManagerIPList);
- }
-
- String existedIpList =
getLocalManagerIpList(clientConfig.getManagerIpLocalPath());
- if (StringUtils.isNotBlank(existedIpList)) {
- String[] existedIps = existedIpList.split(arraySed);
- if (existedIps.length > 0) {
- for (String existedIp : existedIps) {
- if (StringUtils.isBlank(existedIp)) {
- log.error("ServiceDiscovery get illegal format ipList
from local file, "
- + "exist ip is empty, managerIpList is {},
local file is {}",
- existedIpList,
clientConfig.getManagerIpLocalPath());
- continue;
- }
-
- String currentAddress = existedIp + ":" +
clientConfig.getManagerPort();
- managerIpList = getManagerIpListByHttp(currentAddress,
clientConfig);
- if (StringUtils.isBlank(managerIpList)) {
- log.error("ServiceDiscovery get {} from local file {}
but got nothing, will try next ip",
- existedIp,
clientConfig.getManagerIpLocalPath());
- continue;
- }
- latestManagerIPList = managerIpList;
- return managerIpList;
- }
- } else {
- log.error("ServiceDiscovery get illegal format ipList from
local file, "
- + "exist ip is empty, managerIpList is {}, local file
is {}",
- existedIpList, clientConfig.getManagerIpLocalPath());
- }
- } else {
- log.error("ServiceDiscovery get empty ipList from local file {}",
clientConfig.getManagerIpLocalPath());
- }
-
- return managerIpList;
- }
-
- /**
- * Get Inlong-Manager IP list from the given managerIp and proxy client
config
- */
- public static String getManagerIpListByHttp(String managerIp,
ProxyClientConfig proxyClientConfig) {
- String url = managerIp + GET_MANAGER_IP_LIST_API;
- ArrayList<BasicNameValuePair> params = new
ArrayList<BasicNameValuePair>();
- params.add(new BasicNameValuePair("operation", "query"));
- params.add(new BasicNameValuePair("username",
proxyClientConfig.getUserName()));
-
- log.info("Begin to get configure from manager {}, param is {}", url,
params);
- CloseableHttpClient httpClient;
- HttpParams myParams = new BasicHttpParams();
- HttpConnectionParams.setConnectionTimeout(myParams,
proxyClientConfig.getManagerConnectionTimeout());
- HttpConnectionParams.setSoTimeout(myParams,
proxyClientConfig.getManagerSocketTimeout());
- if (proxyClientConfig.isRequestByHttp()) {
- httpClient = new DefaultHttpClient(myParams);
- } else {
- try {
- ArrayList<Header> headers = new ArrayList<>();
- for (BasicNameValuePair paramItem : params) {
- headers.add(new BasicHeader(paramItem.getName(),
paramItem.getValue()));
- }
- RequestConfig requestConfig = RequestConfig.custom()
-
.setConnectTimeout(10000).setSocketTimeout(30000).build();
- SSLContext sslContext = SSLContexts.custom().build();
- SSLConnectionSocketFactory sslsf = new
SSLConnectionSocketFactory(sslContext,
- new String[]{"TLSv1"}, null,
-
SSLConnectionSocketFactory.getDefaultHostnameVerifier());
- httpClient = HttpClients.custom().setDefaultHeaders(headers)
-
.setDefaultRequestConfig(requestConfig).setSSLSocketFactory(sslsf).build();
- } catch (Throwable t) {
- log.error("Create Https client failed: ", t);
- return null;
- }
- }
-
- HttpPost httpPost = null;
- try {
- httpPost = new HttpPost(url);
- if (proxyClientConfig.isNeedAuthentication()) {
- long timestamp = System.currentTimeMillis();
- int nonce = new
SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE);
- httpPost.setHeader(BasicAuth.BASIC_AUTH_HEADER,
-
IpUtils.getAuthorizenInfo(proxyClientConfig.getUserName(),
- proxyClientConfig.getSecretKey(), timestamp,
nonce));
- }
- httpPost.setEntity(new UrlEncodedFormEntity(params));
- HttpResponse response = httpClient.execute(httpPost);
- String returnStr = EntityUtils.toString(response.getEntity());
- if (StringUtils.isNotBlank(returnStr) &&
response.getStatusLine().getStatusCode() == 200) {
- log.info("Get configure from manager is " + returnStr);
- JsonParser jsonParser = new JsonParser();
- JsonObject jb = jsonParser.parse(returnStr).getAsJsonObject();
- if (jb == null) {
- log.warn("ServiceDiscovery updated manager ip failed,
returnStr = {} jb is "
- + "null ", returnStr, jb);
- return null;
- }
- JsonArray retData = jb.get("data").getAsJsonArray();
- List<String> managerIpList = new ArrayList<>();
- for (JsonElement datum : retData) {
- JsonObject record = datum.getAsJsonObject();
- managerIpList.add(record.get("ip").getAsString());
- }
- if (managerIpList.isEmpty()) {
- return null;
- }
- String strIPs = String.join(",", managerIpList);
- log.info("ServiceDiscovery updated manager ip success, ip : "
+ strIPs + ", retStr : " + returnStr);
- return strIPs;
- }
- return null;
- } catch (Throwable t) {
- log.error("Connect Manager error: ", t);
- return null;
- } finally {
- if (httpPost != null) {
- httpPost.releaseConnection();
- }
- if (httpClient != null) {
- httpClient.getConnectionManager().shutdown();
- }
- }
- }
-
- /**
- * Get Inlong-Manager IP list from local path
- */
- public static String getLocalManagerIpList(String localPath) {
- log.info("ServiceDiscovery start loading config from file {} ...",
localPath);
- String newestIp = null;
- try {
- File managerIpListFile = new File(localPath);
- if (!managerIpListFile.exists()) {
- log.info("ServiceDiscovery not found local groupIdInfo file
from {}", localPath);
- return null;
- }
- byte[] serialized =
FileUtils.readFileToByteArray(managerIpListFile);
- if (serialized == null) {
- return null;
- }
- newestIp = new String(serialized, StandardCharsets.UTF_8);
- log.info("ServiceDiscovery get manager ip list from local success,
result is: {}", newestIp);
- } catch (IOException e) {
- log.error("ServiceDiscovery load manager config error: ", e);
- }
-
- return newestIp;
- }
-
- /**
- * Update Inlong-Manager info to local file
- */
- public static void updateManagerInfo2Local(String storeString, String
path) {
- if (StringUtils.isBlank(storeString)) {
- log.warn("ServiceDiscovery updateTdmInfo2Local error, configMap is
empty or managerIpList is blank");
- return;
- }
- File localPath = new File(path);
- if (!localPath.getParentFile().exists()) {
- localPath.getParentFile().mkdirs();
- }
-
- try (BufferedWriter writer = new BufferedWriter(
- new OutputStreamWriter(new FileOutputStream(localPath),
StandardCharsets.UTF_8))) {
- writer.write(storeString);
- writer.flush();
- } catch (IOException e) {
- log.error("ServiceDiscovery save manager config error: ", e);
- }
- }
-
-}