This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3db9adcc0a [INLONG-11695][SDK] MessageSender related interfaces
abstraction (#11696)
3db9adcc0a is described below
commit 3db9adcc0a135247c055badc2bb0f831b6bdeae7
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Jan 21 12:12:01 2025 +0800
[INLONG-11695][SDK] MessageSender related interfaces abstraction (#11696)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/network/ClientMgr.java | 67 ++++++++++++++
.../inlong/sdk/dataproxy/sender/MessageSender.java | 47 ++++++++++
.../sdk/dataproxy/sender/MsgSendCallback.java | 42 +++++++++
.../sdk/dataproxy/sender/http/HttpMsgSender.java | 56 ++++++++++++
.../sdk/dataproxy/sender/tcp/TcpMsgSender.java | 101 +++++++++++++++++++++
5 files changed, 313 insertions(+)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
new file mode 100644
index 0000000000..bb678456b1
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Client Manager interface
+ *
+ * Used to Manager network client
+ */
+public interface ClientMgr {
+
+ /**
+ * Start network client manager
+ *
+ * @param procResult the start result, return detail error infos if
sending fails
+ * @return true if successful, false return indicates failure.
+ */
+ boolean start(ProcessResult procResult);
+
+ /**
+ * Stop network client manager
+ *
+ */
+ void stop();
+
+ /**
+ * Get the number of proxy nodes currently in use
+ *
+ * @return Number of nodes in use
+ */
+ int getActiveNodeCnt();
+
+ /**
+ * Get the number of in-flight messages
+ *
+ * @return Number of in-flight messages
+ */
+ int getInflightMsgCnt();
+
+ /**
+ * Update cached proxy nodes
+ *
+ * @param nodeChanged whether the updated node has changed
+ * @param hostInfoMap the new proxy nodes
+ */
+ void updateProxyInfoList(boolean nodeChanged, ConcurrentHashMap<String,
HostInfo> hostInfoMap);
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MessageSender.java
new file mode 100644
index 0000000000..26374c7857
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MessageSender.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+
+/**
+ * Message Sender interface
+ *
+ * Used to define the sender common methods
+ */
+public interface MessageSender {
+
+ /**
+ * Start sender when the sender is built
+ *
+ * <p>Attention:
+ * if return false, the caller need to handle it based on the error
code and
+ * error information returned by procResult, such as:
+ * prompting the user, retrying after some time, etc.
+ * </p>
+ *
+ * @param procResult the start result, return detail error infos if
sending fails
+ * @return true if successful, false return indicates that the sender
fails to start.
+ */
+ boolean start(ProcessResult procResult);
+
+ /**
+ * Close the sender when need to stop the sender's sending service.
+ */
+ void close();
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MsgSendCallback.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MsgSendCallback.java
new file mode 100644
index 0000000000..f0c8638f08
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MsgSendCallback.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+
+/**
+ * Message Send Callback interface
+ *
+ * Used to define the send callback methods
+ */
+public interface MsgSendCallback {
+
+ /**
+ * Invoked when a message is confirmed by DataProxy
+ *
+ * @param result The event process result, include detail error infos if
sending fails
+ */
+ void onMessageAck(ProcessResult result);
+
+ /**
+ * Invoked when a message transportation interrupted by an exception
+ *
+ * @param ex The exception info
+ */
+ void onException(Throwable ex);
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSender.java
new file mode 100644
index 0000000000..d7cd77d8cf
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSender.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.http;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MessageSender;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+
+/**
+ * HTTP Message Sender interface
+ *
+ * Used to define the HTTP message sender methods
+ */
+public interface HttpMsgSender extends MessageSender {
+
+ /**
+ * Synchronously send message and wait for the final sending result
+ *
+ * <p>Attention: if return false, the caller can choose to wait for a
period of time before trying again, or
+ * discard the event after multiple retries and failures.</p>
+ *
+ * @param eventInfo the event information need to send
+ * @param procResult The send result, including the detail error infos if
failed
+ * @return true if successful, false if failed for some reason.
+ */
+ boolean syncSendMessage(HttpEventInfo eventInfo, ProcessResult procResult);
+
+ /**
+ * Asynchronously send message
+ *
+ * <p>Attention: if return false, the caller can choose to wait for a
period of time before trying again, or
+ * discard the event after multiple retries and failures.</p>
+ *
+ * @param eventInfo the event information need to send
+ * @param callback the callback that returns the response from DataProxy
or
+ * an exception that occurred while waiting for the
response.
+ * @param procResult The send result, including the detail error infos if
the event not accepted
+ * @return true if successful, false if the event not accepted for some
reason.
+ */
+ boolean asyncSendMessage(HttpEventInfo eventInfo, MsgSendCallback
callback, ProcessResult procResult);
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
new file mode 100644
index 0000000000..97543d86a5
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.tcp;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MessageSender;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+
+/**
+ * TCP Message Sender interface
+ *
+ * Used to define the TCP message sender methods
+ */
+public interface TcpMsgSender extends MessageSender {
+
+ /**
+ * Send message without response
+ *
+ * <p>Attention:
+ * 1. if return false, the caller can choose to wait for a period of time
before trying again, or
+ * discard the event after multiple retries and failures.
+ * 2. this method may lose messages. It is suitable for situations where
the reporting volume is very large,
+ * the business does not pay attention to the final reporting results,
and
+ * the message loss is tolerated in the event of an exception.
+ * </p>
+ *
+ * @param eventInfo the event information need to send
+ * @param procResult The send result, include the detail error infos if
the eventInfo is not accepted
+ * @return true if successful, false return indicates that the eventInfo
was not accepted for some reason.
+ */
+ boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult
procResult);
+
+ /**
+ * Synchronously send message and wait for the final sending result
+ *
+ * <p>Attention:
+ * 1. if return false, the caller can choose to wait for a period of time
before trying again, or
+ * discard the event after multiple retries and failures.
+ * 2. this method, with sendInB2B = true, tries to ensure that messages
are delivered, but there
+ * may be duplicate messages or message loss scenarios. It is suitable
for scenarios with
+ * a very large number of reports, very low reporting time
requirements, and
+ * the need to return the sending results.
+ * 3. this method, with sendInB2B = false, ensures that the message is
delivered only once and
+ * will not be repeated. It is suitable for businesses with a small
amount of reports and
+ * no requirements on the reporting time, but require DataProxy to
forward messages with high reliability.
+ * </p>
+ *
+ * @param sendInB2B indicates the DataProxy message service mode, true
indicates DataProxy returns
+ * as soon as it receives the request and forwards the
message in B2B mode until it succeeds;
+ * false indicates DataProxy returns after receiving the
request and forwarding it successfully,
+ * and DataProxy does not retry on failure
+ * @param eventInfo the event information need to send
+ * @param procResult The send result, including the detail error infos if
failed
+ * @return true if successful, false if failed for some reason.
+ */
+ boolean syncSendMessage(boolean sendInB2B,
+ TcpEventInfo eventInfo, ProcessResult procResult);
+
+ /**
+ * Asynchronously send message
+ *
+ * <p>Attention:
+ * 1. if return false, the caller can choose to wait for a period of time
before trying again, or
+ * discard the event after multiple retries and failures.
+ * 2. this method, with sendInB2B = true, tries to ensure that messages
are delivered, but there
+ * may be duplicate messages or message loss scenarios. It is suitable
for scenarios with
+ * a very large number of reports, very low reporting time
requirements, and
+ * the need to return the sending results.
+ * 3. this method, with sendInB2B = false, ensures that the message is
delivered only once and
+ * will not be repeated. It is suitable for businesses with a small
amount of reports and
+ * no requirements on the reporting time, but require DataProxy to
forward messages with high reliability.
+ * </p>
+ *
+ * @param sendInB2B indicates the DataProxy message service mode, true
indicates DataProxy returns
+ * as soon as it receives the request and forwards the
message in B2B mode until it succeeds;
+ * false indicates DataProxy returns after receiving the
request and forwarding it successfully,
+ * and DataProxy does not retry on failure
+ * @param eventInfo the event information need to send
+ * @param callback the callback that returns the response from DataProxy
or
+ * an exception that occurred while waiting for the
response.
+ * @param procResult The send result, including the detail error infos if
the event not accepted
+ * @return true if successful, false if the event not accepted for some
reason.
+ */
+ boolean asyncSendMessage(boolean sendInB2B,
+ TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult
procResult);
+}