This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new eba89c79bd [INLONG-10102][DataProxy] Adjust the position where Source calls addSourceReportInfo() (#10103)
eba89c79bd is described below

commit eba89c79bd03e737f6abb468e2d6c2932121cc48
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Apr 29 17:19:48 2024 +0800

    [INLONG-10102][DataProxy] Adjust the position where Source calls addSourceReportInfo() (#10103)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/dataproxy/config/holder/SourceReportConfigHolder.java | 7 ++++---
 .../java/org/apache/inlong/dataproxy/consts/SourceConstants.java | 4 ++--
 .../main/java/org/apache/inlong/dataproxy/source/BaseSource.java | 9 +++++++--
 .../org/apache/inlong/dataproxy/source/ServerMessageFactory.java | 2 +-
 .../org/apache/inlong/dataproxy/source/SimpleHttpSource.java     | 6 +-----
 .../java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java | 6 +-----
 .../java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java | 7 +------
 7 files changed, 17 insertions(+), 24 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/SourceReportConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/SourceReportConfigHolder.java
index cdcee01f00..cb25527377 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/SourceReportConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/SourceReportConfigHolder.java
@@ -23,8 +23,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * 
@@ -36,7 +36,7 @@ public class SourceReportConfigHolder {
     public static final Logger LOG =
             LoggerFactory.getLogger(SourceReportConfigHolder.class);
 
-    private final Map<String, AddressInfo> srcAddressMap = new HashMap<>();
+    private final Map<String, AddressInfo> srcAddressMap = new 
ConcurrentHashMap<>();
 
     public SourceReportConfigHolder() {
 
@@ -53,7 +53,8 @@ public class SourceReportConfigHolder {
             return;
         }
         String recordKey = sourceIp + "#" + sourcePort + "#" + protocolType;
-        srcAddressMap.put(recordKey, new AddressInfo(sourceIp, sourcePort, 
rptSrcType, protocolType));
+        this.srcAddressMap.put(recordKey,
+                new AddressInfo(sourceIp, sourcePort, rptSrcType, 
protocolType));
     }
 
     public Map<String, AddressInfo> getSrcAddressInfos() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
index 9ead853540..86dc0c6fb3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
@@ -31,8 +31,8 @@ public class SourceConstants {
     public static final String SRCCXT_CONFIG_PORT = "port";
     // system env source port
     public static final String SYSENV_HOST_PORT = "inlongHostPort";
-    // source logic type name
-    public static final String SRCCXT_LOGIC_TYPE_NAME = "logic-type-name";
+    // source logic execute type
+    public static final String SRCCXT_LOGIC_EXECUTE_TYPE = 
"logic-execute-type";
     // message factory name
     public static final String SRCCXT_MSG_FACTORY_NAME = "msg-factory-name";
     // message handler name
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index acf0e4d8a4..71f5c538d4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -149,9 +149,9 @@ public abstract class BaseSource
         this.strPort = String.valueOf(this.srcPort);
         // get source logic type
         String tmpVal = context.getString(
-                SourceConstants.SRCCXT_LOGIC_TYPE_NAME, 
ReportResourceType.INLONG);
+                SourceConstants.SRCCXT_LOGIC_EXECUTE_TYPE, 
ReportResourceType.INLONG);
         Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal),
-                SourceConstants.SRCCXT_LOGIC_TYPE_NAME + " config is blank");
+                SourceConstants.SRCCXT_LOGIC_EXECUTE_TYPE + " config is 
blank");
         this.rptSrcType = tmpVal.trim().toUpperCase();
         // get message factory
         tmpVal = context.getString(SourceConstants.SRCCXT_MSG_FACTORY_NAME,
@@ -276,7 +276,12 @@ public abstract class BaseSource
         }
         startSource();
         // register
+        ConfigManager.getInstance().addSourceReportInfo(srcHost,
+                String.valueOf(srcPort), rptSrcType, 
getProtocolName().toUpperCase());
         AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE, 
this.cachedSrcName, this);
+        logger.info("Source {} started at ({}:{}), {}={}, Protocal={}", 
this.getCachedSrcName(),
+                srcHost, srcPort, SourceConstants.SRCCXT_LOGIC_EXECUTE_TYPE, 
rptSrcType,
+                getProtocolName().toUpperCase());
     }
 
     @Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index 517c11113b..5f5fda4a7c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -78,7 +78,7 @@ public class ServerMessageFactory extends 
ChannelInitializer<SocketChannel> {
                 ChannelInboundHandlerAdapter messageHandler =
                         (ChannelInboundHandlerAdapter) 
ctor.newInstance(source);
                 ch.pipeline().addLast("messageHandler", messageHandler);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 LOG.error("{} newInstance {} failure!", 
source.getCachedSrcName(),
                         source.getMessageHandlerName(), e);
             }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
index 3a90e9f10a..4bd2c33d59 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
@@ -96,15 +96,11 @@ public class SimpleHttpSource extends BaseSource implements 
Configurable {
             } else {
                 channelFuture = bootstrap.bind(new InetSocketAddress(srcHost, 
srcPort)).sync();
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("Source {} bind ({}:{}) error, program will exit! e = 
{}",
                     this.getCachedSrcName(), srcHost, srcPort, e);
             System.exit(-1);
         }
-        ConfigManager.getInstance().addSourceReportInfo(srcHost,
-                String.valueOf(srcPort), rptSrcType, 
getProtocolName().toUpperCase());
-        logger.info("Source {} started at ({}:{}), rptSrcType={}!",
-                this.getCachedSrcName(), srcHost, srcPort, rptSrcType);
     }
 
     @Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index b343dc4dcb..ee5838d58d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -105,15 +105,11 @@ public class SimpleTcpSource extends BaseSource 
implements Configurable {
             } else {
                 channelFuture = bootstrap.bind(new InetSocketAddress(srcHost, 
srcPort)).sync();
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("Source {} bind ({}:{}) error, program will exit! e = 
{}",
                     this.getCachedSrcName(), srcHost, srcPort, e);
             System.exit(-1);
         }
-        ConfigManager.getInstance().addSourceReportInfo(srcHost,
-                String.valueOf(srcPort), rptSrcType, 
getProtocolName().toUpperCase());
-        logger.info("Source {} started at ({}:{}), rptSrcType={}!",
-                this.getCachedSrcName(), srcHost, srcPort, rptSrcType);
     }
 
     @Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index 380fc3dab9..f427e38c06 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.dataproxy.source;
 
-import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.SourceConstants;
 
 import io.netty.bootstrap.Bootstrap;
@@ -68,15 +67,11 @@ public class SimpleUdpSource extends BaseSource implements 
Configurable {
             } else {
                 channelFuture = bootstrap.bind(new InetSocketAddress(srcHost, 
srcPort)).sync();
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("Source {} bind ({}:{}) error, program will exit! e = 
{}",
                     this.getCachedSrcName(), srcHost, srcPort, e);
             System.exit(-1);
         }
-        ConfigManager.getInstance().addSourceReportInfo(srcHost,
-                String.valueOf(srcPort), rptSrcType, 
getProtocolName().toUpperCase());
-        logger.info("Source {} started at ({}:{}), rptSrcType={}!",
-                this.getCachedSrcName(), srcHost, srcPort, rptSrcType);
     }
 
     @Override

Reply via email to