This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 1e3212cdb7fe27bdf3140766f3834b80028dacde Author: xueyingzhang <[email protected]> AuthorDate: Tue Nov 8 17:06:39 2022 +0800 [INLONG-6438][SDK] Parses and handles ack response from DataProxy (#6448) --- .../inlong/common/msg/AttributeConstants.java | 5 + .../apache/inlong/sdk/dataproxy/SendResult.java | 11 +- .../inlong/sdk/dataproxy/codec/EncodeObject.java | 154 ++++++++++++++------- .../inlong/sdk/dataproxy/codec/ErrorCode.java | 52 ------- .../sdk/dataproxy/codec/ProtocolDecoder.java | 38 ++--- .../inlong/sdk/dataproxy/network/Sender.java | 92 ++++++------ 6 files changed, 177 insertions(+), 175 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java index 645de6d4f..6cd635ea7 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java @@ -89,4 +89,9 @@ public interface AttributeConstants { // error message, add by receiver String MESSAGE_PROCESS_ERRMSG = "errMsg"; + + String MESSAGE_ID = "messageId"; + + // dataproxy IP from dp response ack + String MESSAGE_DP_IP = "dpIP"; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java index 10c89af38..75f057b8f 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java @@ -19,14 +19,19 @@ package org.apache.inlong.sdk.dataproxy; public enum SendResult { - INVALID_ATTRIBUTES, + INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112) OK, TIMEOUT, CONNECTION_BREAK, THREAD_INTERRUPT, ASYNC_CALLBACK_BUFFER_FULL, NO_CONNECTION, - INVALID_DATA, - UNKOWN_ERROR + INVALID_DATA, // including 
DataProxyErrCode(103, 111) + BODY_EXCEED_MAX_LEN, // DataProxyErrCode(104) + SINK_SERVICE_UNREADY, // DataProxyErrCode(1) + UNCONFIGURED_GROUPID_OR_STREAMID, // DataProxyErrCode(113) + TOPIC_IS_BLANK, // DataProxyErrCode(115) + DATAPROXY_FAIL_TO_RECEIVE, // DataProxyErrCode(114,116,117,118,119,120) + UNKOWN_ERROR } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java index bf50507e2..8634621cf 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java @@ -18,12 +18,21 @@ package org.apache.inlong.sdk.dataproxy.codec; -import java.util.List; - +import com.google.common.base.Splitter; +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.common.enums.DataProxyErrCode; +import org.apache.inlong.common.msg.AttributeConstants; +import org.apache.inlong.sdk.dataproxy.SendResult; import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class EncodeObject { - private static final String MESSAGE_ID_PREFIX = "messageId="; + + private static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(AttributeConstants.SEPARATOR).trimResults() + .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); private byte[] bodyBytes; private String attributes; @@ -54,25 +63,24 @@ public class EncodeObject { private String msgUUID = null; private EncryptConfigEntry encryptEntry = null; - private boolean isException = false; - private ErrorCode exceptionError = null; + private SendResult sendResult = SendResult.OK; + private String errMsg; + private String dpIp; - /* Used by de_serialization. msgtype=7/8*/ + /* Used by de_serialization. 
msgtype=8*/ public EncodeObject() { } + /* Used by de_serialization. msgtype=7*/ + public EncodeObject(String attributes) { + handleAttr(attributes); + } + /* Used by de_serialization. */ public EncodeObject(byte[] bodyBytes, String attributes) { this.bodyBytes = bodyBytes; this.attributes = attributes; - this.messageId = ""; - String[] tokens = attributes.split("&"); - for (int i = 0; i < tokens.length; i++) { - if (tokens[i].startsWith("messageId=")) { - this.messageId = tokens[i].substring(MESSAGE_ID_PREFIX.length(), tokens[i].length()); - break; - } - } + handleAttr(attributes); } /* Used by serialization.But never used */ @@ -85,7 +93,7 @@ public class EncodeObject { // used for bytes initializtion,msgtype=3/5 public EncodeObject(byte[] bodyBytes, String attributes, String messageId, - int msgtype, boolean isCompress, final String groupId) { + int msgtype, boolean isCompress, final String groupId) { this.bodyBytes = bodyBytes; this.messageId = messageId; this.attributes = attributes + "&messageId=" + messageId; @@ -96,7 +104,7 @@ public class EncodeObject { // used for bodylist initializtion,msgtype=3/5 public EncodeObject(List<byte[]> bodyList, String attributes, String messageId, - int msgtype, boolean isCompress, final String groupId) { + int msgtype, boolean isCompress, final String groupId) { this.bodylist = bodyList; this.messageId = messageId; this.attributes = attributes + "&messageId=" + messageId; @@ -107,8 +115,8 @@ public class EncodeObject { // used for bytes initializtion,msgtype=7/8 public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, boolean isReport, - boolean isGroupIdTransfer, long dt, long seqId, String groupId, - String streamId, String commonattr) { + boolean isGroupIdTransfer, long dt, long seqId, String groupId, + String streamId, String commonattr) { this.bodyBytes = bodyBytes; this.msgtype = msgtype; this.isCompress = isCompress; @@ -123,8 +131,8 @@ public class EncodeObject { // used for bodylist 
initializtion,msgtype=7/8 public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress, - boolean isReport, boolean isGroupIdTransfer, long dt, - long seqId, String groupId, String streamId, String commonattr) { + boolean isReport, boolean isGroupIdTransfer, long dt, + long seqId, String groupId, String streamId, String commonattr) { this.bodylist = bodyList; this.msgtype = msgtype; this.isCompress = isCompress; @@ -139,9 +147,9 @@ public class EncodeObject { // file agent, used for bytes initializtion,msgtype=7/8 public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, - boolean isReport, boolean isGroupIdTransfer, long dt, - long seqId, String groupId, String streamId, String commonattr, - String messageKey, String proxyIp) { + boolean isReport, boolean isGroupIdTransfer, long dt, + long seqId, String groupId, String streamId, String commonattr, + String messageKey, String proxyIp) { this.bodyBytes = bodyBytes; this.msgtype = msgtype; this.isCompress = isCompress; @@ -158,9 +166,9 @@ public class EncodeObject { // file agent, used for bodylist initializtion,msgtype=7/8 public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress, - boolean isReport, boolean isGroupIdTransfer, long dt, - long seqId, String groupId, String streamId, String commonattr, - String messageKey, String proxyIp) { + boolean isReport, boolean isGroupIdTransfer, long dt, + long seqId, String groupId, String streamId, String commonattr, + String messageKey, String proxyIp) { this.bodylist = bodyList; this.msgtype = msgtype; this.isCompress = isCompress; @@ -175,6 +183,60 @@ public class EncodeObject { this.proxyIp = proxyIp; } + private void handleAttr(String attributes) { + if (StringUtils.isBlank(attributes)) { + return; + } + Map<String, String> backAttrs = new HashMap<>(MAP_SPLITTER.split(attributes)); + if (backAttrs.containsKey(AttributeConstants.MESSAGE_ID)) { + this.messageId = backAttrs.get(AttributeConstants.MESSAGE_ID); + } + dpIp = 
backAttrs.get(AttributeConstants.MESSAGE_DP_IP); + + String errCode = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE); + // errCode is empty or equals 0 -> success + if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) { + this.sendResult = SendResult.OK; + } else { + // get errMsg + this.errMsg = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG); + if (StringUtils.isBlank(errMsg)) { + this.errMsg = DataProxyErrCode.valueOf(Integer.parseInt(errCode)).getErrMsg(); + } + //sendResult + this.sendResult = convertToSendResult(Integer.parseInt(errCode)); + } + } + + private SendResult convertToSendResult(int errCode) { + DataProxyErrCode dpErrCode = DataProxyErrCode.valueOf(errCode); + switch (dpErrCode) { + case SINK_SERVICE_UNREADY: + return SendResult.SINK_SERVICE_UNREADY; + case MISS_REQUIRED_GROUPID_ARGUMENT: + case MISS_REQUIRED_STREAMID_ARGUMENT: + case MISS_REQUIRED_DT_ARGUMENT: + case UNSUPPORTED_EXTEND_FIELD_VALUE: + return SendResult.INVALID_ATTRIBUTES; + case MISS_REQUIRED_BODY_ARGUMENT: + case EMPTY_MSG: + return SendResult.INVALID_DATA; + case BODY_EXCEED_MAX_LEN: + return SendResult.BODY_EXCEED_MAX_LEN; + case UNCONFIGURED_GROUPID_OR_STREAMID: + return SendResult.UNCONFIGURED_GROUPID_OR_STREAMID; + case PUT_EVENT_TO_CHANNEL_FAILURE: + case NO_AVAILABLE_PRODUCER: + case PRODUCER_IS_NULL: + case SEND_REQUEST_TO_MQ_FAILURE: + case MQ_RETURN_ERROR: + case DUPLICATED_MESSAGE: + return SendResult.DATAPROXY_FAIL_TO_RECEIVE; + default: + return SendResult.UNKOWN_ERROR; + } + } + public String getMsgUUID() { return msgUUID; } @@ -215,14 +277,6 @@ public class EncodeObject { this.streamId = streamId; } - public void setMsgtype(int msgtype) { - this.msgtype = msgtype; - } - - public void setBodyBytes(byte[] bodyBytes) { - this.bodyBytes = bodyBytes; - } - public boolean isReport() { return isReport; } @@ -317,10 +371,18 @@ public class EncodeObject { return msgtype; } + public void setMsgtype(int msgtype) { + this.msgtype = msgtype; + 
} + public byte[] getBodyBytes() { return bodyBytes; } + public void setBodyBytes(byte[] bodyBytes) { + this.bodyBytes = bodyBytes; + } + public String getAttributes() { return attributes; } @@ -361,6 +423,10 @@ public class EncodeObject { return cnt; } + public void setCnt(int cnt) { + this.cnt = cnt; + } + public int getRealCnt() { if (bodylist != null) { return bodylist.size(); @@ -368,23 +434,15 @@ public class EncodeObject { return 1; } - public void setCnt(int cnt) { - this.cnt = cnt; - } - - public boolean isException() { - return isException; - } - - public void setException(boolean exception) { - isException = exception; + public String getDpIp() { + return dpIp; } - public ErrorCode getExceptionError() { - return exceptionError; + public String getErrMsg() { + return errMsg; } - public void setExceptionError(ErrorCode exceptionError) { - this.exceptionError = exceptionError; + public SendResult getSendResult() { + return sendResult; } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java deleted file mode 100644 index 3598860a4..000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.codec; - -import java.util.HashMap; -import java.util.Map; - -public enum ErrorCode { - - ATTR_ERROR(1), - - DT_ERROR(2), - - COMPRESS_ERROR(3), - - OTHER_ERROR(4), - - LONG_LENGTH_ERROR(5); - private final int value; - private static final Map<Integer, ErrorCode> map = new HashMap<>(); - - static { - for (ErrorCode errorCode : ErrorCode.values()) { - map.put(errorCode.value, errorCode); - } - } - - ErrorCode(int value) { - this.value = value; - } - - public static ErrorCode valueOf(int value) { - return map.get(value); - } - -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java index 579b07b1e..7bf9bd883 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java @@ -21,15 +21,15 @@ package org.apache.inlong.sdk.dataproxy.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; -import java.nio.charset.StandardCharsets; - -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; +import java.util.List; + public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> { - private static final Logger logger = 
LoggerFactory.getLogger(ProtocolDecoder.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoder.class); @Override protected void decode(ChannelHandlerContext ctx, @@ -37,9 +37,9 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> { buffer.markReaderIndex(); // totallen int totalLen = buffer.readInt(); - logger.debug("decode totalLen : {}", totalLen); + LOGGER.debug("decode totalLen : {}", totalLen); if (totalLen != buffer.readableBytes()) { - logger.error("totalLen is not equal readableBytes.total:" + totalLen + LOGGER.error("totalLen is not equal readableBytes.total:" + totalLen + ";readableBytes:" + buffer.readableBytes()); buffer.resetReaderIndex(); throw new Exception("totalLen is not equal readableBytes.total"); @@ -48,13 +48,13 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> { int msgType = buffer.readByte() & 0x1f; if (msgType == 4) { - logger.info("debug decode"); + LOGGER.info("debug decode"); } if (msgType == 3 | msgType == 5) { // bodylen int bodyLength = buffer.readInt(); if (bodyLength >= totalLen) { - logger.error("bodyLen is greater than totalLen.totalLen:" + totalLen + LOGGER.error("bodyLen is greater than totalLen.totalLen:" + totalLen + ";bodyLen:" + bodyLength); buffer.resetReaderIndex(); throw new Exception("bodyLen is greater than totalLen.totalLen"); @@ -72,28 +72,20 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> { attrBytes = new byte[attrLength]; buffer.readBytes(attrBytes); } - EncodeObject object = new EncodeObject(bodyBytes, new String(attrBytes, - StandardCharsets.UTF_8)); + EncodeObject object = new EncodeObject(bodyBytes, new String(attrBytes, StandardCharsets.UTF_8)); object.setMsgtype(5); out.add(object); } else if (msgType == 7) { int seqId = buffer.readInt(); int attrLen = buffer.readShort(); - EncodeObject object = new EncodeObject(); - object.setMessageId(String.valueOf(seqId)); - - if (attrLen == 4) { - int errorValue = 
buffer.readInt(); - ErrorCode errorCode = ErrorCode.valueOf(errorValue); - if (errorCode != null) { - object.setException(true); - object.setExceptionError(errorCode); - } - } else { - byte[] attrContent = new byte[attrLen]; - buffer.readBytes(attrContent); + byte[] attrBytes = null; + if (attrLen > 0) { + attrBytes = new byte[attrLen]; + buffer.readBytes(attrBytes); } + EncodeObject object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8)); + object.setMessageId(String.valueOf(seqId)); buffer.readShort(); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index 49be1ce54..f1eef9978 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -22,10 +22,10 @@ import io.netty.channel.Channel; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.FileCallback; -import org.apache.inlong.sdk.dataproxy.SendResult; +import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.SendMessageCallback; +import org.apache.inlong.sdk.dataproxy.SendResult; import org.apache.inlong.sdk.dataproxy.codec.EncodeObject; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread; @@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory; import java.io.PrintWriter; import java.io.StringWriter; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -48,7 +48,7 @@ import java.util.concurrent.locks.ReentrantLock; public class Sender { - private static final Logger logger = LoggerFactory.getLogger(Sender.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class); /* Store the callback used by asynchronously message sending. */ private final ConcurrentHashMap<Channel, ConcurrentHashMap<String, QueueObject>> callbacks = @@ -104,12 +104,12 @@ public class Sender { metricWorker = new MetricWorkerThread(configure, this); metricWorker.start(); - logger.info("proxy sdk is starting!"); + LOGGER.info("proxy sdk is starting!"); } private void checkCallbackList() { // max wait for 1 min - logger.info("checking call back list before close, current size is {}", + LOGGER.info("checking call back list before close, current size is {}", currentBufferSize.get()); int count = 0; try { @@ -118,10 +118,10 @@ public class Sender { count += 1; } if (currentBufferSize.get() > 0) { - logger.warn("callback not empty {}, please check it", currentBufferSize.get()); + LOGGER.warn("callback not empty {}, please check it", currentBufferSize.get()); } } catch (Exception ex) { - logger.error("exception while checking callback list", ex); + LOGGER.error("exception while checking callback list", ex); } } @@ -141,13 +141,13 @@ public class Sender { e.printStackTrace(pw); exceptStr = sw.toString(); } catch (Exception ex) { - logger.error(getExceptionStack(ex)); + LOGGER.error(getExceptionStack(ex)); } finally { try { pw.close(); sw.close(); } catch (Exception ex) { - logger.error(getExceptionStack(ex)); + LOGGER.error(getExceptionStack(ex)); } } return exceptStr; @@ -155,7 +155,7 @@ public class Sender { /*Used for asynchronously 
message sending.*/ public void notifyCallback(Channel channel, String messageId, SendResult result) { - logger.debug("Channel = {} , ack messageId = {}", channel, messageId); + LOGGER.debug("Channel = {} , ack messageId = {}", channel, messageId); if (channel == null) { return; } @@ -178,16 +178,13 @@ public class Sender { } } - private SendResult syncSendInternalMessage(NettyClient client, - EncodeObject encodeObject, String msgUUID, - long timeout, TimeUnit timeUnit) - throws ExecutionException, InterruptedException, TimeoutException { - + private SendResult syncSendInternalMessage(NettyClient client, EncodeObject encodeObject, String msgUUID, + long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { if (client == null) { return SendResult.NO_CONNECTION; } if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) { - logger.error("error attr format {} {}", encodeObject.getCommonattr(), + LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(), encodeObject.getAttributes()); return SendResult.INVALID_ATTRIBUTES; } @@ -237,17 +234,17 @@ public class Sender { message = syncSendInternalMessage(client, encodeObject, msgUUID, timeout, timeUnit); } catch (InterruptedException e) { // TODO Auto-generated catch block - logger.error("send message error {} ", getExceptionStack(e)); + LOGGER.error("send message error {} ", getExceptionStack(e)); syncCallables.remove(encodeObject.getMessageId()); return SendResult.THREAD_INTERRUPT; } catch (ExecutionException e) { // TODO Auto-generated catch block - logger.error("ExecutionException {} ", getExceptionStack(e)); + LOGGER.error("ExecutionException {} ", getExceptionStack(e)); syncCallables.remove(encodeObject.getMessageId()); return SendResult.UNKOWN_ERROR; } catch (TimeoutException e) { // TODO Auto-generated catch block - logger.error("TimeoutException {} ", getExceptionStack(e)); + LOGGER.error("TimeoutException {} ", getExceptionStack(e)); 
//e.printStackTrace(); SyncMessageCallable syncMessageCallable = syncCallables.remove(encodeObject.getMessageId()); if (syncMessageCallable != null) { @@ -255,14 +252,14 @@ public class Sender { if (tmpClient != null) { Channel curChannel = tmpClient.getChannel(); if (curChannel != null) { - logger.error("channel maybe busy {}", curChannel); + LOGGER.error("channel maybe busy {}", curChannel); scanThread.addTimeoutChannel(curChannel); } } } return SendResult.TIMEOUT; } catch (Throwable e) { - logger.error("syncSendMessage exception {} ", getExceptionStack(e)); + LOGGER.error("syncSendMessage exception {} ", getExceptionStack(e)); syncCallables.remove(encodeObject.getMessageId()); return SendResult.UNKOWN_ERROR; } @@ -280,8 +277,7 @@ public class Sender { } private SendResult syncSendMessageIndexInternal(NettyClient client, EncodeObject encodeObject, String msgUUID, - long timeout, TimeUnit timeUnit) - throws ExecutionException, InterruptedException, TimeoutException { + long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { if (client == null || !client.isActive()) { chooseProxy.remove(encodeObject.getMessageId()); client = clientMgr.getClientByRoundRobin(); @@ -336,7 +332,7 @@ public class Sender { client = clientMgr.getContainProxy(proxyip); } if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) { - logger.error("error attr format {} {}", encodeObject.getCommonattr(), + LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(), encodeObject.getAttributes()); return SendResult.INVALID_ATTRIBUTES.toString(); } @@ -345,17 +341,17 @@ public class Sender { msgUUID, timeout, timeUnit); } catch (InterruptedException e) { // TODO Auto-generated catch block - logger.error("send message error {}", getExceptionStack(e)); + LOGGER.error("send message error {}", getExceptionStack(e)); syncCallables.remove(encodeObject.getMessageId()); return SendResult.THREAD_INTERRUPT.toString(); } catch 
(ExecutionException e) { // TODO Auto-generated catch block - logger.error("ExecutionException {}", getExceptionStack(e)); + LOGGER.error("ExecutionException {}", getExceptionStack(e)); syncCallables.remove(encodeObject.getMessageId()); return SendResult.UNKOWN_ERROR.toString(); } catch (TimeoutException e) { // TODO Auto-generated catch block - logger.error("TimeoutException {}", getExceptionStack(e)); + LOGGER.error("TimeoutException {}", getExceptionStack(e)); //e.printStackTrace(); SyncMessageCallable syncMessageCallable = syncCallables.remove(encodeObject.getMessageId()); if (syncMessageCallable != null) { @@ -363,21 +359,21 @@ public class Sender { if (tmpClient != null) { Channel curChannel = tmpClient.getChannel(); if (curChannel != null) { - logger.error("channel maybe busy {}", curChannel); + LOGGER.error("channel maybe busy {}", curChannel); scanThread.addTimeoutChannel(curChannel); } } } return SendResult.TIMEOUT.toString(); } catch (Throwable e) { - logger.error("syncSendMessage exception {}", getExceptionStack(e)); + LOGGER.error("syncSendMessage exception {}", getExceptionStack(e)); syncCallables.remove(encodeObject.getMessageId()); return SendResult.UNKOWN_ERROR.toString(); } scanThread.resetTimeoutChannel(client.getChannel()); return message.toString() + "=" + client.getServerIP(); } catch (Exception e) { - logger.error("agent send error {}", getExceptionStack(e)); + LOGGER.error("agent send error {}", getExceptionStack(e)); syncCallables.remove(encodeObject.getMessageId()); return SendResult.UNKOWN_ERROR.toString(); } @@ -394,7 +390,7 @@ public class Sender { * @throws ProxysdkException */ public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback callback, String msgUUID, long timeout, - TimeUnit timeUnit) throws ProxysdkException { + TimeUnit timeUnit) throws ProxysdkException { NettyClient client = chooseProxy.get(encodeObject.getMessageId()); String proxyip = encodeObject.getProxyIp(); if (proxyip != null && proxyip.length() != 
0) { @@ -510,7 +506,7 @@ public class Sender { * Following methods used by asynchronously message sending. */ public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback callback, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException { + long timeout, TimeUnit timeUnit) throws ProxysdkException { metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(), encodeObject.getDt(), encodeObject.getRealCnt()); @@ -525,7 +521,7 @@ public class Sender { throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL"); } if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) { - logger.error("error attr format {} {}", encodeObject.getCommonattr(), + LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(), encodeObject.getAttributes()); throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); } @@ -554,7 +550,7 @@ public class Sender { QueueObject queueObject = msgQueueMap.putIfAbsent(encodeObject.getMessageId(), new QueueObject(System.currentTimeMillis(), callback, size, timeout, timeUnit)); if (queueObject != null) { - logger.warn("message id {} has existed.", encodeObject.getMessageId()); + LOGGER.warn("message id {} has existed.", encodeObject.getMessageId()); } if (encodeObject.getMsgtype() == 7) { int groupIdnum = 0; @@ -584,17 +580,15 @@ public class Sender { String messageId = response.getMessageId(); chooseProxy.remove(messageId); SyncMessageCallable callable = syncCallables.remove(messageId); - SendResult result = response.isException() ? 
SendResult.INVALID_ATTRIBUTES : SendResult.OK; + SendResult result = response.getSendResult(); if (result == SendResult.OK) { metricWorker.recordSuccessByMessageId(messageId); + } else { + LOGGER.error("{} exception happens, error message {}", channel, response.getErrMsg()); } if (callable != null) { // for syncSend callable.update(result); } - if (response.isException()) { - logger.error("{} exception happens, error message {}", channel, - response.getExceptionError()); - } notifyCallback(channel, messageId, result); // for asyncSend } @@ -606,7 +600,7 @@ public class Sender { if (channel == null) { return; } - logger.info("channel {} connection is disconnected!", channel); + LOGGER.info("channel {} connection is disconnected!", channel); try { ConcurrentHashMap<String, QueueObject> msgQueueMap = callbacks.remove(channel); if (msgQueueMap != null) { @@ -627,7 +621,7 @@ public class Sender { msgQueueMap.clear(); } } catch (Throwable e2) { - logger.info("process channel {} disconnected callbacks throw error,", channel, e2); + LOGGER.info("process channel {} disconnected callbacks throw error,", channel, e2); } try { @@ -654,7 +648,7 @@ public class Sender { } } } catch (Throwable e) { - logger.info("process channel {} disconnected syncCallables throw error,", channel, e); + LOGGER.info("process channel {} disconnected syncCallables throw error,", channel, e); } } @@ -663,28 +657,28 @@ public class Sender { if (channel == null) { return; } - logger.info("wait for ack for channel {}", channel); + LOGGER.info("wait for ack for channel {}", channel); try { ConcurrentHashMap<String, QueueObject> queueObjMap = callbacks.get(channel); if (queueObjMap != null) { while (true) { if (queueObjMap.isEmpty()) { - logger.info("this channel {} is empty!", channel); + LOGGER.info("this channel {} is empty!", channel); break; } try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block - logger.error("wait for ack for channel {}, error {}", + 
LOGGER.error("wait for ack for channel {}, error {}", channel, e.getMessage()); e.printStackTrace(); } } } - logger.info("waitForAckForChannel finished , channel is {}", channel); + LOGGER.info("waitForAckForChannel finished , channel is {}", channel); } catch (Throwable e) { - logger.error("waitForAckForChannel exception, channel is {}", channel, e); + LOGGER.error("waitForAckForChannel exception, channel is {}", channel, e); } }
