This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 004c2be5e4 [INLONG-11670][SDK] Rename the ProxysdkException class name
to ProxySdkException (#11671)
004c2be5e4 is described below
commit 004c2be5e45806618cb4f2dfe57a3365423d41c5
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Jan 15 11:10:36 2025 +0800
[INLONG-11670][SDK] Rename the ProxysdkException class name to
ProxySdkException (#11671)
* [INLONG-11670][SDK] Rename the ProxysdkException class name to
ProxySdkException
* [INLONG-11670][SDK] Rename the ProxysdkException class name to
ProxySdkException
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../plugin/sinks/filecollect/SenderManager.java | 4 +-
.../inlong/common/msg/AttributeConstants.java | 10 +++-
inlong-sdk/dataproxy-sdk/pom.xml | 22 ++++++++
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 58 +++++++++++-----------
.../apache/inlong/sdk/dataproxy/MessageSender.java | 18 +++----
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 32 ++++++------
.../sdk/dataproxy/example/HttpClientExample.java | 4 +-
.../ProxySdkException.java} | 24 ++++++---
.../inlong/sdk/dataproxy/network/IpUtils.java | 4 +-
.../inlong/sdk/dataproxy/network/Sender.java | 17 ++++---
.../sdk/dataproxy/pb/PbProtocolMessageSender.java | 26 +++++-----
11 files changed, 131 insertions(+), 88 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 9ac9083ad8..9ef20bdf4b 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -35,7 +35,7 @@ import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
@@ -271,7 +271,7 @@ public class SenderManager {
private void asyncSendByMessageSender(SendMessageCallback cb,
List<byte[]> bodyList, String groupId, String streamId, long
dataTime, String msgUUID,
- Map<String, String> extraAttrMap, boolean isProxySend) throws
ProxysdkException {
+ Map<String, String> extraAttrMap, boolean isProxySend) throws
ProxySdkException {
sender.asyncSendMessage(cb, bodyList, groupId,
streamId, dataTime, msgUUID, extraAttrMap, isProxySend);
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 3402fc6455..974579d16e 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -62,6 +62,9 @@ public interface AttributeConstants {
/* from where */
String FROM = "f";
+ /* msg uuid */
+ String MSG_UUID = "msgUUID";
+
// whether to return a response, false: not need, true or not exist: need
String MESSAGE_IS_ACK = "isAck";
@@ -101,11 +104,14 @@ public interface AttributeConstants {
// Message reporting time, in milliseconds
// Provided by the initial sender of the data, and passed to
- // the downstream by the Bus without modification for the downstream to
+ // the downstream by the DataProxy without modification for the downstream to
// calculate the end-to-end message delay; if this field does not exist in the request,
- // it will be added by the Bus with the current time
+ // it will be added by the DataProxy with the current time
String MSG_RPT_TIME = "rtms";
+ // inlong sdk version
+ String PROXY_SDK_VERSION = "sdkVersion";
+
// Audit version is used for audit to reconciliation
String AUDIT_VERSION = "auditVersion";
}
diff --git a/inlong-sdk/dataproxy-sdk/pom.xml b/inlong-sdk/dataproxy-sdk/pom.xml
index 0041797502..a5943dd52f 100644
--- a/inlong-sdk/dataproxy-sdk/pom.xml
+++ b/inlong-sdk/dataproxy-sdk/pom.xml
@@ -174,6 +174,28 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>io.github.git-commit-id</groupId>
+ <artifactId>git-commit-id-maven-plugin</artifactId>
+ <version>4.9.9</version>
+ <configuration>
+ <generateGitPropertiesFile>true</generateGitPropertiesFile>
+ <generateGitPropertiesFilename>${project.build.outputDirectory}/git.properties</generateGitPropertiesFilename>
+ <includeOnlyProperties>
+ <includeOnlyProperty>^git.build.(version)$</includeOnlyProperty>
+ </includeOnlyProperties>
+ <commitIdGenerationMode>full</commitIdGenerationMode>
+ </configuration>
+ <executions>
+ <execution>
+ <id>get-the-git-infos</id>
+ <goals>
+ <goal>revision</goal>
+ </goals>
+ <phase>initialize</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 02157fe95a..92dec7b125 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -25,7 +25,7 @@ import
org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
-import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
@@ -443,25 +443,25 @@ public class DefaultMessageSender implements
MessageSender {
@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
- String msgUUID, Map<String, String> extraAttrMap) throws
ProxysdkException {
+ String msgUUID, Map<String, String> extraAttrMap) throws
ProxySdkException {
asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
extraAttrMap, false);
}
@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
- String msgUUID) throws ProxysdkException {
+ String msgUUID) throws ProxySdkException {
asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
false);
}
@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
- long dt, String msgUUID) throws ProxysdkException {
+ long dt, String msgUUID) throws ProxySdkException {
asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID,
false);
}
@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
- long dt, String msgUUID, Map<String, String> extraAttrMap) throws
ProxysdkException {
+ long dt, String msgUUID, Map<String, String> extraAttrMap) throws
ProxySdkException {
asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID,
extraAttrMap, false);
}
@@ -529,16 +529,16 @@ public class DefaultMessageSender implements
MessageSender {
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId,
- String streamId, long dt, String msgUUID, boolean isProxySend)
throws ProxysdkException {
+ String streamId, long dt, String msgUUID, boolean isProxySend)
throws ProxySdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
- throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
+ throw new
ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
- throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ throw new
ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, 1);
@@ -584,16 +584,16 @@ public class DefaultMessageSender implements
MessageSender {
* @param msgUUID msg uuid
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
- String msgUUID, Map<String, String> extraAttrMap, boolean
isProxySend) throws ProxysdkException {
+ String msgUUID, Map<String, String> extraAttrMap, boolean
isProxySend) throws ProxySdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
- throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
+ throw new
ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
- throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ throw new
ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, 1);
if (isProxySend) {
@@ -635,16 +635,16 @@ public class DefaultMessageSender implements
MessageSender {
* @param dt data report time
* @param msgUUID msg uuid
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList,
- String groupId, String streamId, long dt, String msgUUID, boolean
isProxySend) throws ProxysdkException {
+ String groupId, String streamId, long dt, String msgUUID, boolean
isProxySend) throws ProxySdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
- throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
+ throw new
ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
- throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ throw new
ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
String proxySend = "";
@@ -690,18 +690,18 @@ public class DefaultMessageSender implements
MessageSender {
* @param msgUUID msg uuid
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
public void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
- Map<String, String> extraAttrMap, boolean isProxySend) throws
ProxysdkException {
+ Map<String, String> extraAttrMap, boolean isProxySend) throws
ProxySdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(
extraAttrMap)) {
- throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
+ throw new
ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
- throw new
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
+ throw new
ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
if (isProxySend) {
@@ -738,11 +738,11 @@ public class DefaultMessageSender implements
MessageSender {
* @param inlongStreamId
* @param body
* @param callback
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body, SendMessageCallback callback)
- throws ProxysdkException {
+ throws ProxySdkException {
this.asyncSendMessage(callback, body, inlongGroupId,
inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId());
}
@@ -755,10 +755,10 @@ public class DefaultMessageSender implements
MessageSender {
* @param body a single message
* @param callback callback can be null
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body, SendMessageCallback callback,
- boolean isProxySend) throws ProxysdkException {
+ boolean isProxySend) throws ProxySdkException {
this.asyncSendMessage(callback, body, inlongGroupId,
inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId(), isProxySend);
}
@@ -770,11 +770,11 @@ public class DefaultMessageSender implements
MessageSender {
* @param inlongStreamId streamId
* @param bodyList list of messages
* @param callback callback can be null
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
- SendMessageCallback callback) throws ProxysdkException {
+ SendMessageCallback callback) throws ProxySdkException {
this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId());
}
@@ -787,10 +787,10 @@ public class DefaultMessageSender implements
MessageSender {
* @param bodyList list of messages
* @param callback callback can be null
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
- SendMessageCallback callback, boolean isProxySend) throws
ProxysdkException {
+ SendMessageCallback callback, boolean isProxySend) throws
ProxySdkException {
this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId(), isProxySend);
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index 2a4ae6313a..862586ab77 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import java.util.List;
import java.util.Map;
@@ -79,7 +79,7 @@ public interface MessageSender {
*/
void asyncSendMessage(SendMessageCallback callback,
byte[] body, String groupId, String streamId, long dt, String
msgUUID,
- Map<String, String> extraAttrMap) throws ProxysdkException;
+ Map<String, String> extraAttrMap) throws ProxySdkException;
/**
* This method provides an asynchronized function which you want to send
data without packing
@@ -89,7 +89,7 @@ public interface MessageSender {
* @param body The data will be sent
*/
void asyncSendMessage(SendMessageCallback callback,
- byte[] body, String groupId, String streamId, long dt, String
msgUUID) throws ProxysdkException;
+ byte[] body, String groupId, String streamId, long dt, String
msgUUID) throws ProxySdkException;
/**
* This method provides an asynchronized function which you want to send
data with packing
@@ -98,7 +98,7 @@ public interface MessageSender {
* @param bodyList The data will be sent,which is a collection consisting
of byte arrays
*/
void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID) throws ProxysdkException;
+ List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID) throws ProxySdkException;
/**
* This method provides an asynchronized function which you want to send
data with packing
@@ -111,7 +111,7 @@ public interface MessageSender {
*/
void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
- Map<String, String> extraAttrMap) throws ProxysdkException;
+ Map<String, String> extraAttrMap) throws ProxySdkException;
/**
* This method provides an asynchronized function which you want to send
data.<br>
@@ -121,10 +121,10 @@ public interface MessageSender {
* @param inlongStreamId
* @param body
* @param callback callback can be null
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[]
body, SendMessageCallback callback)
- throws ProxysdkException;
+ throws ProxySdkException;
/**
* This method provides an asynchronized function which you want to send
datas.<br>
@@ -134,9 +134,9 @@ public interface MessageSender {
* @param inlongStreamId
* @param bodyList
* @param callback callback can be null
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
- SendMessageCallback callback) throws ProxysdkException;
+ SendMessageCallback callback) throws ProxySdkException;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 1d03ee5b3a..fedf925aa1 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -17,9 +17,9 @@
package org.apache.inlong.sdk.dataproxy;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.network.IpUtils;
-import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
@@ -99,18 +99,18 @@ public class ProxyClientConfig {
/* pay attention to the last url parameter ip */
public ProxyClientConfig(String localHost, boolean visitManagerByHttp,
String managerIp,
- int managerPort, String inlongGroupId, String authSecretId, String
authSecretKey) throws ProxysdkException {
+ int managerPort, String inlongGroupId, String authSecretId, String
authSecretKey) throws ProxySdkException {
if (StringUtils.isBlank(localHost)) {
- throw new ProxysdkException("localHost is blank!");
+ throw new ProxySdkException("localHost is blank!");
}
if (StringUtils.isBlank(managerIp)) {
- throw new ProxysdkException("managerIp is Blank!");
+ throw new ProxySdkException("managerIp is Blank!");
}
if (managerPort <= 0) {
- throw new ProxysdkException("managerPort <= 0!");
+ throw new ProxySdkException("managerPort <= 0!");
}
if (StringUtils.isBlank(inlongGroupId)) {
- throw new ProxysdkException("groupId is blank!");
+ throw new ProxySdkException("groupId is blank!");
}
this.inlongGroupId = inlongGroupId.trim();
this.visitManagerByHttp = visitManagerByHttp;
@@ -126,10 +126,10 @@ public class ProxyClientConfig {
/* pay attention to the last url parameter ip */
public ProxyClientConfig(String managerAddress,
- String inlongGroupId, String authSecretId, String authSecretKey)
throws ProxysdkException {
+ String inlongGroupId, String authSecretId, String authSecretKey)
throws ProxySdkException {
checkAndParseAddress(managerAddress);
if (StringUtils.isBlank(inlongGroupId)) {
- throw new ProxysdkException("groupId is blank!");
+ throw new ProxySdkException("groupId is blank!");
}
this.inlongGroupId = inlongGroupId.trim();
this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
@@ -519,11 +519,11 @@ public class ProxyClientConfig {
this.senderMaxAttempt = senderMaxAttempt;
}
- private void checkAndParseAddress(String managerAddress) throws
ProxysdkException {
+ private void checkAndParseAddress(String managerAddress) throws
ProxySdkException {
if (StringUtils.isBlank(managerAddress)
|| (!managerAddress.startsWith(ConfigConstants.HTTP)
&& !managerAddress.startsWith(ConfigConstants.HTTPS)))
{
- throw new ProxysdkException("managerAddress is blank or missing
http/https protocol");
+ throw new ProxySdkException("managerAddress is blank or missing
http/https protocol");
}
String hostPortInfo;
if (managerAddress.startsWith(ConfigConstants.HTTPS)) {
@@ -533,25 +533,25 @@ public class ProxyClientConfig {
hostPortInfo =
managerAddress.substring(ConfigConstants.HTTP.length());
}
if (StringUtils.isBlank(hostPortInfo)) {
- throw new ProxysdkException("managerAddress must include host:port
info!");
+ throw new ProxySdkException("managerAddress must include host:port
info!");
}
String[] fields = hostPortInfo.split(":");
if (fields.length == 1) {
- throw new ProxysdkException("managerAddress must include port
info!");
+ throw new ProxySdkException("managerAddress must include port
info!");
} else if (fields.length > 2) {
- throw new ProxysdkException("managerAddress must only include
host:port info!");
+ throw new ProxySdkException("managerAddress must only include
host:port info!");
}
if (StringUtils.isBlank(fields[0])) {
- throw new ProxysdkException("managerAddress's host is blank!");
+ throw new ProxySdkException("managerAddress's host is blank!");
}
this.managerIP = fields[0].trim();
if (StringUtils.isBlank(fields[1])) {
- throw new ProxysdkException("managerAddress's port is blank!");
+ throw new ProxySdkException("managerAddress's port is blank!");
}
try {
this.managerPort = Integer.parseInt(fields[1]);
} catch (Throwable ex) {
- throw new ProxysdkException("managerAddress's port must be
number!");
+ throw new ProxySdkException("managerAddress's port must be
number!");
}
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index 2ba1938409..c22c3aed98 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -19,8 +19,8 @@ package org.apache.inlong.sdk.dataproxy.example;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.network.HttpProxySender;
-import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import java.util.ArrayList;
import java.util.List;
@@ -59,7 +59,7 @@ public class HttpClientExample {
proxyConfig.setDiscardOldMessage(true);
proxyConfig.setProtocolType(ProtocolType.HTTP);
sender = new HttpProxySender(proxyConfig);
- } catch (ProxysdkException e) {
+ } catch (ProxySdkException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ProxysdkException.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxySdkException.java
similarity index 50%
rename from inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ProxysdkException.java
rename to inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxySdkException.java
index 29de17dcf2..c8b806c453 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ProxysdkException.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxySdkException.java
@@ -15,22 +15,34 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.dataproxy.network;
+package org.apache.inlong.sdk.dataproxy.exception;
-public class ProxysdkException extends Exception {
+/**
+ * Proxy Sdk Exception
+ *
+ * Used for unacceptable situations when reporting messages, such as empty input parameters,
+ * illegal parameters, abnormal execution status, and exceptions encountered during execution that
+ * were not considered during design, etc.
+ *
+ * If this exception is thrown during the debugging phase, the caller needs to check and
+ * adjust the corresponding implementation according to the exception content; if the exception
+ * is encountered during operation; the caller can try a limited number of times,
+ * and discard this report if it fails after trying again.
+ */
+public class ProxySdkException extends Exception {
- public ProxysdkException() {
+ public ProxySdkException() {
}
- public ProxysdkException(String message) {
+ public ProxySdkException(String message) {
super(message);
}
- public ProxysdkException(String message, Throwable cause) {
+ public ProxySdkException(String message, Throwable cause) {
super(message, cause);
}
- public ProxysdkException(Throwable cause) {
+ public ProxySdkException(Throwable cause) {
super(cause);
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
index f3fbab04aa..90a0716772 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.dataproxy.network;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.HmacUtils;
import org.slf4j.Logger;
@@ -54,7 +56,7 @@ public class IpUtils {
return ip;
}
- public static boolean validLocalIp(String currLocalHost) throws
ProxysdkException {
+ public static boolean validLocalIp(String currLocalHost) throws
ProxySdkException {
String ip = "127.0.0.1";
try (DatagramSocket socket = new DatagramSocket()) {
socket.connect(InetAddress.getByName("8.8.8.8"), 10002);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 5ee4c93e4b..088ecb3d60 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
@@ -308,13 +309,13 @@ public class Sender {
* Following methods used by asynchronously message sending.
*/
public void asyncSendMessage(EncodeObject encodeObject,
- SendMessageCallback callback, String msgUUID) throws
ProxysdkException {
+ SendMessageCallback callback, String msgUUID) throws
ProxySdkException {
if (!started.get()) {
if (callback != null) {
callback.onMessageAck(SendResult.SENDER_CLOSED);
return;
} else {
- throw new
ProxysdkException(SendResult.SENDER_CLOSED.toString());
+ throw new
ProxySdkException(SendResult.SENDER_CLOSED.toString());
}
}
if (configure.isEnableMetric()) {
@@ -331,7 +332,7 @@ public class Sender {
callback.onMessageAck(SendResult.MAX_FLIGHT_ON_ALL_CONNECTION);
return;
} else {
- throw new
ProxysdkException(SendResult.MAX_FLIGHT_ON_ALL_CONNECTION.toString());
+ throw new
ProxySdkException(SendResult.MAX_FLIGHT_ON_ALL_CONNECTION.toString());
}
}
if (clientResult.getF0() != SendResult.OK) {
@@ -339,7 +340,7 @@ public class Sender {
callback.onMessageAck(clientResult.getF0());
return;
} else {
- throw new ProxysdkException(clientResult.getF0().toString());
+ throw new ProxySdkException(clientResult.getF0().toString());
}
}
if (!clientResult.getF1().getChannel().isWritable()) {
@@ -352,7 +353,7 @@ public class Sender {
callback.onMessageAck(SendResult.WRITE_OVER_WATERMARK);
return;
} else {
- throw new
ProxysdkException(SendResult.WRITE_OVER_WATERMARK.toString());
+ throw new
ProxySdkException(SendResult.WRITE_OVER_WATERMARK.toString());
}
}
if (currentBufferSize.get() >= asyncCallbackMaxSize) {
@@ -361,7 +362,7 @@ public class Sender {
callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL);
return;
} else {
- throw new
ProxysdkException(SendResult.ASYNC_CALLBACK_BUFFER_FULL.toString());
+ throw new
ProxySdkException(SendResult.ASYNC_CALLBACK_BUFFER_FULL.toString());
}
}
if (isNotValidateAttr(encodeObject.getCommonattr(),
encodeObject.getAttributes())) {
@@ -374,7 +375,7 @@ public class Sender {
callback.onMessageAck(SendResult.INVALID_ATTRIBUTES);
return;
} else {
- throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
+ throw new
ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
}
int size = 1;
@@ -385,7 +386,7 @@ public class Sender {
callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL);
return;
} else {
- throw new
ProxysdkException(SendResult.ASYNC_CALLBACK_BUFFER_FULL.toString());
+ throw new
ProxySdkException(SendResult.ASYNC_CALLBACK_BUFFER_FULL.toString());
}
}
ConcurrentHashMap<String, QueueObject> msgQueueMap =
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
index fc584f5a5a..aaf33941f3 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
@@ -22,7 +22,7 @@ import org.apache.inlong.sdk.commons.protocol.SdkEvent;
import org.apache.inlong.sdk.dataproxy.MessageSender;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.pb.channel.BufferQueueChannel;
import org.apache.inlong.sdk.dataproxy.pb.context.CallbackProfile;
@@ -336,12 +336,12 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param dt
* @param msgUUID
* @param extraAttrMap
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
String msgUUID, Map<String, String> extraAttrMap)
- throws ProxysdkException {
+ throws ProxySdkException {
SdkEvent sdkEvent = new SdkEvent();
sdkEvent.setInlongGroupId(groupId);
sdkEvent.setInlongStreamId(streamId);
@@ -364,11 +364,11 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
- String groupId, String streamId, long dt, String msgUUID) throws
ProxysdkException {
+ String groupId, String streamId, long dt, String msgUUID) throws
ProxySdkException {
this.asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
null);
}
@@ -381,11 +381,11 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList,
- String groupId, String streamId, long dt, String msgUUID) throws
ProxysdkException {
+ String groupId, String streamId, long dt, String msgUUID) throws
ProxySdkException {
this.asyncSendMessage(callback, bodyList, groupId, streamId, dt,
msgUUID, null);
}
@@ -399,12 +399,12 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param dt
* @param msgUUID
* @param extraAttrMap
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
@Override
public void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ Map<String, String> extraAttrMap) throws ProxySdkException {
List<CallbackProfile> events = new ArrayList<>(bodyList.size());
for (byte[] body : bodyList) {
SdkEvent sdkEvent = new SdkEvent();
@@ -429,11 +429,11 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param inlongStreamId
* @param body
* @param callback
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body, SendMessageCallback callback)
- throws ProxysdkException {
+ throws ProxySdkException {
this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId,
System.currentTimeMillis(), null, null);
}
@@ -444,11 +444,11 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param inlongStreamId
* @param bodyList
* @param callback
- * @throws ProxysdkException
+ * @throws ProxySdkException
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
- SendMessageCallback callback) throws ProxysdkException {
+ SendMessageCallback callback) throws ProxySdkException {
this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(), null,
null);
}