This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 7438a5542 [ISSUE #4529] Add sendcallback for springConnector. (#4532)
7438a5542 is described below

commit 7438a5542e376f75d3c3f38bfae123cf2be3c7cc
Author: yanrongzhen <[email protected]>
AuthorDate: Mon Nov 6 09:50:52 2023 +0800

    [ISSUE #4529] Add sendcallback for springConnector. (#4532)
    
    * Add sendcallback for springConnector.
    
    * Do some optimization.
    
    * Remove external dependencies.
    
    * Canonical setter name.
---
 .../spring/source/MessageSendingOperations.java    |  4 +++
 .../source/connector/SpringSourceConnector.java    | 21 ++++++++++++
 .../http/demo/sub/controller/SubController.java    |  3 +-
 .../apache/eventmesh/openconnect/SourceWorker.java | 29 +++++++++++++++--
 .../api/callback/SendExcepionContext.java          | 37 ++++++++++++++++++----
 .../api/callback/SendMessageCallback.java          |  9 +++---
 .../openconnect/api/callback/SendResult.java       | 32 +++++++++++++++----
 .../offsetmgmt/api/data/ConnectRecord.java         |  9 +++++-
 .../offsetmgmt/api/data/DefaultKeyValue.java       | 25 +++++++++------
 .../openconnect/offsetmgmt/api/data/KeyValue.java  |  4 ++-
 .../eventmesh/runtime/boot/AbstractTCPServer.java  |  4 +--
 .../http/consumer/HttpClientGroupMapping.java      |  6 ++--
 .../http/processor/CreateTopicProcessor.java       |  2 +-
 .../http/processor/DeleteTopicProcessor.java       |  2 +-
 .../processor/LocalSubscribeEventProcessor.java    |  2 +-
 .../processor/LocalUnSubscribeEventProcessor.java  |  6 ++--
 .../http/processor/QuerySubscriptionProcessor.java |  2 +-
 .../http/processor/SendAsyncEventProcessor.java    |  2 +-
 .../processor/SendAsyncRemoteEventProcessor.java   |  2 +-
 .../http/processor/SendSyncMessageProcessor.java   |  4 +--
 .../http/processor/SubscribeProcessor.java         |  2 +-
 .../protocol/http/push/AsyncHTTPPushRequest.java   |  2 +-
 .../eventmesh/runtime/util/EventMeshUtil.java      |  2 +-
 .../client/grpc/consumer/SubStreamHandler.java     |  2 +-
 .../http/consumer/EventMeshHttpConsumer.java       |  4 +--
 25 files changed, 163 insertions(+), 54 deletions(-)

diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
index 37c0b291b..a337c1cd8 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++ 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
@@ -17,6 +17,8 @@
 
 package org.apache.eventmesh.connector.spring.source;
 
+import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
+
 /**
  * Operations for sending messages.
  */
@@ -24,4 +26,6 @@ public interface MessageSendingOperations {
 
     void send(Object message);
 
+    void send(Object message, SendMessageCallback sendCallback);
+
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
index f87dc42e4..e57935d4f 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
@@ -19,6 +19,8 @@ package 
org.apache.eventmesh.connector.spring.source.connector;
 
 import org.apache.eventmesh.connector.spring.source.MessageSendingOperations;
 import org.apache.eventmesh.connector.spring.source.config.SpringSourceConfig;
+import org.apache.eventmesh.openconnect.SourceWorker;
+import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
 import org.apache.eventmesh.openconnect.api.config.Config;
 import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
 import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
@@ -105,6 +107,10 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations {
         return connectRecords;
     }
 
+    /**
+     * Send message.
+     * @param message message to send
+     */
     @Override
     public void send(Object message) {
         RecordPartition partition = new RecordPartition();
@@ -112,4 +118,19 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations {
         ConnectRecord record = new ConnectRecord(partition, offset, 
System.currentTimeMillis(), message);
         queue.offer(record);
     }
+
+    /**
+     * Send message with a callback.
+     * @param message message to send.
+     * @param workerCallback After the user sends the message to the Connector,
+     *                       the SourceWorker will fetch message and invoke.
+     */
+    @Override
+    public void send(Object message, SendMessageCallback workerCallback) {
+        RecordPartition partition = new RecordPartition();
+        RecordOffset offset = new RecordOffset();
+        ConnectRecord record = new ConnectRecord(partition, offset, 
System.currentTimeMillis(), message);
+        record.addExtension(SourceWorker.CALLBACK_EXTENSION, workerCallback);
+        queue.offer(record);
+    }
 }
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
index f3b35ba77..a79b60c94 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
@@ -57,7 +57,8 @@ public class SubController {
     public String subTest(final HttpServletRequest request) {
         final String content = request.getParameter("content");
         LogUtils.info(log, "receive message: {}", content);
-        @SuppressWarnings("unchecked") final Map<String, String> contentMap = 
JsonUtils.parseObject(content, HashMap.class);
+        @SuppressWarnings("unchecked")
+        final Map<String, String> contentMap = JsonUtils.parseObject(content, 
HashMap.class);
         if (StringUtils.equals(CLOUD_EVENTS_PROTOCOL_NAME, 
contentMap.get(ProtocolKey.PROTOCOL_TYPE))) {
             final EventFormat eventFormat = 
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
             if (eventFormat != null) {
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index 3371f599a..ee0a8a865 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -23,10 +23,14 @@ import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
 import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
 import org.apache.eventmesh.client.tcp.common.MessageUtils;
 import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
+import org.apache.eventmesh.common.exception.EventMeshException;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.common.utils.SystemUtils;
+import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
+import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.api.callback.SendResult;
 import org.apache.eventmesh.openconnect.api.config.SourceConfig;
 import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
 import org.apache.eventmesh.openconnect.api.source.Source;
@@ -69,6 +73,8 @@ public class SourceWorker implements ConnectorWorker {
 
     private static final int MAX_RETRY_TIMES = 3;
 
+    public static final String CALLBACK_EXTENSION = "callBackExtension";
+
     private OffsetStorageWriterImpl offsetStorageWriter;
 
     private OffsetStorageReaderImpl offsetStorageReader;
@@ -181,6 +187,8 @@ public class SourceWorker implements ConnectorWorker {
             // todo: convert connectRecord to cloudevent
             CloudEvent event = convertRecordToEvent(connectRecord);
             Optional<RecordOffsetManagement.SubmittedPosition> 
submittedRecordPosition = prepareToUpdateRecordOffset(connectRecord);
+            Optional<SendMessageCallback> callback = 
Optional.ofNullable(connectRecord.getExtensionObj(CALLBACK_EXTENSION))
+                .map(v -> (SendMessageCallback) v);
 
             int retryTimes = 0;
             // retry until MAX_RETRY_TIMES is reached
@@ -192,15 +200,15 @@ public class SourceWorker implements ConnectorWorker {
                         // commit record
                         this.source.commit(connectRecord);
                         
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
+                        callback.ifPresent(cb -> 
cb.onSuccess(convertToSendResult(event)));
                         break;
                     }
-                    retryTimes++;
-                    log.warn("{} failed to send record to {}, retry times = 
{}, failed record {}",
-                        this, event.getSubject(), retryTimes, connectRecord);
+                    throw new EventMeshException("failed to send record.");
                 } catch (Throwable t) {
                     retryTimes++;
                     log.error("{} failed to send record to {}, retry times = 
{}, failed record {}, throw {}",
                         this, event.getSubject(), retryTimes, connectRecord, 
t.getMessage());
+                    callback.ifPresent(cb -> 
cb.onException(convertToExceptionContext(event, t)));
                 }
             }
 
@@ -237,6 +245,21 @@ public class SourceWorker implements ConnectorWorker {
             .build();
     }
 
+    private SendResult convertToSendResult(CloudEvent event) {
+        SendResult result = new SendResult();
+        result.setMessageId(event.getId());
+        result.setTopic(event.getSubject());
+        return result;
+    }
+
+    private SendExcepionContext convertToExceptionContext(CloudEvent event, 
Throwable cause) {
+        SendExcepionContext exceptionContext = new SendExcepionContext();
+        exceptionContext.setTopic(event.getId());
+        exceptionContext.setMessageId(event.getId());
+        exceptionContext.setCause(cause);
+        return exceptionContext;
+    }
+
     @Override
     public void stop() {
         log.info("source worker stopping");
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
similarity index 54%
copy from 
eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
copy to 
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
index 37c0b291b..0311ceaef 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
@@ -15,13 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.spring.source;
+package org.apache.eventmesh.openconnect.api.callback;
 
-/**
- * Operations for sending messages.
- */
-public interface MessageSendingOperations {
+public class SendExcepionContext {
+
+    private String messageId;
+    private String topic;
+    private Throwable cause;
+
+    public SendExcepionContext() {
+    }
+
+    public String getMessageId() {
+        return this.messageId;
+    }
+
+    public void setMessageId(String messageId) {
+        this.messageId = messageId;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
 
-    void send(Object message);
+    public Throwable getCause() {
+        return this.cause;
+    }
 
+    public void setCause(Throwable cause) {
+        this.cause = cause;
+    }
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
similarity index 77%
copy from 
eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
copy to 
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
index 37c0b291b..fd6baba7e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.spring.source;
+package org.apache.eventmesh.openconnect.api.callback;
 
 /**
- * Operations for sending messages.
+ * Message sending callback interface.
  */
-public interface MessageSendingOperations {
+public interface SendMessageCallback {
 
-    void send(Object message);
+    void onSuccess(SendResult sendResult);
 
+    void onException(SendExcepionContext sendExcepionContext);
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
similarity index 57%
copy from 
eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
copy to 
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
index 37c0b291b..8cd861f6d 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
@@ -15,13 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.spring.source;
+package org.apache.eventmesh.openconnect.api.callback;
 
-/**
- * Operations for sending messages.
- */
-public interface MessageSendingOperations {
+public class SendResult {
+
+    private String messageId;
+    private String topic;
+
+    public SendResult() {
+    }
+
+    public String getMessageId() {
+        return this.messageId;
+    }
+
+    public void setMessageId(String messageId) {
+        this.messageId = messageId;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
 
-    void send(Object message);
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
 
+    public String toString() {
+        return "SendResult[topic=" + this.topic + ", messageId=" + 
this.messageId + ']';
+    }
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index 13c97344d..7766162e5 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -87,7 +87,7 @@ public class ConnectRecord {
         }
     }
 
-    public void addExtension(String key, String value) {
+    public void addExtension(String key, Object value) {
         if (this.extensions == null) {
             this.extensions = new DefaultKeyValue();
         }
@@ -101,6 +101,13 @@ public class ConnectRecord {
         return this.extensions.getString(key);
     }
 
+    public Object getExtensionObj(String key) {
+        if (this.extensions == null) {
+            return null;
+        }
+        return this.extensions.getObject(key);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
index 847e222c2..39f65e86b 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
@@ -23,14 +23,14 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class DefaultKeyValue implements KeyValue {
 
-    private Map<String, String> properties = new ConcurrentHashMap<>();
+    private Map<String, Object> properties = new ConcurrentHashMap<>();
 
     @Override
     public boolean getBoolean(String key) {
         if (!properties.containsKey(key)) {
             return false;
         }
-        return Boolean.parseBoolean(properties.get(key));
+        return Boolean.parseBoolean(String.valueOf(properties.get(key)));
     }
 
     @Override
@@ -43,7 +43,7 @@ public class DefaultKeyValue implements KeyValue {
         if (!properties.containsKey(key)) {
             return 0;
         }
-        return Short.parseShort(properties.get(key));
+        return Short.parseShort(String.valueOf(properties.get(key)));
     }
 
     @Override
@@ -64,7 +64,7 @@ public class DefaultKeyValue implements KeyValue {
     }
 
     public DefaultKeyValue() {
-        properties = new ConcurrentHashMap<String, String>();
+        properties = new ConcurrentHashMap<String, Object>();
     }
 
     @Override
@@ -86,8 +86,8 @@ public class DefaultKeyValue implements KeyValue {
     }
 
     @Override
-    public KeyValue put(String key, String value) {
-        properties.put(key, String.valueOf(value));
+    public KeyValue put(String key, Object value) {
+        properties.put(key, value);
         return this;
     }
 
@@ -96,7 +96,7 @@ public class DefaultKeyValue implements KeyValue {
         if (!properties.containsKey(key)) {
             return 0;
         }
-        return Integer.parseInt(properties.get(key));
+        return Integer.parseInt(String.valueOf(properties.get(key)));
     }
 
     @Override
@@ -109,7 +109,7 @@ public class DefaultKeyValue implements KeyValue {
         if (!properties.containsKey(key)) {
             return 0;
         }
-        return Long.parseLong(properties.get(key));
+        return Long.parseLong(String.valueOf(properties.get(key)));
     }
 
     @Override
@@ -122,7 +122,7 @@ public class DefaultKeyValue implements KeyValue {
         if (!properties.containsKey(key)) {
             return 0;
         }
-        return Double.parseDouble(properties.get(key));
+        return Double.parseDouble(String.valueOf(properties.get(key)));
     }
 
     @Override
@@ -131,10 +131,15 @@ public class DefaultKeyValue implements KeyValue {
     }
 
     @Override
-    public String getString(String key) {
+    public Object getObject(String key) {
         return properties.get(key);
     }
 
+    @Override
+    public String getString(String key) {
+        return String.valueOf(properties.get(key));
+    }
+
     @Override
     public String getString(final String key, final String defaultValue) {
         return properties.containsKey(key) ? getString(key) : defaultValue;
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
index a5e566a1e..9cc8893a0 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
@@ -34,7 +34,7 @@ public interface KeyValue {
 
     KeyValue put(String key, double value);
 
-    KeyValue put(String key, String value);
+    KeyValue put(String key, Object value);
 
     boolean getBoolean(String key);
 
@@ -60,6 +60,8 @@ public interface KeyValue {
 
     String getString(String key, String defaultValue);
 
+    Object getObject(String key);
+
     Set<String> keySet();
 
     boolean containsKey(String key);
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
index 911e87f4b..f6ae561c7 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
@@ -297,8 +297,8 @@ public class AbstractTCPServer extends 
AbstractRemotingServer {
         private boolean isNeedTrace(Command cmd) {
             return eventMeshTCPConfiguration.isEventMeshServerTraceEnable()
                 && (Command.REQUEST_TO_SERVER == cmd
-                || Command.ASYNC_MESSAGE_TO_SERVER == cmd
-                || Command.BROADCAST_MESSAGE_TO_SERVER == cmd);
+                    || Command.ASYNC_MESSAGE_TO_SERVER == cmd
+                    || Command.BROADCAST_MESSAGE_TO_SERVER == cmd);
         }
 
         private void writeToClient(Command cmd, Package pkg, 
ChannelHandlerContext ctx, Exception e) {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
index 34aca9907..3360027a6 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
@@ -392,9 +392,9 @@ public final class HttpClientGroupMapping {
             for (final String unSubTopic : unSubTopicList) {
                 isChange = isChange
                     || removeSubscriptionByTopic(consumerGroup,
-                    unSubscribeUrl,
-                    unSubscribeRequestHeader.getIdc(),
-                    unSubTopic);
+                        unSubscribeUrl,
+                        unSubscribeRequestHeader.getIdc(),
+                        unSubTopic);
             }
         } finally {
             READ_WRITE_LOCK.writeLock().unlock();
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java
index e146871ad..0554197bb 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java
@@ -150,6 +150,6 @@ public class CreateTopicProcessor implements 
AsyncHttpProcessor {
 
     @Override
     public String[] paths() {
-        return new String[] {RequestURI.CREATE_TOPIC.getRequestURI()};
+        return new String[]{RequestURI.CREATE_TOPIC.getRequestURI()};
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java
index 6ad858b89..417397721 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java
@@ -173,6 +173,6 @@ public class DeleteTopicProcessor implements 
AsyncHttpProcessor {
 
     @Override
     public String[] paths() {
-        return new String[] {RequestURI.DELETE_TOPIC.getRequestURI()};
+        return new String[]{RequestURI.DELETE_TOPIC.getRequestURI()};
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
index eb03c8dc8..7e9086d71 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
@@ -189,7 +189,7 @@ public class LocalSubscribeEventProcessor extends 
AbstractEventProcessor {
 
     @Override
     public String[] paths() {
-        return new String[] {RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
+        return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
     }
 
     private ClientInfo getClientInfo(final HttpEventWrapper requestWrapper) {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
index aede5ebab..0160d36e3 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
@@ -193,7 +193,7 @@ public class LocalUnSubscribeEventProcessor extends 
AbstractEventProcessor {
 
                 } catch (Exception e) {
                     LogUtils.error(log, 
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
-                            + "|topic={}|url={}", System.currentTimeMillis() - 
startTime,
+                        + "|topic={}|url={}", System.currentTimeMillis() - 
startTime,
                         JsonUtils.toJSONString(unSubTopicList), 
unSubscribeUrl, e);
                     
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, 
responseHeaderMap,
                         responseBodyMap, null);
@@ -215,7 +215,7 @@ public class LocalUnSubscribeEventProcessor extends 
AbstractEventProcessor {
                         .removeIf(s -> StringUtils.equals(consumerGroup, s));
                 } catch (Exception e) {
                     LogUtils.error(log, 
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
-                            + "|topic={}|url={}", System.currentTimeMillis() - 
startTime,
+                        + "|topic={}|url={}", System.currentTimeMillis() - 
startTime,
                         JsonUtils.toJSONString(unSubTopicList), 
unSubscribeUrl, e);
                     
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, 
responseHeaderMap,
                         responseBodyMap, null);
@@ -229,7 +229,7 @@ public class LocalUnSubscribeEventProcessor extends 
AbstractEventProcessor {
 
     @Override
     public String[] paths() {
-        return new String[] {RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()};
+        return new String[]{RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()};
     }
 
     private void registerClient(final HttpEventWrapper requestWrapper, final 
String consumerGroup, final List<String> topicList, final String url) {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java
index 97629d489..ea0149444 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java
@@ -113,6 +113,6 @@ public class QuerySubscriptionProcessor implements 
AsyncHttpProcessor {
 
     @Override
     public String[] paths() {
-        return new String[] {RequestURI.SUBSCRIPTION_QUERY.getRequestURI()};
+        return new String[]{RequestURI.SUBSCRIPTION_QUERY.getRequestURI()};
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index 6b0d178ec..22e83c1cf 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -284,6 +284,6 @@ public class SendAsyncEventProcessor implements 
AsyncHttpProcessor {
 
     @Override
     public String[] paths() {
-        return new String[] {RequestURI.PUBLISH.getRequestURI()};
+        return new String[]{RequestURI.PUBLISH.getRequestURI()};
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
index 405f8ed38..909872038 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
@@ -324,7 +324,7 @@ public class SendAsyncRemoteEventProcessor implements 
AsyncHttpProcessor {
 
     @Override
     public String[] paths() {
-        return new String[] {RequestURI.PUBLISH_BRIDGE.getRequestURI()};
+        return new String[]{RequestURI.PUBLISH_BRIDGE.getRequestURI()};
     }
 
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index ee1a1d834..e8e712aef 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -218,7 +218,7 @@ public class SendSyncMessageProcessor implements 
HttpRequestProcessor {
                 @Override
                 public void onSuccess(final CloudEvent event) {
                     LogUtils.info(log, 
"message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
-                            + "|bizSeqNo={}|uniqueId={}", 
System.currentTimeMillis() - startTime,
+                        + "|bizSeqNo={}|uniqueId={}", 
System.currentTimeMillis() - startTime,
                         topic, bizNo, uniqueId);
 
                     try {
@@ -266,7 +266,7 @@ public class SendSyncMessageProcessor implements 
HttpRequestProcessor {
 
                     
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
                     LogUtils.error(log, 
"message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
-                            + "|bizSeqNo={}|uniqueId={}",
+                        + "|bizSeqNo={}|uniqueId={}",
                         System.currentTimeMillis() - startTime,
                         topic, bizNo, uniqueId, e);
                 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index f2bfeb1c6..971631689 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -188,7 +188,7 @@ public class SubscribeProcessor implements 
HttpRequestProcessor {
                     SubscribeResponseBody.class);
                 final long endTime = System.currentTimeMillis();
                 LogUtils.error(log, 
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}"
-                        + "|bizSeqNo={}|uniqueId={}",
+                    + "|bizSeqNo={}|uniqueId={}",
                     endTime - startTime,
                     JsonUtils.toJSONString(subscribeRequestBody.getTopics()),
                     subscribeRequestBody.getUrl(), e);
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 747db0c9e..c60a01f5c 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -241,7 +241,7 @@ public class AsyncHTTPPushRequest extends 
AbstractHTTPPushRequest {
                 } else {
                     
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHttpPushMsgFailed();
                     LogUtils.info(MESSAGE_LOGGER, 
"message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
-                            + "|uniqueId={}|cost={}",
+                        + "|uniqueId={}|cost={}",
                         currPushUrl, handleMsgContext.getTopic(),
                         handleMsgContext.getBizSeqNo(), 
handleMsgContext.getUniqueId(), cost);
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index 7b95c4fbe..1985e4c51 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -274,7 +274,7 @@ public class EventMeshUtil {
 
     public static void printState(final ThreadPoolExecutor 
scheduledExecutorService) {
         LogUtils.info(log, "{} [{} {} {} {}]", ((EventMeshThreadFactory) 
scheduledExecutorService.getThreadFactory())
-                .getThreadNamePrefix(), 
scheduledExecutorService.getQueue().size(), 
scheduledExecutorService.getPoolSize(),
+            .getThreadNamePrefix(), 
scheduledExecutorService.getQueue().size(), 
scheduledExecutorService.getPoolSize(),
             scheduledExecutorService.getActiveCount(), 
scheduledExecutorService.getCompletedTaskCount());
     }
 
diff --git 
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
 
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
index 282979315..27385d119 100644
--- 
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
+++ 
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
@@ -54,7 +54,7 @@ public class SubStreamHandler<T> extends Thread implements 
Serializable {
     private final ReceiveMsgHook<T> listener;
 
     public SubStreamHandler(final ConsumerServiceStub consumerAsyncClient, 
final EventMeshGrpcClientConfig clientConfig,
-                            final ReceiveMsgHook<T> listener) {
+        final ReceiveMsgHook<T> listener) {
         this.consumerAsyncClient = consumerAsyncClient;
         this.clientConfig = clientConfig;
         this.listener = listener;
diff --git 
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
 
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
index d8cab6452..c07abe2a5 100644
--- 
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
+++ 
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
@@ -66,8 +66,8 @@ public class EventMeshHttpConsumer extends AbstractHttpClient 
implements AutoClo
         this(eventMeshHttpClientConfig, null);
     }
 
-    public EventMeshHttpConsumer(final EventMeshHttpClientConfig 
eventMeshHttpClientConfig, final ThreadPoolExecutor customExecutor)
-        throws EventMeshException {
+    public EventMeshHttpConsumer(final EventMeshHttpClientConfig 
eventMeshHttpClientConfig,
+        final ThreadPoolExecutor customExecutor) throws EventMeshException {
         super(eventMeshHttpClientConfig);
         this.consumeExecutor = Optional.ofNullable(customExecutor).orElseGet(
             () -> 
ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpClientConfig.getConsumeThreadCore(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to