This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new eba89c79bd [INLONG-10102][DataProxy] Adjust the position where Source calls addSourceReportInfo() (#10103)
eba89c79bd is described below
commit eba89c79bd03e737f6abb468e2d6c2932121cc48
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Apr 29 17:19:48 2024 +0800
[INLONG-10102][DataProxy] Adjust the position where Source calls addSourceReportInfo() (#10103)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/dataproxy/config/holder/SourceReportConfigHolder.java | 7 ++++---
.../java/org/apache/inlong/dataproxy/consts/SourceConstants.java | 4 ++--
.../main/java/org/apache/inlong/dataproxy/source/BaseSource.java | 9 +++++++--
.../org/apache/inlong/dataproxy/source/ServerMessageFactory.java | 2 +-
.../org/apache/inlong/dataproxy/source/SimpleHttpSource.java | 6 +-----
.../java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java | 6 +-----
.../java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java | 7 +------
7 files changed, 17 insertions(+), 24 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/SourceReportConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/SourceReportConfigHolder.java
index cdcee01f00..cb25527377 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/SourceReportConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/SourceReportConfigHolder.java
@@ -23,8 +23,8 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
*
@@ -36,7 +36,7 @@ public class SourceReportConfigHolder {
public static final Logger LOG =
LoggerFactory.getLogger(SourceReportConfigHolder.class);
- private final Map<String, AddressInfo> srcAddressMap = new HashMap<>();
+ private final Map<String, AddressInfo> srcAddressMap = new
ConcurrentHashMap<>();
public SourceReportConfigHolder() {
@@ -53,7 +53,8 @@ public class SourceReportConfigHolder {
return;
}
String recordKey = sourceIp + "#" + sourcePort + "#" + protocolType;
- srcAddressMap.put(recordKey, new AddressInfo(sourceIp, sourcePort,
rptSrcType, protocolType));
+ this.srcAddressMap.put(recordKey,
+ new AddressInfo(sourceIp, sourcePort, rptSrcType,
protocolType));
}
public Map<String, AddressInfo> getSrcAddressInfos() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
index 9ead853540..86dc0c6fb3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
@@ -31,8 +31,8 @@ public class SourceConstants {
public static final String SRCCXT_CONFIG_PORT = "port";
// system env source port
public static final String SYSENV_HOST_PORT = "inlongHostPort";
- // source logic type name
- public static final String SRCCXT_LOGIC_TYPE_NAME = "logic-type-name";
+ // source logic execute type
+ public static final String SRCCXT_LOGIC_EXECUTE_TYPE =
"logic-execute-type";
// message factory name
public static final String SRCCXT_MSG_FACTORY_NAME = "msg-factory-name";
// message handler name
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index acf0e4d8a4..71f5c538d4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -149,9 +149,9 @@ public abstract class BaseSource
this.strPort = String.valueOf(this.srcPort);
// get source logic type
String tmpVal = context.getString(
- SourceConstants.SRCCXT_LOGIC_TYPE_NAME,
ReportResourceType.INLONG);
+ SourceConstants.SRCCXT_LOGIC_EXECUTE_TYPE,
ReportResourceType.INLONG);
Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal),
- SourceConstants.SRCCXT_LOGIC_TYPE_NAME + " config is blank");
+ SourceConstants.SRCCXT_LOGIC_EXECUTE_TYPE + " config is
blank");
this.rptSrcType = tmpVal.trim().toUpperCase();
// get message factory
tmpVal = context.getString(SourceConstants.SRCCXT_MSG_FACTORY_NAME,
@@ -276,7 +276,12 @@ public abstract class BaseSource
}
startSource();
// register
+ ConfigManager.getInstance().addSourceReportInfo(srcHost,
+ String.valueOf(srcPort), rptSrcType,
getProtocolName().toUpperCase());
AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE,
this.cachedSrcName, this);
+ logger.info("Source {} started at ({}:{}), {}={}, Protocal={}",
this.getCachedSrcName(),
+ srcHost, srcPort, SourceConstants.SRCCXT_LOGIC_EXECUTE_TYPE,
rptSrcType,
+ getProtocolName().toUpperCase());
}
@Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index 517c11113b..5f5fda4a7c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -78,7 +78,7 @@ public class ServerMessageFactory extends
ChannelInitializer<SocketChannel> {
ChannelInboundHandlerAdapter messageHandler =
(ChannelInboundHandlerAdapter)
ctor.newInstance(source);
ch.pipeline().addLast("messageHandler", messageHandler);
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.error("{} newInstance {} failure!",
source.getCachedSrcName(),
source.getMessageHandlerName(), e);
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
index 3a90e9f10a..4bd2c33d59 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
@@ -96,15 +96,11 @@ public class SimpleHttpSource extends BaseSource implements
Configurable {
} else {
channelFuture = bootstrap.bind(new InetSocketAddress(srcHost,
srcPort)).sync();
}
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("Source {} bind ({}:{}) error, program will exit! e =
{}",
this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
- ConfigManager.getInstance().addSourceReportInfo(srcHost,
- String.valueOf(srcPort), rptSrcType,
getProtocolName().toUpperCase());
- logger.info("Source {} started at ({}:{}), rptSrcType={}!",
- this.getCachedSrcName(), srcHost, srcPort, rptSrcType);
}
@Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index b343dc4dcb..ee5838d58d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -105,15 +105,11 @@ public class SimpleTcpSource extends BaseSource
implements Configurable {
} else {
channelFuture = bootstrap.bind(new InetSocketAddress(srcHost,
srcPort)).sync();
}
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("Source {} bind ({}:{}) error, program will exit! e =
{}",
this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
- ConfigManager.getInstance().addSourceReportInfo(srcHost,
- String.valueOf(srcPort), rptSrcType,
getProtocolName().toUpperCase());
- logger.info("Source {} started at ({}:{}), rptSrcType={}!",
- this.getCachedSrcName(), srcHost, srcPort, rptSrcType);
}
@Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index 380fc3dab9..f427e38c06 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -17,7 +17,6 @@
package org.apache.inlong.dataproxy.source;
-import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.SourceConstants;
import io.netty.bootstrap.Bootstrap;
@@ -68,15 +67,11 @@ public class SimpleUdpSource extends BaseSource implements
Configurable {
} else {
channelFuture = bootstrap.bind(new InetSocketAddress(srcHost,
srcPort)).sync();
}
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("Source {} bind ({}:{}) error, program will exit! e =
{}",
this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
- ConfigManager.getInstance().addSourceReportInfo(srcHost,
- String.valueOf(srcPort), rptSrcType,
getProtocolName().toUpperCase());
- logger.info("Source {} started at ({}:{}), rptSrcType={}!",
- this.getCachedSrcName(), srcHost, srcPort, rptSrcType);
}
@Override