This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a658a091b3 [INLONG-11463][SDK] Remove deprecated APIs in the
DefaultMessageSender class (#11464)
a658a091b3 is described below
commit a658a091b369d0d76c698298d0cfc7fef12d2ed1
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Nov 6 20:39:30 2024 +0800
[INLONG-11463][SDK] Remove deprecated APIs in the DefaultMessageSender
class (#11464)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/agent/constant/CommonConstants.java | 3 -
.../plugin/sinks/filecollect/SenderManager.java | 3 -
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 117 -----------
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 9 -
.../inlong/sdk/dataproxy/common/FileCallback.java | 38 ----
.../sdk/dataproxy/common/SendMessageCallback.java | 4 +-
.../sdk/dataproxy/example/MyFileCallBack.java | 61 ------
.../sdk/dataproxy/example/MyMessageCallBack.java | 11 +-
.../inlong/sdk/dataproxy/network/Sender.java | 229 +--------------------
.../sdk/dataproxy/threads/MetricWorkerThread.java | 18 +-
.../sdk/dataproxy/threads/TimeoutScanThread.java | 10 +-
11 files changed, 25 insertions(+), 478 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 53a5bd976c..757db41afd 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -68,9 +68,6 @@ public class CommonConstants {
public static final String PROXY_SENDER_MAX_RETRY =
"proxy.sender.maxRetry";
public static final int DEFAULT_PROXY_SENDER_MAX_RETRY = 5;
- public static final String PROXY_IS_FILE = "proxy.isFile";
- public static final boolean DEFAULT_IS_FILE = false;
-
public static final String PROXY_CLIENT_IO_THREAD_NUM =
"client.iothread.num";
public static final int DEFAULT_PROXY_CLIENT_IO_THREAD_NUM =
Runtime.getRuntime().availableProcessors();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 984baf6de6..a37a171a37 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -87,7 +87,6 @@ public class SenderManager {
private final int aliveConnectionNum;
private final boolean isCompress;
private final int msgType;
- private final boolean isFile;
private final long maxSenderTimeout;
private final int maxSenderRetry;
private final long retrySleepTime;
@@ -133,7 +132,6 @@ public class SenderManager {
CommonConstants.PROXY_SENDER_MAX_RETRY,
CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
retrySleepTime = agentConf.getLong(
CommonConstants.PROXY_RETRY_SLEEP,
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
- isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE,
CommonConstants.DEFAULT_IS_FILE);
ioThreadNum =
profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
enableBusyWait =
profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
@@ -200,7 +198,6 @@ public class SenderManager {
ProxyClientConfig proxyClientConfig = new
ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
- proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
proxyClientConfig.setIoThreadNum(ioThreadNum);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index ec3eff3bad..623e84d8c0 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -21,7 +21,6 @@ import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.util.MessageUtils;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
@@ -833,122 +832,6 @@ public class DefaultMessageSender implements
MessageSender {
}
}
- @Deprecated
- public void asyncsendMessageData(FileCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
- long dt, int sid, boolean isSupportLF, String msgUUID, long
timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
- dt = ProxyUtils.covertZeroDt(dt);
- if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
- throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
- }
- if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
- throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
- }
- addIndexCnt(groupId, streamId, bodyList.size());
-
- StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
-
- if (msgtype == 7 || msgtype == 8) {
- EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
- isCompress, isReport, isGroupIdTransfer,
- dt / 1000, sid, groupId, streamId, attrs.toString(),
"data", "");
- encodeObject.setSupportLF(isSupportLF);
- sender.asyncSendMessageIndex(encodeObject, callback, msgUUID,
timeout, timeUnit);
- }
- }
-
- @Deprecated
- private void asyncSendMetric(FileCallback callback, byte[] body, String
groupId, String streamId, long dt, int sid,
- String ip, String msgUUID, long timeout, TimeUnit timeUnit, String
messageKey) throws ProxysdkException {
- dt = ProxyUtils.covertZeroDt(dt);
- if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
- throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
- }
- if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
- throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
- }
- boolean isCompressEnd = false;
- if (msgtype == 7 || msgtype == 8) {
- sender.asyncSendMessageIndex(new EncodeObject(body, msgtype,
isCompressEnd,
- isReport, isGroupIdTransfer, dt / 1000,
- sid, groupId, streamId, "", messageKey, ip), callback,
msgUUID, timeout, timeUnit);
- }
- }
-
- @Deprecated
- public void asyncsendMessageProxy(FileCallback callback, byte[] body,
String groupId, String streamId, long dt,
- int sid, String ip, String msgUUID, long timeout, TimeUnit
timeUnit) throws ProxysdkException {
- asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip,
msgUUID, timeout,
- timeUnit, "minute");
- }
-
- @Deprecated
- public void asyncsendMessageFile(FileCallback callback, byte[] body,
String groupId, String streamId, long dt,
- int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
- asyncSendMetric(callback, body, groupId, streamId, dt, sid, "",
msgUUID, timeout, timeUnit,
- "file");
- }
-
- @Deprecated
- public String sendMessageData(List<byte[]> bodyList, String groupId,
String streamId, long dt, int sid,
- boolean isSupportLF, String msgUUID, long timeout, TimeUnit
timeUnit, Map<String, String> extraAttrMap) {
- dt = ProxyUtils.covertZeroDt(dt);
- if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
- return SendResult.INVALID_ATTRIBUTES.toString();
- }
- if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
- return SendResult.BODY_EXCEED_MAX_LEN.toString();
- }
- addIndexCnt(groupId, streamId, bodyList.size());
-
- StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
-
- if (msgtype == 7 || msgtype == 8) {
- EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
isCompress,
- isReport, isGroupIdTransfer, dt / 1000,
- sid, groupId, streamId, attrs.toString(), "data", "");
- encodeObject.setSupportLF(isSupportLF);
- Function<Sender, String> sendOperation = (sender) ->
sender.syncSendMessageIndex(encodeObject, msgUUID,
- timeout, timeUnit);
- return attemptSendMessageIndex(sendOperation);
- }
- return null;
- }
-
- @Deprecated
- private String sendMetric(byte[] body, String groupId, String streamId,
long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit, String messageKey) {
- dt = ProxyUtils.covertZeroDt(dt);
- if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
- return SendResult.INVALID_ATTRIBUTES.toString();
- }
- if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
- return SendResult.BODY_EXCEED_MAX_LEN.toString();
- }
- if (msgtype == 7 || msgtype == 8) {
- EncodeObject encodeObject = new EncodeObject(body, msgtype, false,
isReport,
- isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "",
messageKey, ip);
- Function<Sender, String> sendOperation = (sender) ->
sender.syncSendMessageIndex(encodeObject, msgUUID,
- timeout, timeUnit);
- return attemptSendMessageIndex(sendOperation);
- }
- return null;
- }
-
- @Deprecated
- public String sendMessageProxy(byte[] body, String groupId, String
streamId, long dt, int sid, String ip,
- String msgUUID, long timeout, TimeUnit timeUnit) {
- return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID,
timeout, timeUnit, "minute");
- }
-
- @Deprecated
- public String sendMessageFile(byte[] body, String groupId, String
streamId, long dt, int sid, String msgUUID,
- long timeout, TimeUnit timeUnit) {
- return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID,
timeout, timeUnit, "file");
- }
-
private void shutdownInternalThreads() {
indexCol.shutDown();
MANAGER_FETCHER_THREAD_STARTED.set(false);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index d74f876fab..c3253805cb 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -39,7 +39,6 @@ public class ProxyClientConfig {
private int proxyUpdateIntervalMinutes;
private int proxyUpdateMaxRetry;
private String inlongGroupId;
- private boolean isFile = false;
private boolean requestByHttp = true;
private boolean isNeedDataEncry = false;
private boolean needAuthentication = false;
@@ -196,14 +195,6 @@ public class ProxyClientConfig {
return requestByHttp;
}
- public boolean isFile() {
- return isFile;
- }
-
- public void setFile(boolean file) {
- isFile = file;
- }
-
public String getInlongGroupId() {
return inlongGroupId;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java
deleted file mode 100644
index 8fce78257e..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.common;
-
-public abstract class FileCallback implements SendMessageCallback {
-
- /* Invoked when a message is confirmed by TDBus. */
- public void onMessageAck(String result) {
- }
-
- ;
-
- public void onMessageAck(SendResult result) {
- }
-
- ;
-
- /* Invoked when a message transportation interrupted by an exception. */
- public void onException(Throwable e) {
- }
-
- ;
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
index fc80705031..9e83f4673c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
@@ -20,8 +20,8 @@ package org.apache.inlong.sdk.dataproxy.common;
public interface SendMessageCallback {
/* Invoked when a message is confirmed by TDBus. */
- public void onMessageAck(SendResult result);
+ void onMessageAck(SendResult result);
/* Invoked when a message transportation interrupted by an exception. */
- public void onException(Throwable e);
+ void onException(Throwable e);
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java
deleted file mode 100644
index 3685d0ad53..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.example;
-
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MyFileCallBack extends FileCallback {
-
- private static final Logger logger = LoggerFactory
- .getLogger(MyFileCallBack.class);
- private DefaultMessageSender messageSender = null;
- private Event event = null;
-
- public MyFileCallBack() {
-
- }
-
- public MyFileCallBack(DefaultMessageSender messageSender, Event event) {
- super();
- this.messageSender = messageSender;
- this.event = event;
- }
-
- public void onMessageAck(String result) {
- logger.info("onMessageAck return result = {}", result);
- }
-
- public void onMessageAck(SendResult result) {
- if (result == SendResult.OK) {
- logger.info("onMessageAck return Ok");
- } else {
- logger.info("onMessageAck return failure = {}", result);
- }
- }
-
- public void onException(Throwable e) {
- logger.error("Send message failure, error {}", e.getMessage());
- e.printStackTrace();
- }
-
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
index 7aef6e705c..d9b5c08132 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
@@ -18,16 +18,17 @@
package org.apache.inlong.sdk.dataproxy.example;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MyMessageCallBack extends FileCallback {
+public class MyMessageCallBack implements SendMessageCallback {
private static final Logger logger = LoggerFactory
.getLogger(MyMessageCallBack.class);
+
private DefaultMessageSender messageSender = null;
private Event event = null;
@@ -41,10 +42,7 @@ public class MyMessageCallBack extends FileCallback {
this.event = event;
}
- public void onMessageAck(String result) {
- logger.info("onMessageAck return result = {}", result);
- }
-
+ @Override
public void onMessageAck(SendResult result) {
if (result == SendResult.OK) {
logger.info("onMessageAck return Ok");
@@ -53,6 +51,7 @@ public class MyMessageCallBack extends FileCallback {
}
}
+ @Override
public void onException(Throwable e) {
logger.error("Send message failure, error {}", e.getMessage());
e.printStackTrace();
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 9581da1f80..50b3105b56 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -19,7 +19,6 @@ package org.apache.inlong.sdk.dataproxy.network;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
@@ -63,7 +62,6 @@ public class Sender {
private final TimeoutScanThread scanThread;
private final ClientMgr clientMgr;
private final ProxyClientConfig configure;
- private final boolean isFile;
private MetricWorkerThread metricWorker = null;
private int clusterId = -1;
@@ -98,7 +96,6 @@ public class Sender {
throw new Exception("In OutNetwork isNeedDataEncry must be
true!");
}
}
- this.isFile = configure.isFile();
scanThread = new TimeoutScanThread(callbacks, currentBufferSize,
configure, clientMgr);
scanThread.start();
@@ -172,15 +169,8 @@ public class Sender {
if (callback == null) {
return;
}
- if (isFile) {
- String proxyip = channel.remoteAddress().toString();
- ((FileCallback)
callback.getCallback()).onMessageAck(result.toString()
- + "=" + proxyip.substring(1, proxyip.indexOf(':')));
- currentBufferSize.addAndGet(-callback.getSize());
- } else {
- callback.getCallback().onMessageAck(result);
- currentBufferSize.decrementAndGet();
- }
+ callback.getCallback().onMessageAck(result);
+ currentBufferSize.decrementAndGet();
}
private SendResult syncSendInternalMessage(NettyClient client,
EncodeObject encodeObject, String msgUUID,
@@ -287,190 +277,6 @@ public class Sender {
return message;
}
- private SendResult syncSendMessageIndexInternal(NettyClient client,
EncodeObject encodeObject, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ExecutionException,
InterruptedException, TimeoutException {
- if (client == null || !client.isActive()) {
- chooseProxy.remove(encodeObject.getMessageId());
- client = clientMgr.getClientByRoundRobin();
- if (client == null) {
- return SendResult.NO_CONNECTION;
- }
- chooseProxy.put(encodeObject.getMessageId(), client);
- }
-
- if (encodeObject.getMsgtype() == 7) {
- int groupIdnum = 0;
- int streamIdnum = 0;
- if (encodeObject.getGroupId().equals(clientMgr.getGroupId())) {
- groupIdnum = clientMgr.getGroupIdNum();
- streamIdnum =
clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null
- ?
clientMgr.getStreamIdMap().get(encodeObject.getStreamId())
- : 0;
- }
- encodeObject.setGroupIdNum(groupIdnum);
- encodeObject.setStreamIdNum(streamIdnum);
- if (groupIdnum == 0 || streamIdnum == 0) {
- encodeObject.setGroupIdTransfer(false);
- }
- }
- if (this.configure.isNeedDataEncry()) {
- encodeObject.setEncryptEntry(true, configure.getUserName(),
clientMgr.getEncryptConfigEntry());
- } else {
- encodeObject.setEncryptEntry(false, null, null);
- }
- encodeObject.setMsgUUID(msgUUID);
- SyncMessageCallable callable = new SyncMessageCallable(client,
encodeObject, timeout, timeUnit);
- syncCallables.put(encodeObject.getMessageId(), callable);
-
- Future<SendResult> future = threadPool.submit(callable);
- return future.get(timeout, timeUnit);
- }
-
- /**
- * sync send
- *
- * @param encodeObject
- * @param msgUUID
- * @param timeout
- * @param timeUnit
- * @return
- */
- public String syncSendMessageIndex(EncodeObject encodeObject, String
msgUUID, long timeout, TimeUnit timeUnit) {
- try {
- SendResult message = null;
- NettyClient client = chooseProxy.get(encodeObject.getMessageId());
- String proxyip = encodeObject.getProxyIp();
- if (proxyip != null && proxyip.length() != 0) {
- client = clientMgr.getContainProxy(proxyip);
- }
- if (isNotValidateAttr(encodeObject.getCommonattr(),
encodeObject.getAttributes())) {
- LOGGER.error("error attr format {} {}",
encodeObject.getCommonattr(),
- encodeObject.getAttributes());
- return SendResult.INVALID_ATTRIBUTES.toString();
- }
- try {
- message = syncSendMessageIndexInternal(client, encodeObject,
- msgUUID, timeout, timeUnit);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- LOGGER.error("send message error {}", getExceptionStack(e));
- syncCallables.remove(encodeObject.getMessageId());
- return SendResult.THREAD_INTERRUPT.toString();
- } catch (ExecutionException e) {
- // TODO Auto-generated catch block
- LOGGER.error("ExecutionException {}", getExceptionStack(e));
- syncCallables.remove(encodeObject.getMessageId());
- return SendResult.UNKOWN_ERROR.toString();
- } catch (TimeoutException e) {
- // TODO Auto-generated catch block
- LOGGER.error("TimeoutException {}", getExceptionStack(e));
- // e.printStackTrace();
- SyncMessageCallable syncMessageCallable =
syncCallables.remove(encodeObject.getMessageId());
- if (syncMessageCallable != null) {
- NettyClient tmpClient = syncMessageCallable.getClient();
- if (tmpClient != null) {
- Channel curChannel = tmpClient.getChannel();
- if (curChannel != null) {
- LOGGER.error("channel maybe busy {}", curChannel);
- scanThread.addTimeoutChannel(curChannel);
- }
- }
- }
- return SendResult.TIMEOUT.toString();
- } catch (Throwable e) {
- LOGGER.error("syncSendMessage exception {}",
getExceptionStack(e));
- syncCallables.remove(encodeObject.getMessageId());
- return SendResult.UNKOWN_ERROR.toString();
- }
- scanThread.resetTimeoutChannel(client.getChannel());
- return message.toString() + "=" + client.getServerIP();
- } catch (Exception e) {
- LOGGER.error("agent send error {}", getExceptionStack(e));
- syncCallables.remove(encodeObject.getMessageId());
- return SendResult.UNKOWN_ERROR.toString();
- }
- }
-
- /**
- * async send message index
- *
- * @param encodeObject
- * @param callback
- * @param msgUUID
- * @param timeout
- * @param timeUnit
- * @throws ProxysdkException
- */
- public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback
callback, String msgUUID, long timeout,
- TimeUnit timeUnit) throws ProxysdkException {
- NettyClient client = chooseProxy.get(encodeObject.getMessageId());
- String proxyip = encodeObject.getProxyIp();
- if (proxyip != null && proxyip.length() != 0) {
- client = clientMgr.getContainProxy(proxyip);
- }
- if (client == null || !client.isActive()) {
- chooseProxy.remove(encodeObject.getMessageId());
- client = clientMgr.getClientByRoundRobin();
- if (client == null) {
- throw new
ProxysdkException(SendResult.NO_CONNECTION.toString());
- }
- chooseProxy.put(encodeObject.getMessageId(), client);
- }
- if (currentBufferSize.get() >= asyncCallbackMaxSize) {
- throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
- }
- int size = 1;
- if (isFile) {
- if (encodeObject.getBodyBytes() != null) {
- size = encodeObject.getBodyBytes().length;
- } else {
- for (byte[] bytes : encodeObject.getBodylist()) {
- size = size + bytes.length;
- }
- }
- if (currentBufferSize.addAndGet(size) >= asyncCallbackMaxSize) {
- currentBufferSize.addAndGet(-size);
- throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
- }
-
- } else {
- if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) {
- currentBufferSize.decrementAndGet();
- throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
- }
- }
- ConcurrentHashMap<String, QueueObject> tmpCallBackMap = new
ConcurrentHashMap<>();
- ConcurrentHashMap<String, QueueObject> msgQueueMap =
callbacks.putIfAbsent(
- client.getChannel(), tmpCallBackMap);
- if (msgQueueMap == null) {
- msgQueueMap = tmpCallBackMap;
- }
- msgQueueMap.put(encodeObject.getMessageId(), new
QueueObject(System.currentTimeMillis(),
- callback, size, timeout, timeUnit));
- if (encodeObject.getMsgtype() == 7) {
- int groupIdnum = 0;
- int streamIdnum = 0;
- if ((clientMgr.getGroupId().length() != 0) &&
(encodeObject.getGroupId().equals(clientMgr.getGroupId()))) {
- groupIdnum = clientMgr.getGroupIdNum();
- streamIdnum =
(clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null)
- ?
clientMgr.getStreamIdMap().get(encodeObject.getStreamId())
- : 0;
- }
- encodeObject.setGroupIdNum(groupIdnum);
- encodeObject.setStreamIdNum(streamIdnum);
- if (groupIdnum == 0 || streamIdnum == 0) {
- encodeObject.setGroupIdTransfer(false);
- }
- }
- if (this.configure.isNeedDataEncry()) {
- encodeObject.setEncryptEntry(true, configure.getUserName(),
clientMgr.getEncryptConfigEntry());
- } else {
- encodeObject.setEncryptEntry(false, null, null);
- }
- encodeObject.setMsgUUID(msgUUID);
- client.write(encodeObject);
- }
-
/**
* whether is validate
*
@@ -539,24 +345,9 @@ public class Sender {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
int size = 1;
- if (isFile) {
- if (encodeObject.getBodyBytes() != null) {
- size = encodeObject.getBodyBytes().length;
- } else {
- for (byte[] bytes : encodeObject.getBodylist()) {
- size = size + bytes.length;
- }
- }
- if (currentBufferSize.addAndGet(size) >= asyncCallbackMaxSize) {
- currentBufferSize.addAndGet(-size);
- throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
- }
-
- } else {
- if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) {
- currentBufferSize.decrementAndGet();
- throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
- }
+ if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) {
+ currentBufferSize.decrementAndGet();
+ throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
}
ConcurrentHashMap<String, QueueObject> msgQueueMap =
callbacks.computeIfAbsent(client.getChannel(), (k) -> new
ConcurrentHashMap<>());
@@ -623,14 +414,8 @@ public class Sender {
if (queueObject == null) {
continue;
}
- if (isFile) {
- ((FileCallback) queueObject.getCallback())
-
.onMessageAck(SendResult.CONNECTION_BREAK.toString());
- currentBufferSize.addAndGet(-queueObject.getSize());
- } else {
-
queueObject.getCallback().onMessageAck(SendResult.CONNECTION_BREAK);
- currentBufferSize.decrementAndGet();
- }
+
queueObject.getCallback().onMessageAck(SendResult.CONNECTION_BREAK);
+ currentBufferSize.decrementAndGet();
}
msgQueueMap.clear();
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index 270531bf5b..e735850360 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sdk.dataproxy.threads;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.metric.MessageRecord;
import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
@@ -192,7 +192,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
callBack.increaseRetry();
try {
if (callBack.getRetryCount() < 4) {
- sender.asyncSendMessageIndex(encodeObject, callBack,
+ sender.asyncSendMessage(encodeObject, callBack,
String.valueOf(System.currentTimeMillis()), 20,
TimeUnit.SECONDS);
} else {
logger.error("Send metric failure: {} {}",
encodeObject.getBodyBytes(), encodeObject.getBodylist());
@@ -267,7 +267,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
}
}
- private class MetricSendCallBack extends FileCallback {
+ private class MetricSendCallBack implements SendMessageCallback {
private final EncodeObject encodeObject;
private int retryCount = 0;
@@ -285,17 +285,17 @@ public class MetricWorkerThread extends Thread implements
Closeable {
}
@Override
- public void onMessageAck(String result) {
- if (!SendResult.OK.toString().equals(result)) {
- tryToSendMetricToManager(encodeObject, this);
- } else {
+ public void onMessageAck(SendResult result) {
+ if (SendResult.OK.equals(result)) { // success: nothing to resend
logger.debug("Send metric is ok!");
+ } else { // any non-OK result: hand the metric back for another attempt
+ tryToSendMetricToManager(encodeObject, this);
}
}
@Override
- public void onMessageAck(SendResult result) {
-
+ public void onException(Throwable e) {
+ //
}
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
index 8c77eae109..f9e4980264 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
@@ -18,7 +18,6 @@
package org.apache.inlong.sdk.dataproxy.threads;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.network.QueueObject;
@@ -143,13 +142,8 @@ public class TimeoutScanThread extends Thread {
// remove it before callback
QueueObject queueObject1 =
messageIdCallbacks.remove(messageId);
if (queueObject1 != null) {
- if (config.isFile()) {
- ((FileCallback)
queueObject1.getCallback()).onMessageAck(SendResult.TIMEOUT.toString());
- currentBufferSize.addAndGet(-queueObject1.getSize());
- } else {
-
queueObject1.getCallback().onMessageAck(SendResult.TIMEOUT);
- currentBufferSize.decrementAndGet();
- }
+
queueObject1.getCallback().onMessageAck(SendResult.TIMEOUT);
+ currentBufferSize.decrementAndGet();
}
addTimeoutChannel(channel);
}