This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 783426eb68 [INLONG-8657][DataProxy] Cache Source, Sink name and Channel object content (#8658)
783426eb68 is described below

commit 783426eb683380cfeb8c702a5af7848afe9ee05b
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Aug 8 14:28:27 2023 +0800

    [INLONG-8657][DataProxy] Cache Source, Sink name and Channel object content (#8658)
---
 .../dataproxy/sink/mq/MessageQueueZoneSink.java    | 37 +++++++++++---------
 .../sink/mq/MessageQueueZoneSinkContext.java       |  2 +-
 .../dataproxy/sink/mq/MessageQueueZoneWorker.java  |  2 +-
 .../apache/inlong/dataproxy/source/BaseSource.java | 39 ++++++++++++++--------
 .../dataproxy/source/ServerMessageFactory.java     |  2 +-
 .../dataproxy/source/ServerMessageHandler.java     | 13 ++++----
 .../inlong/dataproxy/source/SimpleHttpSource.java  | 10 +++---
 .../inlong/dataproxy/source/SimpleTcpSource.java   | 10 +++---
 .../inlong/dataproxy/source/SimpleUdpSource.java   |  6 ++--
 .../source/httpMsg/HttpMessageHandler.java         |  7 ++--
 10 files changed, 75 insertions(+), 53 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index c6f42ef5bc..a9897641fe 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -59,7 +59,8 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
     private static final LogCounter logCounter = new LogCounter(10, 100000, 30 
* 1000);
 
     private final long MQ_CLUSTER_STATUS_CHECK_DUR_MS = 2000L;
-
+    private String cachedSinkName;
+    private Channel cachedMsgChannel;
     private Context parentContext;
     private MessageQueueZoneSinkContext context;
     private final List<MessageQueueZoneWorker> workers = new ArrayList<>();
@@ -90,7 +91,8 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
      */
     @Override
     public void configure(Context context) {
-        logger.info("{} start to configure, context:{}.", this.getName(), 
context.toString());
+        this.cachedSinkName = getName();
+        logger.info("{} start to configure, context:{}.", this.cachedSinkName, 
context.toString());
         this.parentContext = context;
     }
 
@@ -100,11 +102,12 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
     @Override
     public void start() {
         if (getChannel() == null) {
-            logger.error("{}'s channel is null", this.getName());
+            logger.error("{}'s channel is null", this.cachedSinkName);
         }
+        cachedMsgChannel = getChannel();
         try {
             ConfigManager.getInstance().regMetaConfigChgCallback(this);
-            this.context = new MessageQueueZoneSinkContext(this, 
parentContext, getChannel());
+            this.context = new MessageQueueZoneSinkContext(this, 
parentContext, cachedMsgChannel);
             this.context.start();
             this.dispatchManager = new BatchPackManager(this, parentContext);
             this.scheduledPool = Executors.newScheduledThreadPool(2);
@@ -122,7 +125,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
             this.zoneProducer.start();
             // start configure change listener thread
             this.configListener = new Thread(new ConfigChangeProcessor());
-            this.configListener.setName(getName() + "-configure-listener");
+            this.configListener.setName(this.cachedSinkName + 
"-configure-listener");
             this.configListener.start();
             // create worker
             MessageQueueZoneWorker zoneWorker;
@@ -133,7 +136,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
                 this.workers.add(zoneWorker);
             }
         } catch (Exception e) {
-            logger.error("{} start failure", this.getName(), e);
+            logger.error("{} start failure", this.cachedSinkName, e);
         }
         super.start();
     }
@@ -159,7 +162,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
             try {
                 worker.close();
             } catch (Throwable e) {
-                logger.error("{} stop Zone worker failure", this.getName(), e);
+                logger.error("{} stop Zone worker failure", 
this.cachedSinkName, e);
             }
         }
         this.context.close();
@@ -185,11 +188,10 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
             }
         }
         this.dispatchManager.outputOvertimeData();
-        Channel channel = getChannel();
-        Transaction tx = channel.getTransaction();
+        Transaction tx = cachedMsgChannel.getTransaction();
         tx.begin();
         try {
-            Event event = channel.take();
+            Event event = cachedMsgChannel.take();
             // no data
             if (event == null) {
                 tx.commit();
@@ -248,13 +250,13 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
         } catch (Throwable t) {
             
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_TAKE_FAILURE);
             if (logCounter.shouldPrint()) {
-                logger.error("{} process event failed!", this.getName(), t);
+                logger.error("{} process event failed!", this.cachedSinkName, 
t);
             }
             try {
                 tx.rollback();
             } catch (Throwable e) {
                 if (logCounter.shouldPrint()) {
-                    logger.error("{} channel take transaction rollback 
exception", this.getName(), e);
+                    logger.error("{} channel take transaction rollback 
exception", this.cachedSinkName, e);
                 }
             }
             return Status.BACKOFF;
@@ -262,6 +264,11 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
             tx.close();
         }
     }
+
+    public String getCachedSinkName() {
+        return cachedSinkName;
+    }
+
     public boolean isMqClusterStarted() {
         return mqClusterStarted;
     }
@@ -325,13 +332,13 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
         @Override
         public void run() {
             long lastCheckTime;
-            logger.info("{} config-change processor start!", getName());
+            logger.info("{} config-change processor start!", cachedSinkName);
             while (!isShutdown) {
                 reentrantLock.lock();
                 try {
                     condition.await();
                 } catch (InterruptedException e1) {
-                    logger.info("{} config-change processor meet interrupt, 
break!", getName());
+                    logger.info("{} config-change processor meet interrupt, 
break!", cachedSinkName);
                     break;
                 } finally {
                     reentrantLock.unlock();
@@ -344,7 +351,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
                     zoneProducer.reloadMetaConfig();
                 } while (lastCheckTime != lastNotifyTime.get());
             }
-            logger.info("{} config-change processor exit!", getName());
+            logger.info("{} config-change processor exit!", cachedSinkName);
         }
     }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 33168baf8c..495b62d325 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -55,7 +55,7 @@ public class MessageQueueZoneSinkContext extends SinkContext {
      * Constructor
      */
     public MessageQueueZoneSinkContext(MessageQueueZoneSink mqZoneSink, 
Context context, Channel channel) {
-        super(mqZoneSink.getName(), context, channel);
+        super(mqZoneSink.getCachedSinkName(), context, channel);
         this.mqZoneSink = mqZoneSink;
         // proxyClusterId
         this.proxyClusterId = 
CommonConfigHolder.getInstance().getClusterName();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
index 524c322db4..e12b1de015 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
@@ -44,7 +44,7 @@ public class MessageQueueZoneWorker extends Thread {
             long fetchWaitMs, MessageQueueZoneProducer zoneProducer) {
         super();
         this.mqZoneSink = mqZoneSink;
-        this.workerName = mqZoneSink.getName() + "-worker-" + workerIndex;
+        this.workerName = mqZoneSink.getCachedSinkName() + "-worker-" + 
workerIndex;
         this.fetchWaitMs = fetchWaitMs;
         this.zoneProducer = zoneProducer;
         this.status = LifecycleState.IDLE;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index e93c43910b..1fa617dc4d 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -51,6 +51,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.FlumeException;
+import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.source.AbstractSource;
 import org.slf4j.Logger;
@@ -78,6 +79,8 @@ public abstract class BaseSource
     protected Context context;
     // whether source reject service
     protected volatile boolean isRejectService = false;
+    protected String cachedSrcName;
+    protected ChannelProcessor cachedChProcessor;
     // source service host
     protected String srcHost;
     // source serviced port
@@ -133,7 +136,8 @@ public abstract class BaseSource
 
     @Override
     public void configure(Context context) {
-        logger.info("{} start to configure context:{}.", this.getName(), 
context.toString());
+        this.cachedSrcName = getName();
+        logger.info("{} start to configure context:{}.", this.cachedSrcName, 
context.toString());
         this.context = context;
         this.srcHost = getHostIp(context);
         this.srcPort = getHostPort(context);
@@ -246,9 +250,10 @@ public abstract class BaseSource
             FailoverChannelProcessorHolder.setChannelProcessor(newProcessor);
         }
         super.start();
+        this.cachedChProcessor = getChannelProcessor();
         // initial metric item set
         this.metricItemSet = new DataProxyMetricItemSet(
-                CommonConfigHolder.getInstance().getClusterName(), getName(), 
String.valueOf(srcPort));
+                CommonConfigHolder.getInstance().getClusterName(), 
this.cachedSrcName, String.valueOf(srcPort));
         MetricRegister.register(metricItemSet);
         // init monitor logic
         if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
@@ -258,25 +263,25 @@ public abstract class BaseSource
             this.monitorIndex.start();
             this.monitorStats = new MonitorStats(
                     
CommonConfigHolder.getInstance().getFileMetricEventOutName()
-                            + AttrConstants.SEP_HASHTAG + this.getName(),
+                            + AttrConstants.SEP_HASHTAG + this.cachedSrcName,
                     
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
                     
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
             this.monitorStats.start();
         }
         startSource();
         // register
-        AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE, 
this.getName(), this);
+        AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE, 
this.cachedSrcName, this);
     }
 
     @Override
     public synchronized void stop() {
-        logger.info("[STOP {} SOURCE]{} stopping...", this.getProtocolName(), 
this.getName());
+        logger.info("[STOP {} SOURCE]{} stopping...", this.getProtocolName(), 
this.cachedSrcName);
         // close channels
         if (!allChannels.isEmpty()) {
             try {
                 allChannels.close().awaitUninterruptibly();
             } catch (Exception e) {
-                logger.warn("Close {} netty channels throw exception", 
this.getName(), e);
+                logger.warn("Close {} netty channels throw exception", 
this.cachedSrcName, e);
             } finally {
                 allChannels.clear();
             }
@@ -286,7 +291,7 @@ public abstract class BaseSource
             try {
                 channelFuture.channel().closeFuture().sync();
             } catch (InterruptedException e) {
-                logger.warn("Close {} channel future throw exception", 
this.getName(), e);
+                logger.warn("Close {} channel future throw exception", 
this.cachedSrcName, e);
             }
         }
         // stop super class
@@ -307,7 +312,7 @@ public abstract class BaseSource
                 monitorStats.stop();
             }
         }
-        logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), 
this.getName());
+        logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), 
this.cachedSrcName);
     }
 
     @Override
@@ -334,7 +339,7 @@ public abstract class BaseSource
                 }
             }
             logger.info("Source {} channel check, disconnects {} Illegal 
channels, waist {} ms",
-                    getName(), cnt, (System.currentTimeMillis() - startTime));
+                    this.cachedSrcName, cnt, (System.currentTimeMillis() - 
startTime));
         }
     }
 
@@ -435,7 +440,7 @@ public abstract class BaseSource
             return;
         }
         String tenMinsDt = DateTimeUtils.ms2yyyyMMddHHmmTenMins(dt);
-        strBuff.append(getName()).append(AttrConstants.SEP_HASHTAG)
+        strBuff.append(this.cachedSrcName).append(AttrConstants.SEP_HASHTAG)
                 .append(groupId).append(AttrConstants.SEP_HASHTAG)
                 .append(streamId).append(AttrConstants.SEP_HASHTAG)
                 .append(topicName).append(AttrConstants.SEP_HASHTAG)
@@ -464,7 +469,7 @@ public abstract class BaseSource
     public void addMetric(boolean result, long size, Event event) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, 
CommonConfigHolder.getInstance().getClusterName());
-        dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, getName());
+        dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, this.cachedSrcName);
         dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, getStrPort());
         DataProxyMetricItem.fillInlongId(event, dimensions);
         DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
@@ -486,7 +491,7 @@ public abstract class BaseSource
      */
     public ChannelInitializer getChannelInitializerFactory() {
         ChannelInitializer fac = null;
-        logger.info(this.getName() + " load msgFactory=" + msgFactoryName);
+        logger.info(this.cachedSrcName + " load msgFactory=" + msgFactoryName);
         try {
             Class<? extends ChannelInitializer> clazz =
                     (Class<? extends ChannelInitializer>) 
Class.forName(msgFactoryName);
@@ -495,7 +500,7 @@ public abstract class BaseSource
             fac = (ChannelInitializer) ctor.newInstance(this);
         } catch (Exception e) {
             logger.error("{} start error, fail to construct 
ChannelPipelineFactory with name {}",
-                    this.getName(), msgFactoryName, e);
+                    this.cachedSrcName, msgFactoryName, e);
             stop();
             throw new FlumeException(e.getMessage());
         }
@@ -531,6 +536,14 @@ public abstract class BaseSource
         return isRejectService;
     }
 
+    public String getCachedSrcName() {
+        return cachedSrcName;
+    }
+
+    public ChannelProcessor getCachedChProcessor() {
+        return cachedChProcessor;
+    }
+
     /**
      * getHostIp
      *
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index 7b35c27ed4..d68a8b4df2 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -77,7 +77,7 @@ public class ServerMessageFactory extends 
ChannelInitializer<SocketChannel> {
                         (ChannelInboundHandlerAdapter) 
ctor.newInstance(source);
                 ch.pipeline().addLast("messageHandler", messageHandler);
             } catch (Exception e) {
-                LOG.error("{} newInstance {} failure!", source.getName(),
+                LOG.error("{} newInstance {} failure!", 
source.getCachedSrcName(),
                         source.getMessageHandlerName(), e);
             }
         }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 7dd19dc157..984ddcb2c8 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -204,7 +204,8 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
             ctx.channel().disconnect();
             ctx.channel().close();
             logger.warn("{} refuse to connect = {} , connections = {}, 
maxConnections = {}",
-                    source.getName(), ctx.channel(), 
source.getAllChannels().size(), source.getMaxConnections());
+                    source.getCachedSrcName(), ctx.channel(), 
source.getAllChannels().size(),
+                    source.getMaxConnections());
             return;
         }
         // add legal channel
@@ -212,7 +213,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
         ctx.fireChannelActive();
         source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKIN);
         logger.info("{} added new channel {}, current connections = {}, 
maxConnections = {}",
-                source.getName(), ctx.channel(), 
source.getAllChannels().size(), source.getMaxConnections());
+                source.getCachedSrcName(), ctx.channel(), 
source.getAllChannels().size(), source.getMaxConnections());
     }
 
     @Override
@@ -227,7 +228,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
         source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
         if (logCounter.shouldPrint()) {
             logger.warn("{} received an exception from channel {}",
-                    source.getName(), ctx.channel(), cause);
+                    source.getCachedSrcName(), ctx.channel(), cause);
         }
         if (ctx.channel() != null) {
             source.getAllChannels().remove(ctx.channel());
@@ -270,7 +271,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
         // build InLong event.
         Event event = msgCodec.encEventPackage(source, channel);
         try {
-            source.getChannelProcessor().processEvent(event);
+            source.getCachedChProcessor().processEvent(event);
             source.fileMetricAddSuccStats(strBuff, msgCodec.getGroupId(), 
msgCodec.getStreamId(),
                     msgCodec.getTopicName(), msgCodec.getStrRemoteIP(), 
msgCodec.getMsgProcType(),
                     msgCodec.getDataTimeMs(), msgCodec.getMsgPkgTime(), 
msgCodec.getMsgCount(), 1,
@@ -370,7 +371,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
         ProxyPackEvent packEvent = new ProxyPackEvent(inlongGroupId, 
inlongStreamId, events, callback);
         // put to channel
         try {
-            source.getChannelProcessor().processEvent(packEvent);
+            source.getCachedChProcessor().processEvent(packEvent);
             events.forEach(event -> {
                 source.addMetric(true, event.getBody().length, event);
                 
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
@@ -416,7 +417,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
             event.setTopic(topic);
             // put to channel
             try {
-                source.getChannelProcessor().processEvent(event);
+                source.getCachedChProcessor().processEvent(event);
                 source.addMetric(true, event.getBody().length, event);
                 
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
             } catch (Throwable ex) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
index 48962c2923..ce62d7ab24 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
@@ -67,13 +67,13 @@ public class SimpleHttpSource extends BaseSource implements 
Configurable {
 
     @Override
     public synchronized void startSource() {
-        logger.info("start " + this.getName());
+        logger.info("start " + this.getCachedSrcName());
         // build accept group
         this.acceptorGroup = new NioEventLoopGroup(maxAcceptThreads,
-                new DefaultThreadFactory(this.getName() + "-boss-group"));
+                new DefaultThreadFactory(this.getCachedSrcName() + 
"-boss-group"));
         // build worker group
         this.workerGroup = new NioEventLoopGroup(maxWorkerThreads,
-                new DefaultThreadFactory(this.getName() + "-worker-group"));
+                new DefaultThreadFactory(this.getCachedSrcName() + 
"-worker-group"));
         // init boostrap
         bootstrap = new ServerBootstrap();
         if (conLinger >= 0) {
@@ -97,12 +97,12 @@ public class SimpleHttpSource extends BaseSource implements 
Configurable {
             }
         } catch (Exception e) {
             logger.error("Source {} bind ({}:{}) error, program will exit! e = 
{}",
-                    this.getName(), srcHost, srcPort, e);
+                    this.getCachedSrcName(), srcHost, srcPort, e);
             System.exit(-1);
         }
         ConfigManager.getInstance().addSourceReportInfo(
                 srcHost, String.valueOf(srcPort), 
getProtocolName().toUpperCase());
-        logger.info("Source {} started at ({}:{})!", this.getName(), srcHost, 
srcPort);
+        logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(), 
srcHost, srcPort);
     }
 
     @Override
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 2860bdf8e1..9306128fa2 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -74,13 +74,13 @@ public class SimpleTcpSource extends BaseSource implements 
Configurable {
 
     @Override
     public synchronized void startSource() {
-        logger.info("start " + this.getName());
+        logger.info("start " + this.getCachedSrcName());
         // build accept group
         this.acceptorGroup = EventLoopUtil.newEventLoopGroup(maxAcceptThreads, 
enableBusyWait,
-                new DefaultThreadFactory(this.getName() + "-boss-group"));
+                new DefaultThreadFactory(this.getCachedSrcName() + 
"-boss-group"));
         // build worker group
         this.workerGroup = EventLoopUtil.newEventLoopGroup(maxWorkerThreads, 
enableBusyWait,
-                new DefaultThreadFactory(this.getName() + "-worker-group"));
+                new DefaultThreadFactory(this.getCachedSrcName() + 
"-worker-group"));
         // init boostrap
         bootstrap = new ServerBootstrap();
         if (conLinger >= 0) {
@@ -106,12 +106,12 @@ public class SimpleTcpSource extends BaseSource 
implements Configurable {
             }
         } catch (Exception e) {
             logger.error("Source {} bind ({}:{}) error, program will exit! e = 
{}",
-                    this.getName(), srcHost, srcPort, e);
+                    this.getCachedSrcName(), srcHost, srcPort, e);
             System.exit(-1);
         }
         ConfigManager.getInstance().addSourceReportInfo(
                 srcHost, String.valueOf(srcPort), 
getProtocolName().toUpperCase());
-        logger.info("Source {} started at ({}:{})!", this.getName(), srcHost, 
srcPort);
+        logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(), 
srcHost, srcPort);
     }
 
     @Override
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index eb9dd70220..8b7df49122 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -50,7 +50,7 @@ public class SimpleUdpSource extends BaseSource implements 
Configurable {
     @Override
     public void startSource() {
         // setup Netty server
-        logger.info("start " + this.getName());
+        logger.info("start " + this.getCachedSrcName());
         bootstrap = new Bootstrap();
         bootstrap.channel(NioDatagramChannel.class);
         if (conLinger >= 0) {
@@ -69,12 +69,12 @@ public class SimpleUdpSource extends BaseSource implements 
Configurable {
             }
         } catch (Exception e) {
             logger.error("Source {} bind ({}:{}) error, program will exit! e = 
{}",
-                    this.getName(), srcHost, srcPort, e);
+                    this.getCachedSrcName(), srcHost, srcPort, e);
             System.exit(-1);
         }
         ConfigManager.getInstance().addSourceReportInfo(
                 srcHost, String.valueOf(srcPort), 
getProtocolName().toUpperCase());
-        logger.info("Source {} started at ({}:{})!", this.getName(), srcHost, 
srcPort);
+        logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(), 
srcHost, srcPort);
     }
 
     @Override
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
index ec759c8c30..00359fe983 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
@@ -196,7 +196,8 @@ public class HttpMessageHandler extends 
SimpleChannelInboundHandler<FullHttpRequ
             ctx.channel().close();
             if (logCounter.shouldPrint()) {
                 logger.warn("{} refuse to connect = {} , connections = {}, 
maxConnections = {}",
-                        source.getName(), ctx.channel(), 
source.getAllChannels().size(), source.getMaxConnections());
+                        source.getCachedSrcName(), ctx.channel(), 
source.getAllChannels().size(),
+                        source.getMaxConnections());
             }
             return;
         }
@@ -216,7 +217,7 @@ public class HttpMessageHandler extends 
SimpleChannelInboundHandler<FullHttpRequ
         source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
         if (logCounter.shouldPrint()) {
             logger.warn("{} received an exception from channel {}",
-                    source.getName(), ctx.channel(), cause);
+                    source.getCachedSrcName(), ctx.channel(), cause);
         }
         if (cause instanceof IOException) {
             ctx.close();
@@ -351,7 +352,7 @@ public class HttpMessageHandler extends 
SimpleChannelInboundHandler<FullHttpRequ
         eventHeaders.put(ConfigConstants.PKG_TIME_KEY, 
String.valueOf(pkgTime));
         Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
         try {
-            source.getChannelProcessor().processEvent(event);
+            source.getCachedChProcessor().processEvent(event);
             source.fileMetricAddSuccStats(strBuff, groupId, streamId, 
topicName, clientIp,
                     "b2b", dataTime, pkgTime, intMsgCnt, 1, 
event.getBody().length);
             source.addMetric(true, event.getBody().length, event);

Reply via email to