This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0656bc9455 [INLONG-7766][DataProxySDK] Adjusted frame length exceeds occurred when reporting data through the HTTP protocol (#8418)
0656bc9455 is described below

commit 0656bc9455418493dd51aee909e35fb89ab9aa50
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Jul 4 18:54:22 2023 +0800

    [INLONG-7766][DataProxySDK] Adjusted frame length exceeds occurred when reporting data through the HTTP protocol (#8418)
---
 .../java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java | 7 +++++++
 .../org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java | 2 ++
 .../org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java  | 2 ++
 .../org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java   | 6 ++++++
 4 files changed, 17 insertions(+)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 355001fbb3..448c1a4a70 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.dataproxy;
 
+import org.apache.inlong.common.constant.ProtocolType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.util.MessageUtils;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
@@ -102,6 +103,12 @@ public class DefaultMessageSender implements MessageSender {
      */
     public static DefaultMessageSender 
generateSenderByClusterId(ProxyClientConfig configure,
             ThreadFactory selfDefineFactory) throws Exception {
+        // correct ProtocolType settings
+        if (!ProtocolType.TCP.equals(configure.getProtocolType())) {
+            configure.setProtocolType(ProtocolType.TCP);
+        }
+        LOGGER.info("Initial tcp sender, configure is {}", configure);
+        // initial sender object
         ProxyConfigManager proxyConfigManager = new 
ProxyConfigManager(configure,
                 Utils.getLocalIp(), null);
         proxyConfigManager.setGroupId(configure.getGroupId());
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index f66df46bbc..593db7ceeb 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.dataproxy.example;
 
+import org.apache.inlong.common.constant.ProtocolType;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.network.HttpProxySender;
 import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
@@ -68,6 +69,7 @@ public class HttpClientExample {
             proxyConfig.setConfStoreBasePath(configBasePath);
             proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
             proxyConfig.setDiscardOldMessage(true);
+            proxyConfig.setProtocolType(ProtocolType.HTTP);
             sender = new HttpProxySender(proxyConfig);
         } catch (ProxysdkException e) {
             e.printStackTrace();
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index 92dd0d1dd1..0b46195bf9 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.dataproxy.example;
 
+import org.apache.inlong.common.constant.ProtocolType;
 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.SendResult;
@@ -84,6 +85,7 @@ public class TcpClientExample {
                 dataProxyConfig.setConfStoreBasePath(configBasePath);
             }
             dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
+            dataProxyConfig.setProtocolType(ProtocolType.TCP);
             messageSender = 
DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
             messageSender.setMsgtype(msgType);
         } catch (Exception e) {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index 7fb5565dfc..426a300949 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.dataproxy.network;
 
+import org.apache.inlong.common.constant.ProtocolType;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.SendResult;
@@ -54,6 +55,11 @@ public class HttpProxySender extends Thread {
     private final LinkedBlockingQueue<HttpMessage> messageCache;
 
     public HttpProxySender(ProxyClientConfig configure) throws Exception {
+        // correct ProtocolType settings
+        if (!ProtocolType.HTTP.equals(configure.getProtocolType())) {
+            configure.setProtocolType(ProtocolType.HTTP);
+        }
+        logger.info("Initial http sender, configure is {}", configure);
         this.proxyClientConfig = configure;
         initTDMClientAndRequest(configure);
         this.messageCache = new 
LinkedBlockingQueue<>(configure.getTotalAsyncCallbackSize());

Reply via email to