This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 86182877d [INLONG-6417][SDK] Support proxy-send mode (#6437)
86182877d is described below
commit 86182877d0948eaadaca6b9a97a21f7801ffde35
Author: xueyingzhang <[email protected]>
AuthorDate: Tue Nov 8 15:47:47 2022 +0800
[INLONG-6417][SDK] Support proxy-send mode (#6437)
---
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 376 +++++++++++++++++----
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 3 +-
2 files changed, 306 insertions(+), 73 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 2dc1c2ca5..4cba1a3ea 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.dataproxy;
+import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
@@ -95,12 +96,12 @@ public class DefaultMessageSender implements MessageSender {
/**
* generate by cluster id
*
- * @param configure - sender
+ * @param configure - sender
* @param selfDefineFactory - sender factory
* @return - sender
*/
public static DefaultMessageSender
generateSenderByClusterId(ProxyClientConfig configure,
- ThreadFactory
selfDefineFactory) throws Exception {
+ ThreadFactory selfDefineFactory) throws Exception {
ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(configure,
Utils.getLocalIp(), null);
proxyConfigManager.setGroupId(configure.getGroupId());
@@ -188,35 +189,61 @@ public class DefaultMessageSender implements
MessageSender {
@Deprecated
public SendResult sendMessage(byte[] body, String attributes, String
msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
return sender.syncSendMessage(new EncodeObject(body, attributes,
idGenerator.getNextId()), msgUUID, timeout, timeUnit);
}
public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
+ return sendMessage(body, groupId, streamId, dt, msgUUID, timeout,
timeUnit, false);
+ }
+
+ /**
+ * ync send single message
+ *
+ * @param body message data
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @return SendResult.OK means success
+ */
+ public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
}
addIndexCnt(groupId, streamId, 1);
+ String proxySend = "";
+ if (isProxySend) {
+ proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+ }
+
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype,
isCompressEnd, isReport,
- isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
groupId, streamId, "");
+ isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout,
timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
+ if (isProxySend) {
+ proxySend = "&" + proxySend;
+ }
if (isCompressEnd) {
return sender.syncSendMessage(new EncodeObject(body,
"groupId=" + groupId + "&streamId="
- + streamId + "&dt=" + dt + "&cp=snappy",
idGenerator.getNextId(), this.getMsgtype(),
- true, groupId), msgUUID, timeout, timeUnit);
+ + streamId + "&dt=" + dt + "&cp=snappy" + proxySend,
idGenerator.getNextId(),
+ this.getMsgtype(), true, groupId), msgUUID, timeout,
timeUnit);
} else {
- return sender.syncSendMessage(new EncodeObject(body,
"groupId=" + groupId + "&streamId="
- + streamId + "&dt=" + dt,
idGenerator.getNextId(), this.getMsgtype(), false, groupId),
- msgUUID, timeout, timeUnit);
+ return sender.syncSendMessage(new EncodeObject(body,
+ "groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + dt + proxySend,
+ idGenerator.getNextId(), this.getMsgtype(), false,
groupId), msgUUID, timeout, timeUnit);
}
}
@@ -224,7 +251,26 @@ public class DefaultMessageSender implements MessageSender
{
}
public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap) {
+ long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
{
+ return sendMessage(body, groupId, streamId, dt, msgUUID, timeout,
timeUnit, extraAttrMap, false);
+ }
+
+ /**
+ * sync send single message
+ *
+ * @param body message data
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param extraAttrMap extra attributes
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @return SendResult.OK means success
+ */
+ public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap,
boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -232,6 +278,9 @@ public class DefaultMessageSender implements MessageSender {
}
addIndexCnt(groupId, streamId, 1);
+ if (isProxySend) {
+ extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+ }
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
@@ -260,27 +309,54 @@ public class DefaultMessageSender implements
MessageSender {
}
public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
+ return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout,
timeUnit, false);
+ }
+
+ /**
+ * sync send a batch of messages
+ *
+ * @param bodyList list of messages
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @return SendResult.OK means success
+ */
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
}
addIndexCnt(groupId, streamId, bodyList.size());
+ String proxySend = "";
+ if (isProxySend) {
+ proxySend = AttributeConstants.MESSAGE_SYNC_SEND + "=true";
+ }
+
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
isCompress, isReport,
isGroupIdTransfer, dt / 1000,
- idGenerator.getNextInt(), groupId, streamId, "");
+ idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout,
timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
+ if (isProxySend) {
+ proxySend = "&" + proxySend;
+ }
if (isCompress) {
return sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId
- + "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size(), idGenerator.getNextId(),
- this.getMsgtype(), true, groupId), msgUUID, timeout,
timeUnit);
+ + "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size() + proxySend,
+ idGenerator.getNextId(), this.getMsgtype(), true,
groupId), msgUUID, timeout, timeUnit);
} else {
return sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId
- + "&dt=" + dt + "&cnt=" + bodyList.size(),
idGenerator.getNextId(), this.getMsgtype(),
+ + "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend,
idGenerator.getNextId(),
+ this.getMsgtype(),
false, groupId), msgUUID, timeout, timeUnit);
}
}
@@ -288,13 +364,35 @@ public class DefaultMessageSender implements
MessageSender {
}
public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt,
- String msgUUID, long timeout, TimeUnit
timeUnit, Map<String, String> extraAttrMap) {
+ String msgUUID, long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap) {
+ return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout,
timeUnit, extraAttrMap, false);
+ }
+
+ /**
+ * sync send a batch of messages
+ *
+ * @param bodyList list of messages
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param extraAttrMap extra attributes
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @return SendResult.OK means success
+ */
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
- if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+ if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(
+ extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES;
}
addIndexCnt(groupId, streamId, bodyList.size());
+ if (isProxySend) {
+ extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+ }
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
@@ -322,37 +420,62 @@ public class DefaultMessageSender implements
MessageSender {
@Deprecated
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String attributes,
- String msgUUID, long timeout, TimeUnit
timeUnit) throws ProxysdkException {
+ String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
sender.asyncSendMessage(new EncodeObject(body, attributes,
idGenerator.getNextId()),
callback, msgUUID, timeout, timeUnit);
}
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
- String groupId, String streamId, long dt,
String msgUUID,
- long timeout, TimeUnit timeUnit) throws
ProxysdkException {
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
+ asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
timeout, timeUnit, false);
+ }
+
+ /**
+ * async send single message
+ *
+ * @param callback callback can be null
+ * @param body message data
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit, boolean
isProxySend) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, 1);
+ String proxySend = "";
+ if (isProxySend) {
+ proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+ }
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body,
this.getMsgtype(), isCompressEnd, isReport,
isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
- groupId, streamId, "");
+ groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout,
timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
if (isCompressEnd) {
+ if (isProxySend) {
+ proxySend = "&" + proxySend;
+ }
sender.asyncSendMessage(new EncodeObject(body, "groupId="
- + groupId + "&streamId=" + streamId + "&dt=" +
dt + "&cp=snappy",
+ + groupId + "&streamId=" + streamId + "&dt=" +
dt + "&cp=snappy" + proxySend,
idGenerator.getNextId(), this.getMsgtype(),
true, groupId),
callback, msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(
new EncodeObject(body, "groupId=" + groupId +
"&streamId="
- + streamId + "&dt=" + dt,
idGenerator.getNextId(),
+ + streamId + "&dt=" + dt + proxySend,
idGenerator.getNextId(),
this.getMsgtype(), false, groupId), callback,
msgUUID, timeout, timeUnit);
}
@@ -360,14 +483,38 @@ public class DefaultMessageSender implements
MessageSender {
}
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId,
- long dt, String msgUUID, long timeout,
TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws
ProxysdkException {
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap)
+ throws ProxysdkException {
+ asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
timeout, timeUnit, extraAttrMap, false);
+ }
+
+ /**
+ * async send single message
+ *
+ * @param callback callback can be null
+ * @param body message data
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param extraAttrMap extra attributes
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap, boolean isProxySend)
+ throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, 1);
+ if (isProxySend) {
+ extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+ }
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
@@ -392,45 +539,97 @@ public class DefaultMessageSender implements
MessageSender {
}
}
+ public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
+ long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
+ asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID,
timeout, timeUnit, false);
+ }
+
+ /**
+ * async send a batch of messages
+ *
+ * @param callback callback can be null
+ * @param bodyList list of messages
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report time
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @throws ProxysdkException
+ */
public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList,
- String groupId, String streamId, long dt,
String msgUUID,
- long timeout, TimeUnit timeUnit) throws
ProxysdkException {
+ String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit, boolean isProxySend) throws
ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
+ String proxySend = "";
+ if (isProxySend) {
+ proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+ }
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList,
this.getMsgtype(), isCompress,
isReport, isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(),
- groupId, streamId, "");
+ groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout,
timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
+ if (isProxySend) {
+ proxySend = "&" + proxySend;
+ }
if (isCompress) {
sender.asyncSendMessage(
new EncodeObject(bodyList, "groupId=" + groupId +
"&streamId=" + streamId
- + "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size(), idGenerator.getNextId(),
+ + "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size() + proxySend,
+ idGenerator.getNextId(),
this.getMsgtype(), true, groupId), callback,
msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(
- new EncodeObject(bodyList, "groupId=" + groupId +
"&streamId=" + streamId + "&dt=" + dt
- + "&cnt=" + bodyList.size(),
idGenerator.getNextId(), this.getMsgtype(),
- false, groupId), callback, msgUUID, timeout,
timeUnit);
+ new EncodeObject(bodyList,
+ "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + dt + "&cnt=" + bodyList.size()
+ + proxySend, idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
+ callback, msgUUID, timeout, timeUnit);
}
}
}
public void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws
ProxysdkException {
+ List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
+ long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
throws ProxysdkException {
+ asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID,
timeout, timeUnit, extraAttrMap, false);
+ }
+
+ /**
+ * async send a batch of messages
+ *
+ * @param callback callback can be null
+ * @param bodyList list of messages
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report time
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param extraAttrMap extra attributes
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(SendMessageCallback callback,
+ List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
+ long timeout, TimeUnit timeUnit,
+ Map<String, String> extraAttrMap, boolean isProxySend) throws
ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
- if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+ if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(
+ extraAttrMap)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
+ if (isProxySend) {
+ extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+ }
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
@@ -465,28 +664,60 @@ public class DefaultMessageSender implements
MessageSender {
* @throws ProxysdkException
*/
@Override
- public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body,
- SendMessageCallback callback) throws
ProxysdkException {
+ public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body, SendMessageCallback callback)
+ throws ProxysdkException {
this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId,
System.currentTimeMillis(),
idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT);
}
/**
- * asyncSendMessage
+ * async send single message
*
- * @param inlongGroupId
- * @param inlongStreamId
- * @param bodyList
- * @param callback
+ * @param inlongGroupId groupId
+ * @param inlongStreamId streamId
+ * @param body a single message
+ * @param callback callback can be null
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body, SendMessageCallback callback,
+ boolean isProxySend) throws ProxysdkException {
+ this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId,
System.currentTimeMillis(),
+ idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT, isProxySend);
+ }
+
+ /**
+ * async send a batch of messages
+ *
+ * @param inlongGroupId groupId
+ * @param inlongStreamId streamId
+ * @param bodyList list of messages
+ * @param callback callback can be null
* @throws ProxysdkException
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
- SendMessageCallback callback) throws
ProxysdkException {
+ SendMessageCallback callback) throws ProxysdkException {
this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT);
}
+ /**
+ * async send a batch of messages
+ *
+ * @param inlongGroupId groupId
+ * @param inlongStreamId streamId
+ * @param bodyList list of messages
+ * @param callback callback can be null
+ * @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
+ SendMessageCallback callback, boolean isProxySend) throws
ProxysdkException {
+ this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(),
+ idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT, isProxySend);
+ }
+
private void addIndexCnt(String groupId, String streamId, long cnt) {
try {
String key = groupId + "|" + streamId;
@@ -501,10 +732,10 @@ public class DefaultMessageSender implements
MessageSender {
}
}
- public void asyncsendMessageData(FileCallback callback, List<byte[]>
bodyList, String groupId,
- String streamId, long dt, int sid,
boolean isSupportLF, String msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws
ProxysdkException {
+ @Deprecated
+ public void asyncsendMessageData(FileCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
+ long dt, int sid, boolean isSupportLF, String msgUUID, long
timeout, TimeUnit timeUnit,
+ Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -523,9 +754,9 @@ public class DefaultMessageSender implements MessageSender {
}
}
- private void asyncSendMetric(FileCallback callback, byte[] body, String
groupId,
- String streamId, long dt, int sid, String ip,
String msgUUID,
- long timeout, TimeUnit timeUnit, String
messageKey) throws ProxysdkException {
+ @Deprecated
+ private void asyncSendMetric(FileCallback callback, byte[] body, String
groupId, String streamId, long dt, int sid,
+ String ip, String msgUUID, long timeout, TimeUnit timeUnit, String
messageKey) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -538,23 +769,23 @@ public class DefaultMessageSender implements
MessageSender {
}
}
- public void asyncsendMessageProxy(FileCallback callback, byte[] body,
String groupId, String streamId,
- long dt, int sid, String ip, String
msgUUID,
- long timeout, TimeUnit timeUnit) throws
ProxysdkException {
+ @Deprecated
+ public void asyncsendMessageProxy(FileCallback callback, byte[] body,
String groupId, String streamId, long dt,
+ int sid, String ip, String msgUUID, long timeout, TimeUnit
timeUnit) throws ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip,
msgUUID, timeout,
timeUnit, "minute");
}
- public void asyncsendMessageFile(FileCallback callback, byte[] body,
String groupId,
- String streamId, long dt, int sid, String
msgUUID,
- long timeout, TimeUnit timeUnit) throws
ProxysdkException {
+ @Deprecated
+ public void asyncsendMessageFile(FileCallback callback, byte[] body,
String groupId, String streamId, long dt,
+ int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, "",
msgUUID, timeout, timeUnit,
"file");
}
- public String sendMessageData(List<byte[]> bodyList, String groupId,
- String streamId, long dt, int sid, boolean
isSupportLF, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap) {
+ @Deprecated
+ public String sendMessageData(List<byte[]> bodyList, String groupId,
String streamId, long dt, int sid,
+ boolean isSupportLF, String msgUUID, long timeout, TimeUnit
timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -574,9 +805,9 @@ public class DefaultMessageSender implements MessageSender {
return null;
}
- private String sendMetric(byte[] body, String groupId, String streamId,
long dt, int sid, String ip,
- String msgUUID,
- long timeout, TimeUnit timeUnit, String
messageKey) {
+ @Deprecated
+ private String sendMetric(byte[] body, String groupId, String streamId,
long dt, int sid, String ip, String msgUUID,
+ long timeout, TimeUnit timeUnit, String messageKey) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES.toString();
@@ -589,14 +820,15 @@ public class DefaultMessageSender implements
MessageSender {
return null;
}
- public String sendMessageProxy(byte[] body, String groupId, String
streamId,
- long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ @Deprecated
+ public String sendMessageProxy(byte[] body, String groupId, String
streamId, long dt, int sid, String ip,
+ String msgUUID, long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID,
timeout, timeUnit, "minute");
}
+ @Deprecated
public String sendMessageFile(byte[] body, String groupId, String
streamId, long dt, int sid, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID,
timeout, timeUnit, "file");
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index df1c4939a..fd6b0f275 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -39,7 +39,8 @@ public class ProxyUtils {
static {
Collections.addAll(invalidAttr, "groupId", "streamId", "dt",
"msgUUID", "cp",
"cnt", "mt", "m", "sid", "t", "NodeIP", "messageId",
"_file_status_check", "_secretId",
- "_signature", "_timeStamp", "_nonce", "_userName",
"_clientIP", "_encyVersion", "_encyAesKey");
+ "_signature", "_timeStamp", "_nonce", "_userName",
"_clientIP", "_encyVersion", "_encyAesKey",
+ "proxySend", "errMsg", "errCode");
}
public static boolean isAttrKeysValid(Map<String, String> attrsMap) {