This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 55ed96c248 [INLONG-8212][DataProxy] Improve HTTP related message
handling (#8213)
55ed96c248 is described below
commit 55ed96c248f09f10de095750ec3f6a72ac3fc0a5
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Jun 12 09:47:08 2023 +0800
[INLONG-8212][DataProxy] Improve HTTP related message handling (#8213)
---
.../inlong/common/enums/DataProxyErrCode.java | 9 +-
.../inlong/dataproxy/config/ConfigManager.java | 27 +-
.../inlong/dataproxy/consts/HttpAttrConst.java | 38 +++
.../inlong/dataproxy/consts/StatConstants.java | 45 +--
.../dataproxy/heartbeat/HeartbeatManager.java | 2 +-
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 3 +-
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 10 +-
.../inlong/dataproxy/source2/BaseSource.java | 34 ++
.../dataproxy/source2/InLongMessageHandler.java | 30 +-
.../inlong/dataproxy/source2/SimpleHttpSource.java | 1 +
.../inlong/dataproxy/source2/SimpleTcpSource.java | 34 +-
.../source2/httpMsg/InLongHttpMsgHandler.java | 379 +++++++++++++--------
.../dataproxy/source2/v0msg/CodecBinMsg.java | 6 +-
.../dataproxy/source2/v0msg/CodecTextMsg.java | 2 +-
14 files changed, 378 insertions(+), 242 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
index 263af3678e..5caeb2d875 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -26,11 +26,18 @@ public enum DataProxyErrCode {
SUCCESS(0, "Ok"),
- SINK_SERVICE_UNREADY(1, "Service not ready"),
+ SINK_SERVICE_UNREADY(1, "Service sink not ready"),
SERVICE_CLOSED(2, "Service closed"),
CONF_SERVICE_UNREADY(3, "Configure Service not ready"),
ILLEGAL_VISIT_IP(10, "Illegal visit ip"),
+ HTTP_DECODE_REQ_FAILURE(31, "Decode request failure"),
+ HTTP_UNSUPPORTED_METHOD(32, "Un-supported method"),
+ HTTP_REQ_URI_BLANK(33, "Request uri is blank"),
+ HTTP_DECODE_REQ_URI_FAILURE(34, "Decode uri failure"),
+ HTTP_UNSUPPORTED_SERVICE_URI(35, "Un-supported service uri"),
+ HTTP_UNSUPPORTED_CONTENT_TYPE(36, "Un-supported content type"),
+
FIELD_VALUE_NOT_EQUAL(95, "Field value not equal"),
UNCOMPRESS_DATA_ERROR(96, "Uncompress data error"),
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 2507419686..af626660ba 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -317,7 +317,7 @@ public class ConfigManager {
}
List<String> managerIpList =
CommonConfigHolder.getInstance().getManagerHosts();
if (managerIpList == null || managerIpList.size() == 0) {
- LOG.error("Found remote manager ip list are empty, can't quest
remote configure!");
+ LOG.error("Found manager ip list are empty, can't quest remote
configure!");
return;
}
int managerIpSize = managerIpList.size();
@@ -333,10 +333,11 @@ public class ConfigManager {
* reloadDataProxyConfig
*/
private boolean reloadDataProxyConfig(String clusterName, String
clusterTag, String host) {
+ String url = null;
HttpPost httpPost = null;
try {
- String url =
- "http://" + host + ConfigConstants.MANAGER_PATH +
ConfigConstants.MANAGER_GET_ALL_CONFIG_PATH;
+ url = "http://" + host + ConfigConstants.MANAGER_PATH
+ + ConfigConstants.MANAGER_GET_ALL_CONFIG_PATH;
httpPost = new HttpPost(url);
httpPost.addHeader(HttpHeaders.CONNECTION, "close");
httpPost.addHeader(HttpHeaders.AUTHORIZATION,
AuthUtils.genBasicAuth());
@@ -349,11 +350,12 @@ public class ConfigManager {
}
httpPost.setEntity(HttpUtils.getEntity(request));
// request with post
- LOG.info("Start to request {} to get config info with params
{}", url, request);
+ LOG.info("Start to request {} to get config info, with params
{}", url, request);
CloseableHttpResponse response = httpClient.execute(httpPost);
String returnStr = EntityUtils.toString(response.getEntity());
if (response.getStatusLine().getStatusCode() != 200) {
- LOG.info("Failed to request {}, the response is {}", url,
returnStr);
+ LOG.warn("Failed to request {}, with params {}, the
response is {}",
+ url, request, returnStr);
return false;
}
LOG.info("End to request {} to get config info:{}", url,
returnStr);
@@ -361,28 +363,33 @@ public class ConfigManager {
DataProxyConfigResponse proxyResponse =
gson.fromJson(returnStr,
DataProxyConfigResponse.class);
if (!proxyResponse.isResult()) {
- LOG.info("Fail to get config info from url:{}, error code
is {}", url, proxyResponse.getErrCode());
+ LOG.warn("Fail to get config from url {}, with params {},
error code is {}",
+ url, request, proxyResponse.getErrCode());
return false;
}
if (proxyResponse.getErrCode() !=
DataProxyConfigResponse.SUCC) {
- LOG.info("Get config info from url:{}, error code is {}",
url, proxyResponse.getErrCode());
+ if (proxyResponse.getErrCode() !=
DataProxyConfigResponse.NOUPDATE) {
+ LOG.warn("Get config failure from url:{}, with params
{}, error code is {}",
+ url, request, proxyResponse.getErrCode());
+ }
return true;
}
DataProxyCluster dataProxyCluster = proxyResponse.getData();
if (dataProxyCluster == null
|| dataProxyCluster.getCacheClusterSet() == null
||
dataProxyCluster.getCacheClusterSet().getCacheClusters().isEmpty()) {
- LOG.info("Get config info from url:{}, found cluster set
is empty!", url);
+ LOG.warn("Get config empty from url:{}, with params {},
return:{}, cluster is empty!",
+ url, request, returnStr);
return true;
}
// update meta configure
if (configManager.updateMetaConfigInfo(proxyResponse.getMd5(),
returnStr)) {
ConfigManager.handshakeManagerOk.set(true);
- LOG.info("Get meta config info and set handshake status is
ok!");
+ LOG.info("Get config success from manager and updated, set
handshake status is ok!");
}
return true;
} catch (Throwable ex) {
- LOG.error("Request remote manager failure", ex);
+ LOG.error("Request manager {} failure, throw exception", url,
ex);
return false;
} finally {
if (httpPost != null) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/HttpAttrConst.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/HttpAttrConst.java
new file mode 100644
index 0000000000..045696100b
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/HttpAttrConst.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.consts;
+
+/**
+ * Http Attribute constants
+ */
+public class HttpAttrConst {
+
+ public static final String KEY_SRV_URL_HEARTBEAT = "/dataproxy/heartbeat";
+ public static final String KEY_SRV_URL_REPORT_MSG = "/dataproxy/message";
+ public static final String KEY_URL_FAVICON_ICON = "/favicon.ico";
+
+ public static final String KEY_GROUP_ID = "groupId";
+ public static final String KEY_STREAM_ID = "streamId";
+ public static final String KEY_BODY = "body";
+ public static final String KEY_DATA_TIME = "dt";
+ public static final String KEY_MESSAGE_COUNT = "cnt";
+ public static final String KEY_CHARSET = "charset";
+ public static final String VAL_DEF_CHARSET = "UTF-8";
+ public static final String RET_CNT_TYPE = "application/json;charset=utf-8";
+
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index c445cadd31..1d262a8112 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -19,15 +19,30 @@ package org.apache.inlong.dataproxy.consts;
public class StatConstants {
- public static final java.lang.String EVENT_SERVICE_CLOSED =
"source.srvclosed";
- public static final java.lang.String EVENT_SERVICE_UNREADY =
"sink.unready";
- public static final java.lang.String EVENT_VISITIP_ILLEGAL =
"links.illegal";
- public static final java.lang.String EVENT_NOTOPIC = "config.notopic";
+ public static final java.lang.String EVENT_SERVICE_CLOSED =
"service.closed";
+ public static final java.lang.String EVENT_SERVICE_SINK_UNREADY =
"service.sink.unready";
+ // visit
+ public static final java.lang.String EVENT_VISIT_ILLEGAL = "visit.illegal";
+ public static final java.lang.String EVENT_VISIT_OVERMAX = "visit.overmax";
+ public static final java.lang.String EVENT_VISIT_LINKIN = "visit.linkin";
+ public static final java.lang.String EVENT_VISIT_LINKOUT = "visit.linkout";
+ public static final java.lang.String EVENT_VISIT_EXCEPTION =
"visit.exception";
+ // configure
+ public static final java.lang.String EVENT_CONFIG_TOPIC_MISSING =
"config.topic.missing";
// source
- public static final java.lang.String EVENT_LINKS_OVERMAX = "links.overmax";
- public static final java.lang.String EVENT_LINKS_IN = "links.linkin";
- public static final java.lang.String EVENT_LINKS_OUT = "links.linkout";
- public static final java.lang.String EVENT_LINKS_EXCEPTION =
"links.exception";
+ public static final java.lang.String EVENT_MSG_DECODE_FAIL =
"msg.decode.failure";
+ public static final java.lang.String EVENT_MSG_METHOD_INVALID =
"msg.method.invalid";
+ public static final java.lang.String EVENT_MSG_PATH_INVALID =
"msg.path.invalid";
+ public static final java.lang.String EVENT_MSG_CONTYPE_INVALID =
"msg.content.invalid";
+ public static final java.lang.String EVENT_MSG_GROUPID_MISSING =
"msg.groupid.missing";
+ public static final java.lang.String EVENT_MSG_STREAMID_MISSING =
"msg.streamid.missing";
+ public static final java.lang.String EVENT_MSG_BODY_MISSING =
"msg.body.missing";
+ public static final java.lang.String EVENT_MSG_BODY_BLANK =
"msg.body.blank";
+ public static final java.lang.String EVENT_MSG_BODY_OVERMAX =
"msg.body.overmax";
+ public static final java.lang.String EVENT_MSG_HB_SUCCESS =
"msg.hb.success";
+ public static final java.lang.String EVENT_MSG_POST_SUCCESS =
"msg.post.success";
+ public static final java.lang.String EVENT_MSG_POST_FAILURE =
"msg.post.failure";
+
public static final java.lang.String EVENT_EMPTY = "socketmsg.empty";
public static final java.lang.String EVENT_OVERMAXLEN =
"socketmsg.overmaxlen";
public static final java.lang.String EVENT_NOTEQUALLEN =
"socketmsg.notequallen";
@@ -46,20 +61,6 @@ public class StatConstants {
public static final java.lang.String EVENT_POST_SUCCESS =
"socketmsg.success";
public static final java.lang.String EVENT_POST_DROPPED =
"socketmsg.dropped";
// http
- public static final java.lang.String EVENT_HTTP_DECFAIL =
"httpmsg.decfailure";
- public static final java.lang.String EVENT_HTTP_INVALIDMETHOD =
"httpmsg.invmethod";
- public static final java.lang.String EVENT_HTTP_BLANKURI =
"httpmsg.blankuri";
- public static final java.lang.String EVENT_HTTP_URIDECFAIL =
"httpmsg.decurifail";
- public static final java.lang.String EVENT_HTTP_INVALIDURI =
"httpmsg.invuri";
- public static final java.lang.String EVENT_HTTP_ILLEGAL_VISIT =
"httpmsg.illegal";
- public static final java.lang.String EVENT_HTTP_HB_SUCCESS =
"httphb.success";
- public static final java.lang.String EVENT_HTTP_WITHOUTGROUPID =
"httpmsg.wogroupid";
- public static final java.lang.String EVENT_HTTP_WITHOUTSTREAMID =
"httpmsg.wostreamid";
- public static final java.lang.String EVENT_HTTP_NOBODY = "httpmsg.nobody";
- public static final java.lang.String EVENT_HTTP_EMPTYBODY =
"httpmsg.emptybody";
- public static final java.lang.String EVENT_HTTP_BODYOVERMAXLEN =
"httpmsg.bodyovermax";
- public static final java.lang.String EVENT_HTTP_POST_SUCCESS =
"httpmsg.success";
- public static final java.lang.String EVENT_HTTP_POST_DROPPED =
"httpmsg.dropped";
public static final java.lang.String EVENT_SINK_NOUID = "sink.nouid";
public static final java.lang.String EVENT_SINK_NOTOPIC = "sink.notopic";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index 26407038ee..80ef43e508 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -117,7 +117,7 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
return true;
}
} catch (Exception ex) {
- log.error("reportHeartbeat failed for url {}", url, ex);
+ log.error("reportHeartbeat failed for url {}, exception message is
{}", url, ex.getMessage());
}
return false;
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index c4eddac7c5..5fb13fb7a3 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -107,8 +107,7 @@ public class AuditUtils {
*/
public static long getLogTime(Event event) {
if (event != null) {
- Map<String, String> headers = event.getHeaders();
- return getLogTime(headers);
+ return getLogTime(event.getHeaders());
}
return System.currentTimeMillis();
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 54da8c283b..0f6b4fcce1 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -106,16 +106,16 @@ public class TubeHandler implements MessageQueueHandler {
}
@Override
- public void publishTopic(Set<String> topicSet) {
- if (this.producer == null || topicSet == null || topicSet.isEmpty()) {
+ public void publishTopic(Set<String> newTopicSet) {
+ if (this.producer == null || newTopicSet == null ||
newTopicSet.isEmpty()) {
return;
}
Set<String> published;
try {
- published = producer.publish(topicSet);
- topicSet.addAll(published);
+ published = producer.publish(newTopicSet);
+ this.topicSet.addAll(newTopicSet);
LOG.info("Publish topics to {}, need publish are {}, published are
{}",
- this.clusterName, topicSet, published);
+ this.clusterName, newTopicSet, published);
} catch (Throwable e) {
LOG.warn("Publish topics to {} failure", this.clusterName, e);
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index 356561ff20..fd9ae39caf 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -23,16 +23,20 @@ import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.dataproxy.admin.ProxyServiceMBean;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source2.httpMsg.InLongHttpMsgHandler;
+import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
@@ -52,6 +56,7 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
/**
@@ -61,6 +66,7 @@ public abstract class BaseSource
extends
AbstractSource
implements
+ ConfigUpdateCallback,
ProxyServiceMBean,
EventDrivenSource,
Configurable {
@@ -282,6 +288,34 @@ public abstract class BaseSource
logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(),
this.getName());
}
+ @Override
+ public void update() {
+ // check current all links
+ if (ConfigManager.getInstance().needChkIllegalIP()) {
+ int cnt = 0;
+ Channel channel;
+ String strRemoteIP;
+ long startTime = System.currentTimeMillis();
+ Iterator<Channel> iterator = allChannels.iterator();
+ while (iterator.hasNext()) {
+ channel = iterator.next();
+ strRemoteIP = AddressUtils.getChannelRemoteIP(channel);
+ if (strRemoteIP == null) {
+ continue;
+ }
+ if (ConfigManager.getInstance().isIllegalIP(strRemoteIP)) {
+ channel.disconnect();
+ channel.close();
+ allChannels.remove(channel);
+ cnt++;
+ logger.error(strRemoteIP + " is Illegal IP, so disconnect
it !");
+ }
+ }
+ logger.info("Source {} channel check, disconnects {} Illegal
channels, waist {} ms",
+ getName(), cnt, (System.currentTimeMillis() - startTime));
+ }
+ }
+
/**
* get metricItemSet
*
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
index d030eeb1a8..0594a8682b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
@@ -193,31 +193,31 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // check max allowed connection count
- if (source.getAllChannels().size() >= source.getMaxConnections()) {
- source.fileMetricEventInc(StatConstants.EVENT_LINKS_OVERMAX);
- ctx.channel().disconnect();
- ctx.channel().close();
- logger.warn("{} refuse to connect = {} , connections = {},
maxConnections = {}",
- source.getName(), ctx.channel(),
source.getAllChannels().size(), source.getMaxConnections());
- return;
- }
// check illegal ip
if (ConfigManager.getInstance().needChkIllegalIP()) {
String strRemoteIp =
AddressUtils.getChannelRemoteIP(ctx.channel());
if (strRemoteIp != null
&& ConfigManager.getInstance().isIllegalIP(strRemoteIp)) {
- source.fileMetricEventInc(StatConstants.EVENT_VISITIP_ILLEGAL);
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL);
ctx.channel().disconnect();
ctx.channel().close();
logger.error(strRemoteIp + " is Illegal IP, so refuse it !");
return;
}
}
+ // check max allowed connection count
+ if (source.getAllChannels().size() >= source.getMaxConnections()) {
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX);
+ ctx.channel().disconnect();
+ ctx.channel().close();
+ logger.warn("{} refuse to connect = {} , connections = {},
maxConnections = {}",
+ source.getName(), ctx.channel(),
source.getAllChannels().size(), source.getMaxConnections());
+ return;
+ }
// add legal channel
source.getAllChannels().add(ctx.channel());
ctx.fireChannelActive();
- source.fileMetricEventInc(StatConstants.EVENT_LINKS_IN);
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN);
logger.info("{} added new channel {}, current connections = {},
maxConnections = {}",
source.getName(), ctx.channel(),
source.getAllChannels().size(), source.getMaxConnections());
}
@@ -227,11 +227,12 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
logger.error("{} channel {} inactive", source.getName(),
ctx.channel());
ctx.fireChannelInactive();
source.getAllChannels().remove(ctx.channel());
- source.fileMetricEventInc(StatConstants.EVENT_LINKS_OUT);
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION);
logger.error("{} channel {} throw exception", source.getName(),
ctx.channel(), cause);
ctx.fireExceptionCaught(cause);
if (ctx.channel() != null) {
@@ -242,7 +243,6 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
//
}
source.getAllChannels().remove(ctx.channel());
- source.fileMetricEventInc(StatConstants.EVENT_LINKS_EXCEPTION);
}
ctx.close();
}
@@ -263,7 +263,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
}
// check if the node is linked to the Manager.
if (!ConfigManager.getInstance().isMqClusterReady()) {
- source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
msgCodec.setFailureInfo(DataProxyErrCode.SINK_SERVICE_UNREADY);
responseV0Msg(channel, msgCodec, strBuff);
return;
@@ -418,7 +418,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
topic = source.getDefTopic();
} else {
- source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC);
+
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
source.addMetric(false, event.getBody().length, event);
this.responsePackage(ctx,
ProxySdk.ResultCode.ERR_ID_ERROR, packObject);
return;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
index 73c96dd7b7..74cc4435c0 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
@@ -47,6 +47,7 @@ public class SimpleHttpSource extends BaseSource implements
Configurable {
public SimpleHttpSource() {
super();
+ ConfigManager.getInstance().regIPVisitConfigChgCallback(this);
}
@Override
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
index 6526bba84e..acda3cf4f7 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
@@ -18,15 +18,12 @@
package org.apache.inlong.dataproxy.source2;
import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import org.apache.inlong.dataproxy.utils.EventLoopUtil;
import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.flume.Context;
@@ -35,12 +32,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
-import java.util.Iterator;
/**
* Simple tcp source
*/
-public class SimpleTcpSource extends BaseSource implements Configurable,
ConfigUpdateCallback {
+public class SimpleTcpSource extends BaseSource implements Configurable {
private static final Logger logger =
LoggerFactory.getLogger(SimpleTcpSource.class);
@@ -123,32 +119,4 @@ public class SimpleTcpSource extends BaseSource implements
Configurable, ConfigU
return SourceConstants.SRC_PROTOCOL_TYPE_TCP;
}
- @Override
- public void update() {
- // check current all links
- if (ConfigManager.getInstance().needChkIllegalIP()) {
- int cnt = 0;
- Channel channel;
- String strRemoteIP;
- long startTime = System.currentTimeMillis();
- Iterator<Channel> iterator = allChannels.iterator();
- while (iterator.hasNext()) {
- channel = iterator.next();
- strRemoteIP = AddressUtils.getChannelRemoteIP(channel);
- if (strRemoteIP == null) {
- continue;
- }
- if (ConfigManager.getInstance().isIllegalIP(strRemoteIP)) {
- channel.disconnect();
- channel.close();
- allChannels.remove(channel);
- cnt++;
- logger.error(strRemoteIP + " is Illegal IP, so disconnect
it !");
- }
- }
- logger.info("Source {} channel check, disconnects {} Illegal
channels, waist {} ms",
- getName(), cnt, (System.currentTimeMillis() - startTime));
- }
- }
-
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index ce33c75d45..4f636f8173 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -21,11 +21,10 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.InLongMsg;
-import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.consts.HttpAttrConst;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.source2.BaseSource;
import org.apache.inlong.dataproxy.utils.AddressUtils;
@@ -42,23 +41,25 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
+import org.apache.commons.codec.CharEncoding;
+import org.apache.commons.codec.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
+import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static io.netty.handler.codec.http.HttpUtil.is100ContinueExpected;
@@ -68,13 +69,9 @@ import static
io.netty.handler.codec.http.HttpUtil.is100ContinueExpected;
*/
public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRequest> {
- private static final String hbSrvUrl = "/dataproxy/heartbeat";
- private static final String msgSrvUrl = "/dataproxy/message";
-
private static final Logger logger =
LoggerFactory.getLogger(InLongHttpMsgHandler.class);
// log print count
private static final LogCounter logCounter = new LogCounter(10, 100000, 30
* 1000);
-
private final BaseSource source;
/**
@@ -88,120 +85,203 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest
req) throws Exception {
- // check request decode result
- if (!req.decoderResult().isSuccess()) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_DECFAIL);
- sendErrorMsg(ctx, HttpResponseStatus.BAD_REQUEST, "Decode message
failure!");
- return;
- }
- // check request method
- if (req.method() != HttpMethod.GET && req.method() != HttpMethod.POST)
{
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_INVALIDMETHOD);
- sendErrorMsg(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, "Only
support Get and Post methods");
- return;
- }
// process 100-continue request
if (is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.CONTINUE));
}
- // get requested service
- String reqUri = req.uri();
- if (StringUtils.isBlank(reqUri)) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_BLANKURI);
- sendErrorMsg(ctx, HttpResponseStatus.BAD_REQUEST, "Uri is blank!");
- return;
- }
- try {
- reqUri = URLDecoder.decode(reqUri, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- try {
- reqUri = URLDecoder.decode(reqUri, "ISO-8859-1");
- } catch (UnsupportedEncodingException e1) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_URIDECFAIL);
- sendErrorMsg(ctx, HttpResponseStatus.BAD_REQUEST, "Decode uri
failure!");
- return;
- }
- }
- // check requested service url
- if (!reqUri.startsWith(hbSrvUrl) || !reqUri.startsWith(msgSrvUrl)) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_INVALIDURI);
- sendErrorMsg(ctx, HttpResponseStatus.NOT_IMPLEMENTED, "Not
supported uri!");
- return;
- }
// get current time and clientIP
- long msgRcvTime = System.currentTimeMillis();
- String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel());
- // check illegal ip
- if (ConfigManager.getInstance().needChkIllegalIP()
- && ConfigManager.getInstance().isIllegalIP(clientIp)) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_ILLEGAL_VISIT);
- sendResponse(ctx, DataProxyErrCode.ILLEGAL_VISIT_IP, true);
+ final long msgRcvTime = System.currentTimeMillis();
+ final String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel());
+ // check request decode result
+ if (!req.decoderResult().isSuccess()) {
+ source.fileMetricEventInc(StatConstants.EVENT_MSG_DECODE_FAIL);
+ sendErrorMsg(ctx, DataProxyErrCode.HTTP_DECODE_REQ_FAILURE);
return;
}
// check service status.
if (source.isRejectService()) {
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_CLOSED);
- sendResponse(ctx, DataProxyErrCode.SERVICE_CLOSED, true);
+ sendErrorMsg(ctx, DataProxyErrCode.SERVICE_CLOSED);
return;
}
// check sink service status
if (!ConfigManager.getInstance().isMqClusterReady()) {
- source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
- sendResponse(ctx, DataProxyErrCode.SINK_SERVICE_UNREADY, true);
+
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+ sendErrorMsg(ctx, DataProxyErrCode.SINK_SERVICE_UNREADY);
return;
}
+ // check request method
+ if (req.method() != HttpMethod.GET && req.method() != HttpMethod.POST)
{
+ source.fileMetricEventInc(StatConstants.EVENT_MSG_METHOD_INVALID);
+ sendErrorMsg(ctx, DataProxyErrCode.HTTP_UNSUPPORTED_METHOD,
+ "Only support [" + HttpMethod.GET.name() + ", "
+ + HttpMethod.POST.name() + "] methods");
+ return;
+ }
+ // parse request uri
+ QueryStringDecoder uriDecoder =
+ new QueryStringDecoder(req.uri(),
Charsets.toCharset(CharEncoding.UTF_8));
+ // check requested service url
+ if (!HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path())
+ &&
!HttpAttrConst.KEY_SRV_URL_REPORT_MSG.equals(uriDecoder.path())) {
+ if (!HttpAttrConst.KEY_URL_FAVICON_ICON.equals(uriDecoder.path()))
{
+
source.fileMetricEventInc(StatConstants.EVENT_MSG_PATH_INVALID);
+ sendErrorMsg(ctx,
DataProxyErrCode.HTTP_UNSUPPORTED_SERVICE_URI,
+ "Only support [" + HttpAttrConst.KEY_SRV_URL_HEARTBEAT
+ ", "
+ + HttpAttrConst.KEY_SRV_URL_REPORT_MSG + "]
paths!");
+ }
+ return;
+ }
+ // get connection status
+ boolean closeConnection = isCloseConnection(req);
// process hb service
- if (reqUri.startsWith(hbSrvUrl)) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_HB_SUCCESS);
- sendResponse(ctx, DataProxyErrCode.SUCCESS, checkClose(req));
+ if (HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path())) {
+ source.fileMetricEventInc(StatConstants.EVENT_MSG_HB_SUCCESS);
+ sendResponse(ctx, closeConnection);
return;
}
+ // get request attributes
+ final Map<String, String> reqAttrs = new HashMap<>();
+ getAttrsFromDecoder(uriDecoder, reqAttrs);
+ if (req.method() == HttpMethod.POST) {
+ // check and get content value
+ String cntLengthStr =
req.headers().get(HttpHeaderNames.CONTENT_LENGTH);
+ if (StringUtils.isNotBlank(cntLengthStr) &&
NumberUtils.toInt(cntLengthStr, 0) > 0) {
+ String cntType =
req.headers().get(HttpHeaderNames.CONTENT_TYPE);
+ if (StringUtils.isNotBlank(cntType)) {
+ cntType = cntType.trim();
+ if (!cntType.equalsIgnoreCase(
+
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString())) {
+
source.fileMetricEventInc(StatConstants.EVENT_MSG_CONTYPE_INVALID);
+ sendErrorMsg(ctx,
DataProxyErrCode.HTTP_UNSUPPORTED_CONTENT_TYPE,
+ "Only support [" +
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED
+ + "] content type!");
+ return;
+ }
+ String cntStr =
req.content().toString(Charsets.toCharset(CharEncoding.UTF_8));
+ QueryStringDecoder cntDecoder = new
QueryStringDecoder(cntStr, false);
+ getAttrsFromDecoder(cntDecoder, reqAttrs);
+ }
+ }
+ }
// process message request
- processMessage(ctx, req, msgRcvTime, clientIp);
+ processMessage(ctx, reqAttrs, msgRcvTime, clientIp, closeConnection);
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ ctx.flush();
}
- private boolean processMessage(ChannelHandlerContext ctx, FullHttpRequest
req,
- long msgRcvTime, String clientIp) throws Exception {
- // get and check groupId
- HttpHeaders headers = req.headers();
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN);
+ // check illegal ip
+ if (ConfigManager.getInstance().needChkIllegalIP()) {
+ String strRemoteIp =
AddressUtils.getChannelRemoteIP(ctx.channel());
+ if (strRemoteIp != null
+ && ConfigManager.getInstance().isIllegalIP(strRemoteIp)) {
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL);
+ ctx.channel().disconnect();
+ ctx.channel().close();
+ if (logCounter.shouldPrint()) {
+ logger.error(strRemoteIp + " is Illegal IP, so refuse it
!");
+ }
+ return;
+ }
+ }
+ // check max allowed connection count
+ if (source.getAllChannels().size() >= source.getMaxConnections()) {
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX);
+ ctx.channel().disconnect();
+ ctx.channel().close();
+ if (logCounter.shouldPrint()) {
+ logger.warn("{} refuse to connect = {} , connections = {},
maxConnections = {}",
+ source.getName(), ctx.channel(),
source.getAllChannels().size(), source.getMaxConnections());
+ }
+ return;
+ }
+ // add legal channel
+ source.getAllChannels().add(ctx.channel());
+ ctx.fireChannelActive();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT);
+ ctx.fireChannelInactive();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION);
+ if (cause instanceof IOException) {
+ if (logCounter.shouldPrint()) {
+ logger.warn("{} received an IOException from channel {}",
+ source.getName(), ctx.channel(), cause);
+ }
+ ctx.close();
+ } else {
+ sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR,
+ "Process message failure: " + cause.getMessage());
+ }
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
+ if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
+ ctx.close();
+ }
+ }
+
+ /**
+ * Process http report message
+ *
+ * @param ctx the handler context
+ * @param reqAttrs the attributes
+ * @param msgRcvTime the message received time
+ * @param clientIp the report ip
+ * @param isCloseCon whether close connection
+ *
+ * @return whether process success
+ */
+ private boolean processMessage(ChannelHandlerContext ctx, Map<String,
String> reqAttrs,
+ long msgRcvTime, String clientIp, boolean isCloseCon) throws
Exception {
StringBuilder strBuff = new StringBuilder(512);
- String groupId = headers.get(AttributeConstants.GROUP_ID);
- if (StringUtils.isEmpty(groupId)) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_WITHOUTGROUPID);
+ String groupId = reqAttrs.get(HttpAttrConst.KEY_GROUP_ID);
+ if (StringUtils.isBlank(groupId)) {
+ source.fileMetricEventInc(StatConstants.EVENT_MSG_GROUPID_MISSING);
sendResponse(ctx,
DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
- strBuff.append("Field
").append(AttributeConstants.GROUP_ID)
+ strBuff.append("Field ").append(HttpAttrConst.KEY_GROUP_ID)
.append(" must exist and not blank!").toString(),
- checkClose(req));
+ isCloseCon);
return false;
}
// get and check streamId
- String streamId = headers.get(AttributeConstants.STREAM_ID);
- if (StringUtils.isEmpty(streamId)) {
-
source.fileMetricEventInc(StatConstants.EVENT_HTTP_WITHOUTSTREAMID);
+ String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID);
+ if (StringUtils.isBlank(streamId)) {
+
source.fileMetricEventInc(StatConstants.EVENT_MSG_STREAMID_MISSING);
sendResponse(ctx,
DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
- strBuff.append("Field
").append(AttributeConstants.STREAM_ID)
+ strBuff.append("Field
").append(HttpAttrConst.KEY_STREAM_ID)
.append(" must exist and not blank!").toString(),
- checkClose(req));
+ isCloseCon);
return false;
}
// get and check topicName
String topicName = ConfigManager.getInstance().getTopicName(groupId,
streamId);
if (StringUtils.isBlank(topicName)) {
- if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
- source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC);
- sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
- strBuff.append("Topic is null for
").append(AttributeConstants.GROUP_ID)
- .append("(").append(groupId).append("),")
- .append(AttributeConstants.STREAM_ID)
-
.append("(,").append(streamId).append(")").toString(),
- checkClose(req));
- return false;
- }
- topicName = source.getDefTopic();
+
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+ sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
+ strBuff.append("Topic not configured for
").append(HttpAttrConst.KEY_STREAM_ID)
+ .append("(").append(groupId).append("),")
+ .append(HttpAttrConst.KEY_STREAM_ID)
+
.append("(,").append(streamId).append(")").toString(),
+ isCloseCon);
+ return false;
}
// get and check dt
long dataTime = msgRcvTime;
- String dt = headers.get(AttributeConstants.DATA_TIME);
+ String dt = reqAttrs.get(HttpAttrConst.KEY_DATA_TIME);
if (StringUtils.isNotEmpty(dt)) {
try {
dataTime = Long.parseLong(dt);
@@ -209,45 +289,40 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
//
}
}
- // get char set
- String charset = headers.get(AttrConstants.CHARSET);
- if (StringUtils.isBlank(charset)) {
- charset = AttrConstants.CHARSET;
- }
// get and check body
- String body = headers.get(AttrConstants.BODY);
+ String body = reqAttrs.get(HttpAttrConst.KEY_BODY);
if (StringUtils.isBlank(body)) {
if (body == null) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_NOBODY);
+
source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_MISSING);
sendResponse(ctx,
DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
- strBuff.append("Field ").append(AttrConstants.BODY)
+ strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
.append(" is not exist!").toString(),
- checkClose(req));
+ isCloseCon);
} else {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_EMPTYBODY);
+ source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_BLANK);
sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(),
- strBuff.append("Field ").append(AttrConstants.BODY)
+ strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
.append(" is Blank!").toString(),
- checkClose(req));
+ isCloseCon);
}
return false;
}
if (body.length() > source.getMaxMsgLength()) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_BODYOVERMAXLEN);
+ source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_OVERMAX);
sendResponse(ctx,
DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
- strBuff.append("Error msg, the body
length(").append(body.length())
+ strBuff.append("Error msg, the
").append(HttpAttrConst.KEY_BODY)
+ .append(" length(").append(body.length())
.append(") is bigger than allowed length(")
.append(source.getMaxMsgLength()).append(")").toString(),
- checkClose(req));
+ isCloseCon);
return false;
}
// get message count
- String strMsgCount = headers.get(AttributeConstants.MESSAGE_COUNT);
- int intMsgCnt = NumberUtils.toInt(strMsgCount, 1);
- strMsgCount = String.valueOf(intMsgCnt);
+ int intMsgCnt =
NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1);
+ String strMsgCount = String.valueOf(intMsgCnt);
// build message attributes
InLongMsg inLongMsg = InLongMsg.newInLongMsg(source.isCompressed());
- strBuff.append("&groupId=").append(groupId)
+ strBuff.append("groupId=").append(groupId)
.append("&streamId=").append(streamId)
.append("&dt=").append(dataTime)
.append("&NodeIP=").append(clientIp)
@@ -255,7 +330,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
.append("&rt=").append(msgRcvTime)
.append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME)
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
- inLongMsg.addMsg(strBuff.toString(), body.getBytes(charset));
+ inLongMsg.addMsg(strBuff.toString(),
body.getBytes(HttpAttrConst.VAL_DEF_CHARSET));
byte[] inlongMsgData = inLongMsg.buildArray();
inLongMsg.reset();
strBuff.delete(0, strBuff.length());
@@ -270,7 +345,6 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
eventHeaders.put(ConfigConstants.MSG_ENCODE_VER,
InLongMsgVer.INLONG_V0.getName());
eventHeaders.put(AttributeConstants.RCV_TIME,
String.valueOf(msgRcvTime));
Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
- String msgProcType = "b2b";
// build metric data item
dataTime = dataTime / 1000 / 60 / 10;
dataTime = dataTime * 1000 * 60 * 10;
@@ -278,77 +352,84 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
.append(AttrConstants.SEP_HASHTAG).append(topicName)
.append(AttrConstants.SEP_HASHTAG).append(streamId)
.append(AttrConstants.SEP_HASHTAG).append(clientIp)
-
.append(AttrConstants.SEP_HASHTAG).append(NetworkUtils.getLocalIp())
- .append(AttrConstants.SEP_HASHTAG).append(msgProcType)
+ .append(AttrConstants.SEP_HASHTAG).append(source.getSrcHost())
+ .append(AttrConstants.SEP_HASHTAG).append("b2b")
.append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(dataTime))
.append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
try {
source.getChannelProcessor().processEvent(event);
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_POST_SUCCESS);
- source.fileMetricRecordAdd(strBuff.toString(), intMsgCnt, 1,
body.length(), 0);
+ source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_SUCCESS);
+ source.fileMetricRecordAdd(strBuff.toString(), intMsgCnt, 1,
event.getBody().length, 0);
strBuff.delete(0, strBuff.length());
source.addMetric(true, event.getBody().length, event);
- sendResponse(ctx, DataProxyErrCode.SUCCESS, false);
+ sendResponse(ctx, isCloseCon);
return true;
- } catch (ChannelException ex) {
- source.fileMetricEventInc(StatConstants.EVENT_HTTP_POST_DROPPED);
+ } catch (Throwable ex) {
+ source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_FAILURE);
source.fileMetricRecordAdd(strBuff.toString(), 0, 0, 0, intMsgCnt);
source.addMetric(false, event.getBody().length, event);
strBuff.delete(0, strBuff.length());
- sendResponse(ctx, DataProxyErrCode.UNKNOWN_ERROR.getErrCode(),
- strBuff.append("Put event to channel failure:
").append(ex.getMessage())
- .toString(),
- false);
+ sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR,
+ strBuff.append("Put event to channel failure:
").append(ex.getMessage()).toString());
if (logCounter.shouldPrint()) {
- logger.error("Error write event to channel, data will
discard.", ex);
+ logger.error("Error writing HTTP event to channel failure.",
ex);
}
return false;
}
}
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) {
- ctx.flush();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel());
- logger.error("Http process client={} error, cause:{}, msg:{}",
- cause, clientIp, cause.getMessage());
- sendErrorMsg(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR,
- "Process message failure: " + cause.getMessage());
- ctx.close();
+ /**
+ * Get attributes from decoder
+ *
+ * @param decoder the decode object
+ * @param reqAttrs the attributes
+ */
+ private void getAttrsFromDecoder(QueryStringDecoder decoder, Map<String,
String> reqAttrs) {
+ for (Map.Entry<String, List<String>> attr :
decoder.parameters().entrySet()) {
+ if (attr == null
+ || attr.getKey() == null
+ || attr.getValue() == null
+ || attr.getValue().isEmpty()) {
+ continue;
+ }
+ reqAttrs.put(attr.getKey(), attr.getValue().get(0));
+ }
}
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
- if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
- ctx.close();
+ private boolean isCloseConnection(FullHttpRequest req) {
+ String connStatus = req.headers().get(HttpHeaderNames.CONNECTION);
+ if (connStatus == null) {
+ return false;
}
+ connStatus = connStatus.trim();
+ return connStatus.equalsIgnoreCase(HttpHeaderValues.CLOSE.toString());
}
- private boolean checkClose(FullHttpRequest req) {
- String connStatus = req.headers().get("Connection");
- return !StringUtils.isBlank(connStatus) &&
"close".equalsIgnoreCase(connStatus);
+ private void sendErrorMsg(ChannelHandlerContext ctx, DataProxyErrCode
errCodeObj) {
+ sendResponse(ctx, errCodeObj.getErrCode(), errCodeObj.getErrMsg(),
true);
}
- private void sendErrorMsg(ChannelHandlerContext ctx, HttpResponseStatus
status, String errMsg) {
- FullHttpResponse response = new
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
- Unpooled.copiedBuffer("Failure: " + status + ", "
- + errMsg + "\r\n", CharsetUtil.UTF_8));
- response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;
charset=UTF-8");
- ctx.writeAndFlush(response).addListener(new SendResultListener(true));
+ private void sendErrorMsg(ChannelHandlerContext ctx, DataProxyErrCode
errCodeObj, String errMsg) {
+ sendResponse(ctx, errCodeObj.getErrCode(), errMsg, true);
}
- private void sendResponse(ChannelHandlerContext ctx, DataProxyErrCode
errCodeObj, boolean isClose) {
- sendResponse(ctx, errCodeObj.getErrCode(), errCodeObj.getErrMsg(),
isClose);
-
+ private void sendResponse(ChannelHandlerContext ctx, boolean isClose) {
+ sendResponse(ctx, DataProxyErrCode.SUCCESS.getErrCode(),
DataProxyErrCode.SUCCESS.getErrMsg(), isClose);
}
private void sendResponse(ChannelHandlerContext ctx, int errCode, String
errMsg, boolean isClose) {
+ if (ctx == null || ctx.channel() == null) {
+ return;
+ }
+ if (!ctx.channel().isWritable()) {
+
source.fileMetricEventInc(StatConstants.EVENT_CHANNEL_NOT_WRITABLE);
+ if (logCounter.shouldPrint()) {
+ logger.warn("Send msg but channel full, channel={}",
ctx.channel());
+ }
+ return;
+ }
FullHttpResponse response = new
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- response.headers().set(HttpHeaderNames.CONTENT_TYPE,
"application/json;charset=utf-8");
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE,
HttpAttrConst.RET_CNT_TYPE);
StringBuilder builder =
new StringBuilder().append("{\"code\":\"").append(errCode)
.append("\",\"msg\":\"").append(errMsg).append("\"}");
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index b455817554..027e33e74f 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -268,7 +268,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
confGroupId = configManager.getGroupIdNameByNum(strGroupIdNum);
if (StringUtils.isBlank(confGroupId)) {
if (configManager.isGroupIdNumConfigEmpty()) {
-
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
this.errMsg = "GroupId-Mapping configuration is null";
} else {
@@ -300,7 +300,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
confStreamId =
configManager.getStreamIdNameByIdNum(strGroupIdNum, strStreamIdNum);
if (StringUtils.isBlank(confStreamId)) {
if (configManager.isStreamIdNumConfigEmpty()) {
-
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
this.errMsg = "StreamId-Mapping configuration is null";
} else {
@@ -338,7 +338,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
this.topicName = source.getDefTopic();
} else {
- source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC);
+
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
this.errMsg = String.format("Topic is null for
inlongGroupId=(%s), inlongStreamId=(%s)",
this.groupId, this.streamId);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
index ddda70fde1..f02752ca10 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
@@ -148,7 +148,7 @@ public class CodecTextMsg extends AbsV0MsgCodec {
if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
tmpTopicName = source.getDefTopic();
} else {
- source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC);
+
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
this.errMsg = String.format(
"Topic is null for inlongGroupId=(%s),
inlongStreamId=(%s)", tmpGroupId, tmpStreamId);