This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3aa8d284bb [INLONG-9079][SDK] Shaded some dependencies to reduce
conflicts with flink sdk (#9101)
3aa8d284bb is described below
commit 3aa8d284bb68d73285fb370d78623157b4372a89
Author: castor <[email protected]>
AuthorDate: Thu Nov 9 14:56:30 2023 +0800
[INLONG-9079][SDK] Shaded some dependencies to reduce conflicts with flink
sdk (#9101)
Co-authored-by: castorqin <[email protected]>
---
.../inlong/agent/plugin/sinks/SenderManager.java | 4 +-
.../plugin/sinks/filecollect/SenderManager.java | 4 +-
.../sinks/filecollect/TestSenderManager.java | 4 +-
inlong-sdk/dataproxy-sdk/pom.xml | 71 +++++++++++++++++++---
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 7 ++-
.../apache/inlong/sdk/dataproxy/MessageSender.java | 2 +
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 31 ++++++----
.../inlong/sdk/dataproxy/codec/EncodeObject.java | 2 +-
.../sdk/dataproxy/{ => common}/FileCallback.java | 2 +-
.../{ => common}/SendMessageCallback.java | 2 +-
.../sdk/dataproxy/{ => common}/SendResult.java | 2 +-
.../sdk/dataproxy/config/ProxyConfigManager.java | 27 ++++----
.../sdk/dataproxy/example/HttpClientExample.java | 9 ++-
.../sdk/dataproxy/example/MyFileCallBack.java | 4 +-
.../sdk/dataproxy/example/MyMessageCallBack.java | 4 +-
.../sdk/dataproxy/example/SendMsgThread.java | 2 +-
.../sdk/dataproxy/example/TcpClientExample.java | 11 ++--
.../sdk/dataproxy/http/InternalHttpSender.java | 2 +-
.../inlong/sdk/dataproxy/network/ClientMgr.java | 6 +-
.../inlong/sdk/dataproxy/network/HttpMessage.java | 2 +-
.../sdk/dataproxy/network/HttpProxySender.java | 6 +-
.../inlong/sdk/dataproxy/network/QueueObject.java | 2 +-
.../inlong/sdk/dataproxy/network/Sender.java | 6 +-
.../sdk/dataproxy/network/SyncMessageCallable.java | 2 +-
.../sdk/dataproxy/pb/PbProtocolMessageSender.java | 4 +-
.../sdk/dataproxy/pb/SdkProxyChannelManager.java | 2 +-
.../sdk/dataproxy/pb/context/CallbackProfile.java | 2 +-
.../sdk/dataproxy/threads/MetricWorkerThread.java | 4 +-
.../sdk/dataproxy/threads/TimeoutScanThread.java | 4 +-
inlong-sort/sort-flink/sort-flink-v1.13/pom.xml | 3 +-
inlong-sort/sort-flink/sort-flink-v1.15/pom.xml | 2 +-
pom.xml | 1 +
32 files changed, 154 insertions(+), 82 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 6b05f23fb7..7e69d5d94d 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -32,8 +32,8 @@ import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 5112471d77..abb21fe8ba 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -35,8 +35,8 @@ import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import io.netty.util.concurrent.DefaultThreadFactory;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index bf96afac74..d7c03bf099 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -27,8 +27,8 @@ import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.junit.AfterClass;
import org.junit.Assert;
diff --git a/inlong-sdk/dataproxy-sdk/pom.xml b/inlong-sdk/dataproxy-sdk/pom.xml
index 9c86763830..97e101fbd7 100644
--- a/inlong-sdk/dataproxy-sdk/pom.xml
+++ b/inlong-sdk/dataproxy-sdk/pom.xml
@@ -89,6 +89,31 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>${apache.thrift.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -98,15 +123,47 @@
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
- <artifactId>maven-release-plugin</artifactId>
- <version>2.5.3</version>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
<configuration>
- <autoVersionSubmodules>true</autoVersionSubmodules>
- <tagNameFormat>v@{project.version}</tagNameFormat>
- <releaseProfiles>release</releaseProfiles>
- <arguments>-Dmaven.javadoc.skip=true
-Dmaven.test.skipTests=true -Dmaven.test.skip=true
- -Dmaven.deploy.skip=true</arguments>
+ <artifactSet>
+ <includes>
+ <include>org.apache.inlong:*</include>
+ <include>com.fasterxml.*:*</include>
+ <include>org.apache.commons:*</include>
+ <include>io.netty:*</include>
+ <inclue>org.apache.thrift:*</inclue>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+
<shadedPattern>org.apache.inlong.dataproxy.shaded.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+
<shadedPattern>org.apache.inlong.dataproxy.shaded.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+
<shadedPattern>org.apache.inlong.dataproxy.shaded.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.thrift</pattern>
+
<shadedPattern>org.apache.inlong.dataproxy.shaded.org.apache.thrift</shadedPattern>
+ </relocation>
+ </relocations>
+
<createDependencyReducedPom>false</createDependencyReducedPom>
+ <minimizeJar>false</minimizeJar>
</configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
</plugin>
</plugins>
</build>
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 448c1a4a70..8359e54f2d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -21,6 +21,9 @@ import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.util.MessageUtils;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
@@ -70,7 +73,7 @@ public class DefaultMessageSender implements MessageSender {
public DefaultMessageSender(ProxyClientConfig configure, ThreadFactory
selfDefineFactory) throws Exception {
ProxyUtils.validClientConfig(configure);
sender = new Sender(configure, selfDefineFactory);
- groupId = configure.getGroupId();
+ groupId = configure.getInlongGroupId();
indexCol = new IndexCollectThread(storeIndex);
indexCol.start();
@@ -111,7 +114,7 @@ public class DefaultMessageSender implements MessageSender {
// initial sender object
ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(configure,
Utils.getLocalIp(), null);
- proxyConfigManager.setGroupId(configure.getGroupId());
+ proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
DefaultMessageSender sender = CACHE_SENDER.get(entry.getClusterId());
if (sender != null) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index 8f4782be2e..155031bee6 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.dataproxy;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import java.util.List;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 7601e4e885..f5552b17f9 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -37,7 +37,7 @@ public class ProxyClientConfig {
private int proxyUpdateIntervalMinutes;
private int proxyUpdateMaxRetry;
private String netTag;
- private String groupId;
+ private String inlongGroupId;
private boolean isFile = false;
private boolean isLocalVisit = true;
private boolean isNeedDataEncry = false;
@@ -103,7 +103,7 @@ public class ProxyClientConfig {
/* pay attention to the last url parameter ip */
public ProxyClientConfig(String localHost, boolean isLocalVisit, String
managerIp,
- int managerPort, String groupId, String netTag, String
authSecretId, String authSecretKey,
+ int managerPort, String inlongGroupId, String netTag, String
authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws
ProxysdkException {
if (Utils.isBlank(localHost)) {
throw new ProxysdkException("localHost is blank!");
@@ -111,12 +111,12 @@ public class ProxyClientConfig {
if (Utils.isBlank(managerIp)) {
throw new IllegalArgumentException("managerIp is Blank!");
}
- if (Utils.isBlank(groupId)) {
+ if (Utils.isBlank(inlongGroupId)) {
throw new ProxysdkException("groupId is blank!");
}
this.proxyIPServiceURL =
- "http://" + managerIp + ":" + managerPort +
ConfigConstants.MANAGER_DATAPROXY_API + groupId;
- this.groupId = groupId;
+ getProxyIPServiceURL(managerIp, managerPort, inlongGroupId,
isLocalVisit);
+ this.inlongGroupId = inlongGroupId;
this.netTag = netTag;
this.isLocalVisit = isLocalVisit;
this.managerPort = managerPort;
@@ -137,9 +137,18 @@ public class ProxyClientConfig {
this.maxRetry = maxRetry;
}
- public ProxyClientConfig(String localHost, boolean isLocalVisit, String
managerIp, int managerPort, String groupId,
+ private String getProxyIPServiceURL(String managerIp, int managerPort,
String inlongGroupId, boolean isLocalVisit) {
+ String protocolType = "http://";
+ if (!isLocalVisit) {
+ protocolType = "https://";
+ }
+ return protocolType + managerIp + ":" + managerPort +
ConfigConstants.MANAGER_DATAPROXY_API + inlongGroupId;
+ }
+
+ public ProxyClientConfig(String localHost, boolean isLocalVisit, String
managerIp, int managerPort,
+ String inlongGroupId,
String netTag, String authSecretId, String authSecretKey) throws
ProxysdkException {
- this(localHost, isLocalVisit, managerIp, managerPort, groupId, netTag,
authSecretId, authSecretKey,
+ this(localHost, isLocalVisit, managerIp, managerPort, inlongGroupId,
netTag, authSecretId, authSecretKey,
ConfigConstants.DEFAULT_LOAD_BALANCE,
ConfigConstants.DEFAULT_VIRTUAL_NODE,
ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
}
@@ -164,12 +173,12 @@ public class ProxyClientConfig {
isFile = file;
}
- public String getGroupId() {
- return groupId;
+ public String getInlongGroupId() {
+ return inlongGroupId;
}
- public void setGroupId(String groupId) {
- this.groupId = groupId;
+ public void setInlongGroupId(String inlongGroupId) {
+ this.inlongGroupId = inlongGroupId;
}
public int getManagerPort() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
index 32a5655167..e875d01e77 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sdk.dataproxy.codec;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import com.google.common.base.Splitter;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/FileCallback.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java
similarity index 96%
rename from
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/FileCallback.java
rename to
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java
index ca46d1c406..8fce78257e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/FileCallback.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.dataproxy;
+package org.apache.inlong.sdk.dataproxy.common;
public abstract class FileCallback implements SendMessageCallback {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendMessageCallback.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
similarity index 95%
rename from
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendMessageCallback.java
rename to
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
index 5274a6d97d..fc80705031 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendMessageCallback.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.dataproxy;
+package org.apache.inlong.sdk.dataproxy.common;
public interface SendMessageCallback {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
similarity index 96%
rename from
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
rename to
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
index 494689f758..adab601e0a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.dataproxy;
+package org.apache.inlong.sdk.dataproxy.common;
public enum SendResult {
INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 88049d7845..65762e2af4 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -40,7 +40,9 @@ import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
@@ -100,7 +102,7 @@ public class ProxyConfigManager extends Thread {
private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
/* the status of the cluster.if this value is changed,we need rechoose
three proxy */
private int oldStat = 0;
- private String groupId;
+ private String inlongGroupId;
private String localMd5;
private boolean bShutDown = false;
private long doworkTime = 0;
@@ -113,12 +115,12 @@ public class ProxyConfigManager extends Thread {
this.hashRing.setVirtualNode(configure.getVirtualNode());
}
- public String getGroupId() {
- return groupId;
+ public String getInlongGroupId() {
+ return inlongGroupId;
}
- public void setGroupId(String groupId) {
- this.groupId = groupId;
+ public void setInlongGroupId(String inlongGroupId) {
+ this.inlongGroupId = inlongGroupId;
}
public void shutDown() {
@@ -170,7 +172,7 @@ public class ProxyConfigManager extends Thread {
if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) {
JsonReader reader = new JsonReader(new
FileReader(configCachePath));
ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader,
ProxyConfigEntry.class);
- LOGGER.info("{} has a backup! {}", groupId, proxyConfigEntry);
+ LOGGER.info("{} has a backup! {}", inlongGroupId,
proxyConfigEntry);
return proxyConfigEntry;
}
} catch (Exception ex) {
@@ -218,7 +220,7 @@ public class ProxyConfigManager extends Thread {
*/
public ProxyConfigEntry getGroupIdConfigure() throws Exception {
ProxyConfigEntry proxyEntry;
- String configAddr = clientConfig.getConfStoreBasePath() + groupId;
+ String configAddr = clientConfig.getConfStoreBasePath() +
inlongGroupId;
if (this.clientConfig.isReadProxyIPFromLocal()) {
configAddr = configAddr + ".local";
proxyEntry = getLocalProxyListFromFile(configAddr);
@@ -259,7 +261,7 @@ public class ProxyConfigManager extends Thread {
localMd5 = calcHostInfoMd5(proxyInfoList);
}
ProxyConfigEntry proxyEntry = null;
- String configAddr = clientConfig.getConfStoreBasePath() + groupId;
+ String configAddr = clientConfig.getConfStoreBasePath() +
inlongGroupId;
if (clientConfig.isReadProxyIPFromLocal()) {
configAddr = configAddr + ".local";
proxyEntry = getLocalProxyListFromFile(configAddr);
@@ -634,7 +636,7 @@ public class ProxyConfigManager extends Thread {
ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
proxyEntry.setClusterId(clusterId);
- proxyEntry.setGroupId(clientConfig.getGroupId());
+ proxyEntry.setGroupId(clientConfig.getInlongGroupId());
proxyEntry.setInterVisit(isIntranet);
proxyEntry.setHostMap(hostMap);
proxyEntry.setSwitchStat(isSwitch);
@@ -740,11 +742,12 @@ public class ProxyConfigManager extends Thread {
httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(),
clientConfig.getAuthSecretKey()));
- StringEntity se = getEntity(params);
- httpPost.setEntity(se);
+ UrlEncodedFormEntity urlEncodedFormEntity = new
UrlEncodedFormEntity(params, "UTF-8");
+ httpPost.setEntity(urlEncodedFormEntity);
HttpResponse response = httpClient.execute(httpPost);
returnStr = EntityUtils.toString(response.getEntity());
- if (Utils.isNotBlank(returnStr) &&
response.getStatusLine().getStatusCode() == 200) {
+ if (Utils.isNotBlank(returnStr)
+ && response.getStatusLine().getStatusCode() ==
HttpStatus.SC_OK) {
LOGGER.info("Get configure from manager is " + returnStr);
return returnStr;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index 593db7ceeb..539cbeaf69 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -38,7 +38,6 @@ public class HttpClientExample {
*
* 2. if 'isLocalVisit' is false sdk will get config from manager auto.
*/
- String dataProxyGroup = "test";
String inlongGroupId = "test_group_id";
String inlongStreamId = "test_stream_id";
String configBasePath = "/data/inlong/dataproxy/conf";
@@ -49,14 +48,14 @@ public class HttpClientExample {
String messageBody = "inlong message body!";
HttpProxySender sender = getMessageSender(localIP, inLongManagerAddr,
- inLongManagerPort, netTag, dataProxyGroup, false, false,
+ inLongManagerPort, netTag, inlongGroupId, false, false,
configBasePath);
sendHttpMessage(sender, inlongGroupId, inlongStreamId, messageBody);
}
public static HttpProxySender getMessageSender(String localIP, String
inLongManagerAddr,
- String inLongManagerPort, String netTag, String dataProxyGroup,
+ String inLongManagerPort, String netTag, String inlongGroupId,
boolean isLocalVisit, boolean isReadProxyIPFromLocal,
String configBasePath) {
ProxyClientConfig proxyConfig = null;
@@ -64,8 +63,8 @@ public class HttpClientExample {
try {
proxyConfig = new ProxyClientConfig(localIP, isLocalVisit,
inLongManagerAddr,
Integer.valueOf(inLongManagerPort),
- dataProxyGroup, netTag, "test", "123456");
- proxyConfig.setGroupId(dataProxyGroup);
+ inlongGroupId, netTag, "test", "123456");
+ proxyConfig.setInlongGroupId(inlongGroupId);
proxyConfig.setConfStoreBasePath(configBasePath);
proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
proxyConfig.setDiscardOldMessage(true);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java
index 2496331d33..3685d0ad53 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java
@@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy.example;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.FileCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
index 6fbb813bbf..7aef6e705c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
@@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy.example;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.FileCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
index bb91d325f1..4c51696be0 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sdk.dataproxy.example;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index 0b46195bf9..d4eee4a052 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sdk.dataproxy.example;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -40,14 +40,13 @@ public class TcpClientExample {
*/
public static void main(String[] args) throws InterruptedException {
- String dataProxyGroup = "test_test";
String inlongGroupId = "test_test";
String inlongStreamId = "test_test";
String netTag = "";
/*
* 1. if isLocalVisit is true, will get dataproxy server info from
local file in
- * ${configBasePath}/${dataProxyGroup}.local file
+ * ${configBasePath}/${inlongGroupId}.local file
*
* for example: /data/inlong/config/test_test.local and file context
like this:
*
{"isInterVisit":1,"clusterId":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1",
@@ -68,19 +67,19 @@ public class TcpClientExample {
TcpClientExample tcpClientExample = new TcpClientExample();
DefaultMessageSender sender = tcpClientExample
.getMessageSender(localIP, inLongManagerAddr,
inLongManagerPort, netTag,
- dataProxyGroup, false, false, configBasePath, msgType);
+ inlongGroupId, false, false, configBasePath, msgType);
tcpClientExample.sendTcpMessage(sender, inlongGroupId, inlongStreamId,
messageBody, System.currentTimeMillis());
}
public DefaultMessageSender getMessageSender(String localIP, String
inLongManagerAddr, String inLongManagerPort,
- String netTag, String dataProxyGroup, boolean isLocalVisit,
boolean isReadProxyIPFromLocal,
+ String netTag, String inlongGroupId, boolean isLocalVisit, boolean
isReadProxyIPFromLocal,
String configBasePath, int msgType) {
ProxyClientConfig dataProxyConfig = null;
DefaultMessageSender messageSender = null;
try {
dataProxyConfig = new ProxyClientConfig(localIP, isLocalVisit,
inLongManagerAddr,
- Integer.valueOf(inLongManagerPort), dataProxyGroup,
netTag, "test", "123456");
+ Integer.valueOf(inLongManagerPort), inlongGroupId, netTag,
"test", "123456");
if (StringUtils.isNotEmpty(configBasePath)) {
dataProxyConfig.setConfStoreBasePath(configBasePath);
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
index b4da6b8a52..62f1e2289d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sdk.dataproxy.http;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.network.HttpMessage;
import org.apache.inlong.sdk.dataproxy.network.Utils;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index 8e1f878591..86ca8bd243 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -119,9 +119,9 @@ public class ClientMgr {
/* ready to Start the thread which refreshes the proxy list. */
ipManager = new ProxyConfigManager(configure, Utils.getLocalIp(),
this);
ipManager.setName("proxyConfigManager");
- if (configure.getGroupId() != null) {
- ipManager.setGroupId(configure.getGroupId());
- groupId = configure.getGroupId();
+ if (configure.getInlongGroupId() != null) {
+ ipManager.setInlongGroupId(configure.getInlongGroupId());
+ groupId = configure.getInlongGroupId();
}
/*
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java
index e6658fc8b4..da3bbf45e2 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sdk.dataproxy.network;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import java.util.List;
import java.util.concurrent.TimeUnit;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index 426a300949..4ef884ae8f 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -19,8 +19,8 @@ package org.apache.inlong.sdk.dataproxy.network;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
@@ -77,7 +77,7 @@ public class HttpProxySender extends Thread {
try {
proxyConfigManager = new ProxyConfigManager(configure,
Utils.getLocalIp(), null);
- proxyConfigManager.setGroupId(configure.getGroupId());
+ proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig();
hostList.addAll(proxyConfigEntry.getHostMap().values());
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
index 8bbaffc8e5..31e5332396 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sdk.dataproxy.network;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import java.util.concurrent.TimeUnit;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 7c1608fe7d..bae96e2e4c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -17,11 +17,11 @@
package org.apache.inlong.sdk.dataproxy.network;
-import org.apache.inlong.sdk.dataproxy.FileCallback;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
index d99e6eeec4..8e00ff8c3e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sdk.dataproxy.network;
-import org.apache.inlong.sdk.dataproxy.SendResult;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
index 0a6ecc96d5..d9817b7a03 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
@@ -20,8 +20,8 @@ package org.apache.inlong.sdk.dataproxy.pb;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.sdk.commons.protocol.SdkEvent;
import org.apache.inlong.sdk.dataproxy.MessageSender;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.pb.channel.BufferQueueChannel;
import org.apache.inlong.sdk.dataproxy.pb.context.CallbackProfile;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/SdkProxyChannelManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/SdkProxyChannelManager.java
index 41c9b482e4..98fa0ed5cd 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/SdkProxyChannelManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/SdkProxyChannelManager.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sdk.dataproxy.pb;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResponseInfo;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.pb.context.SdkProfile;
import org.apache.inlong.sdk.dataproxy.pb.context.SdkSinkContext;
import org.apache.inlong.sdk.dataproxy.pb.dispatch.DispatchProfile;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/context/CallbackProfile.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/context/CallbackProfile.java
index 3443a532f3..a6f04e1536 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/context/CallbackProfile.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/context/CallbackProfile.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sdk.dataproxy.pb.context;
import org.apache.inlong.sdk.commons.protocol.SdkEvent;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.flume.Event;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index f4ba492aee..ac6da06c61 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -17,10 +17,10 @@
package org.apache.inlong.sdk.dataproxy.threads;
-import org.apache.inlong.sdk.dataproxy.FileCallback;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendResult;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.metric.MessageRecord;
import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary;
import org.apache.inlong.sdk.dataproxy.network.Sender;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
index e0895bece7..8c77eae109 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
@@ -17,9 +17,9 @@
package org.apache.inlong.sdk.dataproxy.threads;
-import org.apache.inlong.sdk.dataproxy.FileCallback;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.network.QueueObject;
import org.apache.inlong.sdk.dataproxy.network.TimeScanObject;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
index 84d317c4fa..0f349e5bad 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
@@ -56,7 +56,6 @@
<flink.connector.doris.version>1.0.3</flink.connector.doris.version>
<hudi.version>0.12.3</hudi.version>
<sqlserver.jdbc.version>7.2.2.jre8</sqlserver.jdbc.version>
- <thrift.version>0.9.3</thrift.version>
<flink.iceberg.version>1.1.0</flink.iceberg.version>
</properties>
@@ -187,7 +186,7 @@
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
- <version>${thrift.version}</version>
+ <version>${libfb303.version}</version>
</dependency>
<dependency>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
index 923bf01d4c..32498fcb20 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
@@ -179,7 +179,7 @@
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
- <version>${thrift.version}</version>
+ <version>${libfb303.version}</version>
</dependency>
<dependency>
diff --git a/pom.xml b/pom.xml
index a04d163a9c..0ad2f0209e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -208,6 +208,7 @@
<tencentcloud-api.version>3.1.830</tencentcloud-api.version>
<woodstox-core.version>5.4.0</woodstox-core.version>
<libfb303.version>0.9.3</libfb303.version>
+ <apache.thrift.version>0.14.1</apache.thrift.version>
</properties>
<dependencyManagement>