This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 783426eb68 [INLONG-8657][DataProxy] Cache Source, Sink name and Channel object content (#8658)
783426eb68 is described below
commit 783426eb683380cfeb8c702a5af7848afe9ee05b
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Aug 8 14:28:27 2023 +0800
[INLONG-8657][DataProxy] Cache Source, Sink name and Channel object content (#8658)
---
.../dataproxy/sink/mq/MessageQueueZoneSink.java | 37 +++++++++++---------
.../sink/mq/MessageQueueZoneSinkContext.java | 2 +-
.../dataproxy/sink/mq/MessageQueueZoneWorker.java | 2 +-
.../apache/inlong/dataproxy/source/BaseSource.java | 39 ++++++++++++++--------
.../dataproxy/source/ServerMessageFactory.java | 2 +-
.../dataproxy/source/ServerMessageHandler.java | 13 ++++----
.../inlong/dataproxy/source/SimpleHttpSource.java | 10 +++---
.../inlong/dataproxy/source/SimpleTcpSource.java | 10 +++---
.../inlong/dataproxy/source/SimpleUdpSource.java | 6 ++--
.../source/httpMsg/HttpMessageHandler.java | 7 ++--
10 files changed, 75 insertions(+), 53 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index c6f42ef5bc..a9897641fe 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -59,7 +59,8 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
private static final LogCounter logCounter = new LogCounter(10, 100000, 30
* 1000);
private final long MQ_CLUSTER_STATUS_CHECK_DUR_MS = 2000L;
-
+ private String cachedSinkName;
+ private Channel cachedMsgChannel;
private Context parentContext;
private MessageQueueZoneSinkContext context;
private final List<MessageQueueZoneWorker> workers = new ArrayList<>();
@@ -90,7 +91,8 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
*/
@Override
public void configure(Context context) {
- logger.info("{} start to configure, context:{}.", this.getName(),
context.toString());
+ this.cachedSinkName = getName();
+ logger.info("{} start to configure, context:{}.", this.cachedSinkName,
context.toString());
this.parentContext = context;
}
@@ -100,11 +102,12 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
@Override
public void start() {
if (getChannel() == null) {
- logger.error("{}'s channel is null", this.getName());
+ logger.error("{}'s channel is null", this.cachedSinkName);
}
+ cachedMsgChannel = getChannel();
try {
ConfigManager.getInstance().regMetaConfigChgCallback(this);
- this.context = new MessageQueueZoneSinkContext(this,
parentContext, getChannel());
+ this.context = new MessageQueueZoneSinkContext(this,
parentContext, cachedMsgChannel);
this.context.start();
this.dispatchManager = new BatchPackManager(this, parentContext);
this.scheduledPool = Executors.newScheduledThreadPool(2);
@@ -122,7 +125,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
this.zoneProducer.start();
// start configure change listener thread
this.configListener = new Thread(new ConfigChangeProcessor());
- this.configListener.setName(getName() + "-configure-listener");
+ this.configListener.setName(this.cachedSinkName +
"-configure-listener");
this.configListener.start();
// create worker
MessageQueueZoneWorker zoneWorker;
@@ -133,7 +136,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
this.workers.add(zoneWorker);
}
} catch (Exception e) {
- logger.error("{} start failure", this.getName(), e);
+ logger.error("{} start failure", this.cachedSinkName, e);
}
super.start();
}
@@ -159,7 +162,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
try {
worker.close();
} catch (Throwable e) {
- logger.error("{} stop Zone worker failure", this.getName(), e);
+ logger.error("{} stop Zone worker failure",
this.cachedSinkName, e);
}
}
this.context.close();
@@ -185,11 +188,10 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
}
}
this.dispatchManager.outputOvertimeData();
- Channel channel = getChannel();
- Transaction tx = channel.getTransaction();
+ Transaction tx = cachedMsgChannel.getTransaction();
tx.begin();
try {
- Event event = channel.take();
+ Event event = cachedMsgChannel.take();
// no data
if (event == null) {
tx.commit();
@@ -248,13 +250,13 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
} catch (Throwable t) {
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_TAKE_FAILURE);
if (logCounter.shouldPrint()) {
- logger.error("{} process event failed!", this.getName(), t);
+ logger.error("{} process event failed!", this.cachedSinkName,
t);
}
try {
tx.rollback();
} catch (Throwable e) {
if (logCounter.shouldPrint()) {
- logger.error("{} channel take transaction rollback
exception", this.getName(), e);
+ logger.error("{} channel take transaction rollback
exception", this.cachedSinkName, e);
}
}
return Status.BACKOFF;
@@ -262,6 +264,11 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
tx.close();
}
}
+
+ public String getCachedSinkName() {
+ return cachedSinkName;
+ }
+
public boolean isMqClusterStarted() {
return mqClusterStarted;
}
@@ -325,13 +332,13 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
@Override
public void run() {
long lastCheckTime;
- logger.info("{} config-change processor start!", getName());
+ logger.info("{} config-change processor start!", cachedSinkName);
while (!isShutdown) {
reentrantLock.lock();
try {
condition.await();
} catch (InterruptedException e1) {
- logger.info("{} config-change processor meet interrupt,
break!", getName());
+ logger.info("{} config-change processor meet interrupt,
break!", cachedSinkName);
break;
} finally {
reentrantLock.unlock();
@@ -344,7 +351,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
zoneProducer.reloadMetaConfig();
} while (lastCheckTime != lastNotifyTime.get());
}
- logger.info("{} config-change processor exit!", getName());
+ logger.info("{} config-change processor exit!", cachedSinkName);
}
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 33168baf8c..495b62d325 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -55,7 +55,7 @@ public class MessageQueueZoneSinkContext extends SinkContext {
* Constructor
*/
public MessageQueueZoneSinkContext(MessageQueueZoneSink mqZoneSink,
Context context, Channel channel) {
- super(mqZoneSink.getName(), context, channel);
+ super(mqZoneSink.getCachedSinkName(), context, channel);
this.mqZoneSink = mqZoneSink;
// proxyClusterId
this.proxyClusterId =
CommonConfigHolder.getInstance().getClusterName();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
index 524c322db4..e12b1de015 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
@@ -44,7 +44,7 @@ public class MessageQueueZoneWorker extends Thread {
long fetchWaitMs, MessageQueueZoneProducer zoneProducer) {
super();
this.mqZoneSink = mqZoneSink;
- this.workerName = mqZoneSink.getName() + "-worker-" + workerIndex;
+ this.workerName = mqZoneSink.getCachedSinkName() + "-worker-" +
workerIndex;
this.fetchWaitMs = fetchWaitMs;
this.zoneProducer = zoneProducer;
this.status = LifecycleState.IDLE;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index e93c43910b..1fa617dc4d 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -51,6 +51,7 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
+import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
@@ -78,6 +79,8 @@ public abstract class BaseSource
protected Context context;
// whether source reject service
protected volatile boolean isRejectService = false;
+ protected String cachedSrcName;
+ protected ChannelProcessor cachedChProcessor;
// source service host
protected String srcHost;
// source serviced port
@@ -133,7 +136,8 @@ public abstract class BaseSource
@Override
public void configure(Context context) {
- logger.info("{} start to configure context:{}.", this.getName(),
context.toString());
+ this.cachedSrcName = getName();
+ logger.info("{} start to configure context:{}.", this.cachedSrcName,
context.toString());
this.context = context;
this.srcHost = getHostIp(context);
this.srcPort = getHostPort(context);
@@ -246,9 +250,10 @@ public abstract class BaseSource
FailoverChannelProcessorHolder.setChannelProcessor(newProcessor);
}
super.start();
+ this.cachedChProcessor = getChannelProcessor();
// initial metric item set
this.metricItemSet = new DataProxyMetricItemSet(
- CommonConfigHolder.getInstance().getClusterName(), getName(),
String.valueOf(srcPort));
+ CommonConfigHolder.getInstance().getClusterName(),
this.cachedSrcName, String.valueOf(srcPort));
MetricRegister.register(metricItemSet);
// init monitor logic
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
@@ -258,25 +263,25 @@ public abstract class BaseSource
this.monitorIndex.start();
this.monitorStats = new MonitorStats(
CommonConfigHolder.getInstance().getFileMetricEventOutName()
- + AttrConstants.SEP_HASHTAG + this.getName(),
+ + AttrConstants.SEP_HASHTAG + this.cachedSrcName,
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
this.monitorStats.start();
}
startSource();
// register
- AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE,
this.getName(), this);
+ AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE,
this.cachedSrcName, this);
}
@Override
public synchronized void stop() {
- logger.info("[STOP {} SOURCE]{} stopping...", this.getProtocolName(),
this.getName());
+ logger.info("[STOP {} SOURCE]{} stopping...", this.getProtocolName(),
this.cachedSrcName);
// close channels
if (!allChannels.isEmpty()) {
try {
allChannels.close().awaitUninterruptibly();
} catch (Exception e) {
- logger.warn("Close {} netty channels throw exception",
this.getName(), e);
+ logger.warn("Close {} netty channels throw exception",
this.cachedSrcName, e);
} finally {
allChannels.clear();
}
@@ -286,7 +291,7 @@ public abstract class BaseSource
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
- logger.warn("Close {} channel future throw exception",
this.getName(), e);
+ logger.warn("Close {} channel future throw exception",
this.cachedSrcName, e);
}
}
// stop super class
@@ -307,7 +312,7 @@ public abstract class BaseSource
monitorStats.stop();
}
}
- logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(),
this.getName());
+ logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(),
this.cachedSrcName);
}
@Override
@@ -334,7 +339,7 @@ public abstract class BaseSource
}
}
logger.info("Source {} channel check, disconnects {} Illegal
channels, waist {} ms",
- getName(), cnt, (System.currentTimeMillis() - startTime));
+ this.cachedSrcName, cnt, (System.currentTimeMillis() -
startTime));
}
}
@@ -435,7 +440,7 @@ public abstract class BaseSource
return;
}
String tenMinsDt = DateTimeUtils.ms2yyyyMMddHHmmTenMins(dt);
- strBuff.append(getName()).append(AttrConstants.SEP_HASHTAG)
+ strBuff.append(this.cachedSrcName).append(AttrConstants.SEP_HASHTAG)
.append(groupId).append(AttrConstants.SEP_HASHTAG)
.append(streamId).append(AttrConstants.SEP_HASHTAG)
.append(topicName).append(AttrConstants.SEP_HASHTAG)
@@ -464,7 +469,7 @@ public abstract class BaseSource
public void addMetric(boolean result, long size, Event event) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
CommonConfigHolder.getInstance().getClusterName());
- dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, getName());
+ dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, this.cachedSrcName);
dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, getStrPort());
DataProxyMetricItem.fillInlongId(event, dimensions);
DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
@@ -486,7 +491,7 @@ public abstract class BaseSource
*/
public ChannelInitializer getChannelInitializerFactory() {
ChannelInitializer fac = null;
- logger.info(this.getName() + " load msgFactory=" + msgFactoryName);
+ logger.info(this.cachedSrcName + " load msgFactory=" + msgFactoryName);
try {
Class<? extends ChannelInitializer> clazz =
(Class<? extends ChannelInitializer>)
Class.forName(msgFactoryName);
@@ -495,7 +500,7 @@ public abstract class BaseSource
fac = (ChannelInitializer) ctor.newInstance(this);
} catch (Exception e) {
logger.error("{} start error, fail to construct
ChannelPipelineFactory with name {}",
- this.getName(), msgFactoryName, e);
+ this.cachedSrcName, msgFactoryName, e);
stop();
throw new FlumeException(e.getMessage());
}
@@ -531,6 +536,14 @@ public abstract class BaseSource
return isRejectService;
}
+ public String getCachedSrcName() {
+ return cachedSrcName;
+ }
+
+ public ChannelProcessor getCachedChProcessor() {
+ return cachedChProcessor;
+ }
+
/**
* getHostIp
*
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index 7b35c27ed4..d68a8b4df2 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -77,7 +77,7 @@ public class ServerMessageFactory extends
ChannelInitializer<SocketChannel> {
(ChannelInboundHandlerAdapter)
ctor.newInstance(source);
ch.pipeline().addLast("messageHandler", messageHandler);
} catch (Exception e) {
- LOG.error("{} newInstance {} failure!", source.getName(),
+ LOG.error("{} newInstance {} failure!",
source.getCachedSrcName(),
source.getMessageHandlerName(), e);
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 7dd19dc157..984ddcb2c8 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -204,7 +204,8 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
ctx.channel().disconnect();
ctx.channel().close();
logger.warn("{} refuse to connect = {} , connections = {},
maxConnections = {}",
- source.getName(), ctx.channel(),
source.getAllChannels().size(), source.getMaxConnections());
+ source.getCachedSrcName(), ctx.channel(),
source.getAllChannels().size(),
+ source.getMaxConnections());
return;
}
// add legal channel
@@ -212,7 +213,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
ctx.fireChannelActive();
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKIN);
logger.info("{} added new channel {}, current connections = {},
maxConnections = {}",
- source.getName(), ctx.channel(),
source.getAllChannels().size(), source.getMaxConnections());
+ source.getCachedSrcName(), ctx.channel(),
source.getAllChannels().size(), source.getMaxConnections());
}
@Override
@@ -227,7 +228,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
if (logCounter.shouldPrint()) {
logger.warn("{} received an exception from channel {}",
- source.getName(), ctx.channel(), cause);
+ source.getCachedSrcName(), ctx.channel(), cause);
}
if (ctx.channel() != null) {
source.getAllChannels().remove(ctx.channel());
@@ -270,7 +271,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
// build InLong event.
Event event = msgCodec.encEventPackage(source, channel);
try {
- source.getChannelProcessor().processEvent(event);
+ source.getCachedChProcessor().processEvent(event);
source.fileMetricAddSuccStats(strBuff, msgCodec.getGroupId(),
msgCodec.getStreamId(),
msgCodec.getTopicName(), msgCodec.getStrRemoteIP(),
msgCodec.getMsgProcType(),
msgCodec.getDataTimeMs(), msgCodec.getMsgPkgTime(),
msgCodec.getMsgCount(), 1,
@@ -370,7 +371,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
ProxyPackEvent packEvent = new ProxyPackEvent(inlongGroupId,
inlongStreamId, events, callback);
// put to channel
try {
- source.getChannelProcessor().processEvent(packEvent);
+ source.getCachedChProcessor().processEvent(packEvent);
events.forEach(event -> {
source.addMetric(true, event.getBody().length, event);
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
@@ -416,7 +417,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
event.setTopic(topic);
// put to channel
try {
- source.getChannelProcessor().processEvent(event);
+ source.getCachedChProcessor().processEvent(event);
source.addMetric(true, event.getBody().length, event);
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
} catch (Throwable ex) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
index 48962c2923..ce62d7ab24 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
@@ -67,13 +67,13 @@ public class SimpleHttpSource extends BaseSource implements
Configurable {
@Override
public synchronized void startSource() {
- logger.info("start " + this.getName());
+ logger.info("start " + this.getCachedSrcName());
// build accept group
this.acceptorGroup = new NioEventLoopGroup(maxAcceptThreads,
- new DefaultThreadFactory(this.getName() + "-boss-group"));
+ new DefaultThreadFactory(this.getCachedSrcName() +
"-boss-group"));
// build worker group
this.workerGroup = new NioEventLoopGroup(maxWorkerThreads,
- new DefaultThreadFactory(this.getName() + "-worker-group"));
+ new DefaultThreadFactory(this.getCachedSrcName() +
"-worker-group"));
// init boostrap
bootstrap = new ServerBootstrap();
if (conLinger >= 0) {
@@ -97,12 +97,12 @@ public class SimpleHttpSource extends BaseSource implements
Configurable {
}
} catch (Exception e) {
logger.error("Source {} bind ({}:{}) error, program will exit! e =
{}",
- this.getName(), srcHost, srcPort, e);
+ this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
ConfigManager.getInstance().addSourceReportInfo(
srcHost, String.valueOf(srcPort),
getProtocolName().toUpperCase());
- logger.info("Source {} started at ({}:{})!", this.getName(), srcHost,
srcPort);
+ logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(),
srcHost, srcPort);
}
@Override
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 2860bdf8e1..9306128fa2 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -74,13 +74,13 @@ public class SimpleTcpSource extends BaseSource implements
Configurable {
@Override
public synchronized void startSource() {
- logger.info("start " + this.getName());
+ logger.info("start " + this.getCachedSrcName());
// build accept group
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(maxAcceptThreads,
enableBusyWait,
- new DefaultThreadFactory(this.getName() + "-boss-group"));
+ new DefaultThreadFactory(this.getCachedSrcName() +
"-boss-group"));
// build worker group
this.workerGroup = EventLoopUtil.newEventLoopGroup(maxWorkerThreads,
enableBusyWait,
- new DefaultThreadFactory(this.getName() + "-worker-group"));
+ new DefaultThreadFactory(this.getCachedSrcName() +
"-worker-group"));
// init boostrap
bootstrap = new ServerBootstrap();
if (conLinger >= 0) {
@@ -106,12 +106,12 @@ public class SimpleTcpSource extends BaseSource
implements Configurable {
}
} catch (Exception e) {
logger.error("Source {} bind ({}:{}) error, program will exit! e =
{}",
- this.getName(), srcHost, srcPort, e);
+ this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
ConfigManager.getInstance().addSourceReportInfo(
srcHost, String.valueOf(srcPort),
getProtocolName().toUpperCase());
- logger.info("Source {} started at ({}:{})!", this.getName(), srcHost,
srcPort);
+ logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(),
srcHost, srcPort);
}
@Override
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index eb9dd70220..8b7df49122 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -50,7 +50,7 @@ public class SimpleUdpSource extends BaseSource implements
Configurable {
@Override
public void startSource() {
// setup Netty server
- logger.info("start " + this.getName());
+ logger.info("start " + this.getCachedSrcName());
bootstrap = new Bootstrap();
bootstrap.channel(NioDatagramChannel.class);
if (conLinger >= 0) {
@@ -69,12 +69,12 @@ public class SimpleUdpSource extends BaseSource implements
Configurable {
}
} catch (Exception e) {
logger.error("Source {} bind ({}:{}) error, program will exit! e =
{}",
- this.getName(), srcHost, srcPort, e);
+ this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
ConfigManager.getInstance().addSourceReportInfo(
srcHost, String.valueOf(srcPort),
getProtocolName().toUpperCase());
- logger.info("Source {} started at ({}:{})!", this.getName(), srcHost,
srcPort);
+ logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(),
srcHost, srcPort);
}
@Override
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
index ec759c8c30..00359fe983 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
@@ -196,7 +196,8 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
ctx.channel().close();
if (logCounter.shouldPrint()) {
logger.warn("{} refuse to connect = {} , connections = {},
maxConnections = {}",
- source.getName(), ctx.channel(),
source.getAllChannels().size(), source.getMaxConnections());
+ source.getCachedSrcName(), ctx.channel(),
source.getAllChannels().size(),
+ source.getMaxConnections());
}
return;
}
@@ -216,7 +217,7 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
if (logCounter.shouldPrint()) {
logger.warn("{} received an exception from channel {}",
- source.getName(), ctx.channel(), cause);
+ source.getCachedSrcName(), ctx.channel(), cause);
}
if (cause instanceof IOException) {
ctx.close();
@@ -351,7 +352,7 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
eventHeaders.put(ConfigConstants.PKG_TIME_KEY,
String.valueOf(pkgTime));
Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
try {
- source.getChannelProcessor().processEvent(event);
+ source.getCachedChProcessor().processEvent(event);
source.fileMetricAddSuccStats(strBuff, groupId, streamId,
topicName, clientIp,
"b2b", dataTime, pkgTime, intMsgCnt, 1,
event.getBody().length);
source.addMetric(true, event.getBody().length, event);