This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1fe527d053 [INLONG-11729][SDK] Optimize TcpClientExample and HttpClientExample codes (#11730)
1fe527d053 is described below
commit 1fe527d053121d7be9381784d3bb15f0c14ddb24
Author: Goson Zhang <[email protected]>
AuthorDate: Sat Feb 8 16:51:07 2025 +0800
[INLONG-11729][SDK] Optimize TcpClientExample and HttpClientExample codes (#11730)
Co-authored-by: gosonzhang <[email protected]>
---
.../apache/inlong/sdk/dataproxy/example/Event.java | 97 --------------------
.../sdk/dataproxy/example/HttpClientExample.java | 102 +++++++++++++++------
.../sdk/dataproxy/example/MyMessageCallBack.java | 60 ------------
.../sdk/dataproxy/example/SendMsgThread.java | 74 ---------------
.../sdk/dataproxy/example/TcpClientExample.java | 85 ++++++++---------
5 files changed, 115 insertions(+), 303 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/Event.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/Event.java
deleted file mode 100644
index bbb7fe9140..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/Event.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.example;
-
-import java.util.ArrayList;
-
-public class Event {
-
- private byte[] body;
- private String groupId;
- private String streamId;
- private long dt;
- private int tryTimes = 0;
- ArrayList<byte[]> bodylist = new ArrayList<byte[]>();
-
- public Event(byte[] body, String groupId, String streamId, long dt) {
- super();
- this.body = body;
- this.groupId = groupId;
- this.streamId = streamId;
- this.dt = dt;
- this.setTryTimes(0);
- }
-
- public Event(ArrayList<byte[]> bodylist, String groupId, String streamId,
long dt) {
- super();
- this.bodylist = bodylist;
- this.groupId = groupId;
- this.streamId = streamId;
- this.dt = dt;
- this.setTryTimes(0);
- }
-
- public ArrayList<byte[]> getBodylist() {
- return bodylist;
- }
-
- public void setBodylist(ArrayList<byte[]> bodylist) {
- this.bodylist = bodylist;
- }
-
- public byte[] getBody() {
- return body;
- }
-
- public void setBody(byte[] body) {
- this.body = body;
- }
-
- public String getGroupId() {
- return groupId;
- }
-
- public void setGroupId(String groupId) {
- this.groupId = groupId;
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- public void setStreamId(String streamId) {
- this.streamId = streamId;
- }
-
- public long getDt() {
- return dt;
- }
-
- public void setDt(long dt) {
- this.dt = dt;
- }
-
- public int getTryTimes() {
- return tryTimes;
- }
-
- public void setTryTimes(int tryTimes) {
- this.tryTimes = tryTimes;
- }
-
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index 502932d17d..730cb775c9 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -17,16 +17,21 @@
package org.apache.inlong.sdk.dataproxy.example;
-import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
-import org.apache.inlong.sdk.dataproxy.network.HttpProxySender;
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
+import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HttpClientExample {
+ private static final Logger logger =
LoggerFactory.getLogger(HttpClientExample.class);
+
public static void main(String[] args) {
String inlongGroupId = "test_group_id";
String inlongStreamId = "test_stream_id";
@@ -35,40 +40,77 @@ public class HttpClientExample {
String inLongManagerPort = "8083";
String messageBody = "inlong message body!";
- HttpProxySender sender = getMessageSender(inLongManagerAddr,
- inLongManagerPort, inlongGroupId, true, false,
- configBasePath);
+ // build sender factory
+ MsgSenderSingleFactory senderFactory = new MsgSenderSingleFactory();
+ // build sender object
+ HttpMsgSender sender = getMessageSender(senderFactory, false,
+ inLongManagerAddr, inLongManagerPort, inlongGroupId, false,
configBasePath);
+ // send message
sendHttpMessage(sender, inlongGroupId, inlongStreamId, messageBody);
- sender.close(); // close the sender
+ // close all senders
+ sender.close();
}
- public static HttpProxySender getMessageSender(String inLongManagerAddr,
- String inLongManagerPort, String inlongGroupId,
- boolean requestByHttp, boolean isReadProxyIPFromLocal,
+ public static HttpMsgSender getMessageSender(MsgSenderFactory
senderFactory, boolean visitMsgByHttps,
+ String managerAddr, String managerPort, String inlongGroupId,
boolean useLocalMetaConfig,
String configBasePath) {
- HttpMsgSenderConfig httpConfig = null;
- HttpProxySender sender = null;
+ HttpMsgSender sender = null;
try {
- httpConfig = new HttpMsgSenderConfig(requestByHttp,
inLongManagerAddr,
- Integer.valueOf(inLongManagerPort),
- inlongGroupId, "admin", "inlong");// user and password of
manager
- httpConfig.setMetaStoreBasePath(configBasePath);
- httpConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal);
+ HttpMsgSenderConfig httpConfig = new
HttpMsgSenderConfig(visitMsgByHttps, managerAddr,
+ Integer.parseInt(managerPort), inlongGroupId, "admin",
"inlong");
httpConfig.setDiscardHttpCacheWhenClosing(true);
- sender = new HttpProxySender(httpConfig);
- } catch (ProxySdkException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
+ httpConfig.setMetaStoreBasePath(configBasePath);
+ httpConfig.setOnlyUseLocalProxyConfig(useLocalMetaConfig);
+ httpConfig.setHttpConTimeoutMs(20000);
+ sender = senderFactory.genHttpSenderByGroupId(httpConfig);
+ } catch (Throwable ex) {
+ System.out.println("Get MessageSender throw exception, " + ex);
}
return sender;
}
- public static void sendHttpMessage(HttpProxySender sender, String
inlongGroupId,
- String inlongStreamId, String messageBody) {
- List<String> bodyList = new ArrayList<>();
- bodyList.add(messageBody);
- sender.asyncSendMessage(bodyList, inlongGroupId, inlongStreamId,
System.currentTimeMillis(),
- 20, TimeUnit.SECONDS, new MyMessageCallBack());
+ public static void sendHttpMessage(HttpMsgSender sender,
+ String inlongGroupId, String inlongStreamId, String messageBody) {
+ try {
+ ProcessResult procResult = new ProcessResult();
+ if (!sender.asyncSendMessage(new HttpEventInfo(inlongGroupId,
+ inlongStreamId, System.currentTimeMillis(), messageBody),
new MyMessageCallBack(), procResult)) {
+ System.out.println("Send message failure, result = " +
procResult);
+ return;
+ }
+ System.out.println("Send message success!");
+ } catch (Throwable ex) {
+ System.out.println("Send message exception" + ex);
+ }
+ }
+
+ // async callback class
+ public static class MyMessageCallBack implements MsgSendCallback {
+
+ private HttpMsgSender messageSender = null;
+ private HttpEventInfo event = null;
+
+ public MyMessageCallBack() {
+
+ }
+
+ public MyMessageCallBack(HttpMsgSender messageSender, HttpEventInfo
event) {
+ this.messageSender = messageSender;
+ this.event = event;
+ }
+
+ @Override
+ public void onMessageAck(ProcessResult result) {
+ if (result.isSuccess()) {
+ logger.info("onMessageAck return Ok");
+ } else {
+ logger.info("onMessageAck return failure = {}", result);
+ }
+ }
+
+ @Override
+ public void onException(Throwable ex) {
+ logger.error("Send message throw exception", ex);
+ }
}
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
deleted file mode 100644
index d9b5c08132..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.example;
-
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MyMessageCallBack implements SendMessageCallback {
-
- private static final Logger logger = LoggerFactory
- .getLogger(MyMessageCallBack.class);
-
- private DefaultMessageSender messageSender = null;
- private Event event = null;
-
- public MyMessageCallBack() {
-
- }
-
- public MyMessageCallBack(DefaultMessageSender messageSender, Event event) {
- super();
- this.messageSender = messageSender;
- this.event = event;
- }
-
- @Override
- public void onMessageAck(SendResult result) {
- if (result == SendResult.OK) {
- logger.info("onMessageAck return Ok");
- } else {
- logger.info("onMessageAck return failure = {}", result);
- }
- }
-
- @Override
- public void onException(Throwable e) {
- logger.error("Send message failure, error {}", e.getMessage());
- e.printStackTrace();
- }
-
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
deleted file mode 100644
index 4658bb1a05..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.example;
-
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-
-public class SendMsgThread extends Thread {
-
- private static final Logger logger =
LoggerFactory.getLogger(SendMsgThread.class);
- private DefaultMessageSender messageSender = null;
-
- public SendMsgThread(DefaultMessageSender messageSender) {
- this.messageSender = messageSender;
- }
-
- @Override
- public void run() {
- FileReader reader = null;
- try {
- reader = new FileReader("/data/work/jessey/d5.txt");
-
- BufferedReader br = new BufferedReader(reader);
- String line = null;
- while ((line = br.readLine()) != null) {
-
- long startTime = System.currentTimeMillis();
- SendResult result =
messageSender.sendMessage("hhhh".getBytes("utf8"),
- "b_test", "n_test1", 0,
String.valueOf(System.currentTimeMillis()));
- long endTime = System.currentTimeMillis();
- if (result == result.OK) {
- logger.info("this msg is ok time {}", endTime - startTime);
- } else {
- logger.info("this msg is error ,{}", result);
- }
- }
- } catch (Exception e) {
- logger.error("{}", e.getMessage());
- e.printStackTrace();
-
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- }
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index ab7e674f80..ae653a02de 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -17,15 +17,18 @@
package org.apache.inlong.sdk.dataproxy.example;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
+import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
public class TcpClientExample {
@@ -38,59 +41,57 @@ public class TcpClientExample {
String inlongGroupId = "test_group_id";
String inlongStreamId = "test_stream_id";
-
String configBasePath = "";
String inLongManagerAddr = "127.0.0.1";
String inLongManagerPort = "8083";
-
- /*
- * It is recommended to use type 7. For others, please refer to the
official related documents
- */
- int msgType = 7;
+ int msgType = 7; // default report type
String messageBody = "inglong-message-random-body!";
+ // build sender factory
+ MsgSenderSingleFactory senderFactory = new MsgSenderSingleFactory();
+ // build sender object
TcpClientExample tcpClientExample = new TcpClientExample();
- DefaultMessageSender sender = tcpClientExample
- .getMessageSender(inLongManagerAddr, inLongManagerPort,
- inlongGroupId, true, false, configBasePath, msgType);
- tcpClientExample.sendTcpMessage(sender, inlongGroupId, inlongStreamId,
- messageBody, System.currentTimeMillis());
- sender.close(); // close the sender
+ TcpMsgSender sender = tcpClientExample.getMessageSender(senderFactory,
false,
+ inLongManagerAddr, inLongManagerPort, inlongGroupId, msgType,
false, configBasePath);
+ // send message
+ tcpClientExample.sendTcpMessage(sender,
+ inlongGroupId, inlongStreamId, System.currentTimeMillis(),
messageBody);
+ // close all senders
+ senderFactory.shutdownAll();
}
- public DefaultMessageSender getMessageSender(String inLongManagerAddr,
String inLongManagerPort,
- String inlongGroupId, boolean requestByHttp, boolean
isReadProxyIPFromLocal,
- String configBasePath, int msgType) {
- TcpMsgSenderConfig tcpConfig = null;
- DefaultMessageSender messageSender = null;
+ public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory,
boolean visitMgrByHttps,
+ String managerAddr, String managerPort, String inlongGroupId, int
msgType,
+ boolean useLocalMetaConfig, String configBasePath) {
+ TcpMsgSender messageSender = null;
try {
- tcpConfig = new TcpMsgSenderConfig(requestByHttp,
inLongManagerAddr,
- Integer.valueOf(inLongManagerPort), inlongGroupId,
"admin", "inlong");
- if (StringUtils.isNotEmpty(configBasePath)) {
- tcpConfig.setMetaStoreBasePath(configBasePath);
- }
- tcpConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal);
+ // build sender configure
+ TcpMsgSenderConfig tcpConfig =
+ new TcpMsgSenderConfig(visitMgrByHttps, managerAddr,
+ Integer.parseInt(managerPort), inlongGroupId,
"admin", "inlong");
+ tcpConfig.setMetaStoreBasePath(configBasePath);
+ tcpConfig.setOnlyUseLocalProxyConfig(useLocalMetaConfig);
+ tcpConfig.setSdkMsgType(MsgType.valueOf(msgType));
tcpConfig.setRequestTimeoutMs(20000L);
- messageSender =
DefaultMessageSender.generateSenderByClusterId(tcpConfig);
- messageSender.setMsgtype(msgType);
- } catch (Exception e) {
- logger.error("getMessageSender has exception e = {}", e);
+ // build sender object
+ messageSender = senderFactory.genTcpSenderByClusterId(tcpConfig);
+ } catch (Throwable ex) {
+ System.out.println("Get MessageSender throw exception, " + ex);
}
return messageSender;
}
- public void sendTcpMessage(DefaultMessageSender sender, String
inlongGroupId,
- String inlongStreamId, String messageBody, long dt) {
- SendResult result = null;
+ public void sendTcpMessage(TcpMsgSender sender,
+ String inlongGroupId, String inlongStreamId, long dt, String
messageBody) {
+ ProcessResult procResult = new ProcessResult();
try {
- result = sender.sendMessage(messageBody.getBytes("utf8"),
inlongGroupId, inlongStreamId,
- 0, String.valueOf(dt));
-
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
+ sender.sendMessage(new TcpEventInfo(inlongGroupId, inlongStreamId,
+ dt, null, messageBody.getBytes(StandardCharsets.UTF_8)),
procResult);
+ } catch (Throwable ex) {
+ System.out.println("Message sent throw exception, " + ex);
+ return;
}
- System.out.println("messageSender" + result);
- logger.info("messageSender {}", result);
+ System.out.println("Message sent result = " + procResult);
+ logger.info("Message sent result = {}", procResult);
}
-
}