fuweng11 commented on code in PR #11596:
URL: https://github.com/apache/inlong/pull/11596#discussion_r1879495714
##########
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java:
##########
@@ -19,107 +19,115 @@
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
-import com.sun.management.OperatingSystemMXBean;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class NettyClient {
private static final Logger logger =
LoggerFactory.getLogger(NettyClient.class);
+ private static final LogCounter conExptCnt = new LogCounter(10, 100000, 60
* 1000L);
- private Channel channel = null;
- private final ReentrantLock stateLock = new ReentrantLock();
-
- private ConnState connState;
- private ProxyClientConfig configure;
- private Bootstrap bootstrap;
- private String serverIP;
- private int serverPort;
-
- public String getServerIP() {
- return serverIP;
- }
+ private final static int CLIENT_STATUS_INIT = -1;
+ private final static int CLIENT_STATUS_READY = 0;
+ private final static int CLIENT_STATUS_FROZEN = 1;
+ private final static int CLIENT_STATUS_DEAD = 2;
+ private final static int CLIENT_STATUS_BUSY = 3;
- public void setServerIP(String serverIP) {
- this.serverIP = serverIP;
- }
+ private final String callerId;
+ private final ProxyClientConfig configure;
+ private final Bootstrap bootstrap;
+ private final HostInfo hostInfo;
+ private Channel channel = null;
+ private final AtomicInteger conStatus = new
AtomicInteger(CLIENT_STATUS_INIT);
+ private final AtomicLong msgInFlight = new AtomicLong(0);
+ private final AtomicLong lstSendTime = new AtomicLong(0);
+ private final Semaphore reconSemaphore = new Semaphore(1, true);
+ private final AtomicLong lstReConTime = new AtomicLong(0);
- public NettyClient(Bootstrap bootstrap, String serverIP,
- int serverPort, ProxyClientConfig configure) {
+ public NettyClient(String callerId,
+ Bootstrap bootstrap, HostInfo hostInfo, ProxyClientConfig
configure) {
+ this.callerId = callerId;
this.bootstrap = bootstrap;
- this.serverIP = serverIP;
- this.serverPort = serverPort;
+ this.hostInfo = hostInfo;
this.configure = configure;
- setState(ConnState.INIT);
- }
-
- public Channel getChannel() {
- return channel;
+ setState(CLIENT_STATUS_INIT);
}
- public void setChannel(Channel channel) {
- this.channel = channel;
- }
-
- public boolean connect() {
- // Connect to server.
-
- setState(ConnState.INIT);
+ public boolean connect(boolean needPrint) {
+ // Initial status
+ this.setState(CLIENT_STATUS_INIT);
+ long curTime = System.currentTimeMillis();
final CountDownLatch awaitLatch = new CountDownLatch(1);
- ChannelFuture future = bootstrap.connect(new InetSocketAddress(
- serverIP, serverPort));
+ // Build connect to server
+ ChannelFuture future = bootstrap.connect(
+ new InetSocketAddress(hostInfo.getHostName(),
hostInfo.getPortNumber()));
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture arg0) throws Exception
{
- logger.info("connect ack! {}", serverIP);
awaitLatch.countDown();
+ // logger.debug("Connect to {} ack!",
hostInfo.getReferenceName());
Review Comment:
Please remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]