This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 7438a5542 [ISSUE #4529] Add sendcallback for springConnector. (#4532)
7438a5542 is described below
commit 7438a5542e376f75d3c3f38bfae123cf2be3c7cc
Author: yanrongzhen <[email protected]>
AuthorDate: Mon Nov 6 09:50:52 2023 +0800
[ISSUE #4529] Add sendcallback for springConnector. (#4532)
* Add sendcallback for springConnector.
* Do some optimization.
* Remove external dependencies.
* Canonical setter name.
---
.../spring/source/MessageSendingOperations.java | 4 +++
.../source/connector/SpringSourceConnector.java | 21 ++++++++++++
.../http/demo/sub/controller/SubController.java | 3 +-
.../apache/eventmesh/openconnect/SourceWorker.java | 29 +++++++++++++++--
.../api/callback/SendExcepionContext.java | 37 ++++++++++++++++++----
.../api/callback/SendMessageCallback.java | 9 +++---
.../openconnect/api/callback/SendResult.java | 32 +++++++++++++++----
.../offsetmgmt/api/data/ConnectRecord.java | 9 +++++-
.../offsetmgmt/api/data/DefaultKeyValue.java | 25 +++++++++------
.../openconnect/offsetmgmt/api/data/KeyValue.java | 4 ++-
.../eventmesh/runtime/boot/AbstractTCPServer.java | 4 +--
.../http/consumer/HttpClientGroupMapping.java | 6 ++--
.../http/processor/CreateTopicProcessor.java | 2 +-
.../http/processor/DeleteTopicProcessor.java | 2 +-
.../processor/LocalSubscribeEventProcessor.java | 2 +-
.../processor/LocalUnSubscribeEventProcessor.java | 6 ++--
.../http/processor/QuerySubscriptionProcessor.java | 2 +-
.../http/processor/SendAsyncEventProcessor.java | 2 +-
.../processor/SendAsyncRemoteEventProcessor.java | 2 +-
.../http/processor/SendSyncMessageProcessor.java | 4 +--
.../http/processor/SubscribeProcessor.java | 2 +-
.../protocol/http/push/AsyncHTTPPushRequest.java | 2 +-
.../eventmesh/runtime/util/EventMeshUtil.java | 2 +-
.../client/grpc/consumer/SubStreamHandler.java | 2 +-
.../http/consumer/EventMeshHttpConsumer.java | 4 +--
25 files changed, 163 insertions(+), 54 deletions(-)
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
index 37c0b291b..a337c1cd8 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
@@ -17,6 +17,8 @@
package org.apache.eventmesh.connector.spring.source;
+import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
+
/**
* Operations for sending messages.
*/
@@ -24,4 +26,6 @@ public interface MessageSendingOperations {
void send(Object message);
+ void send(Object message, SendMessageCallback sendCallback);
+
}
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
index f87dc42e4..e57935d4f 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
@@ -19,6 +19,8 @@ package
org.apache.eventmesh.connector.spring.source.connector;
import org.apache.eventmesh.connector.spring.source.MessageSendingOperations;
import org.apache.eventmesh.connector.spring.source.config.SpringSourceConfig;
+import org.apache.eventmesh.openconnect.SourceWorker;
+import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
@@ -105,6 +107,10 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations {
return connectRecords;
}
+ /**
+ * Send message.
+ * @param message message to send
+ */
@Override
public void send(Object message) {
RecordPartition partition = new RecordPartition();
@@ -112,4 +118,19 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations {
ConnectRecord record = new ConnectRecord(partition, offset,
System.currentTimeMillis(), message);
queue.offer(record);
}
+
+ /**
+ * Send message with a callback.
+ * @param message message to send.
+ * @param workerCallback After the user sends the message to the Connector,
+ * the SourceWorker will fetch message and invoke.
+ */
+ @Override
+ public void send(Object message, SendMessageCallback workerCallback) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord record = new ConnectRecord(partition, offset,
System.currentTimeMillis(), message);
+ record.addExtension(SourceWorker.CALLBACK_EXTENSION, workerCallback);
+ queue.offer(record);
+ }
}
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
index f3b35ba77..a79b60c94 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
@@ -57,7 +57,8 @@ public class SubController {
public String subTest(final HttpServletRequest request) {
final String content = request.getParameter("content");
LogUtils.info(log, "receive message: {}", content);
- @SuppressWarnings("unchecked") final Map<String, String> contentMap =
JsonUtils.parseObject(content, HashMap.class);
+ @SuppressWarnings("unchecked")
+ final Map<String, String> contentMap = JsonUtils.parseObject(content,
HashMap.class);
if (StringUtils.equals(CLOUD_EVENTS_PROTOCOL_NAME,
contentMap.get(ProtocolKey.PROTOCOL_TYPE))) {
final EventFormat eventFormat =
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
if (eventFormat != null) {
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index 3371f599a..ee0a8a865 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -23,10 +23,14 @@ import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
+import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.SystemUtils;
+import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
+import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.api.callback.SendResult;
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
@@ -69,6 +73,8 @@ public class SourceWorker implements ConnectorWorker {
private static final int MAX_RETRY_TIMES = 3;
+ public static final String CALLBACK_EXTENSION = "callBackExtension";
+
private OffsetStorageWriterImpl offsetStorageWriter;
private OffsetStorageReaderImpl offsetStorageReader;
@@ -181,6 +187,8 @@ public class SourceWorker implements ConnectorWorker {
// todo: convert connectRecord to cloudevent
CloudEvent event = convertRecordToEvent(connectRecord);
Optional<RecordOffsetManagement.SubmittedPosition>
submittedRecordPosition = prepareToUpdateRecordOffset(connectRecord);
+ Optional<SendMessageCallback> callback =
Optional.ofNullable(connectRecord.getExtensionObj(CALLBACK_EXTENSION))
+ .map(v -> (SendMessageCallback) v);
int retryTimes = 0;
// retry until MAX_RETRY_TIMES is reached
@@ -192,15 +200,15 @@ public class SourceWorker implements ConnectorWorker {
// commit record
this.source.commit(connectRecord);
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
+ callback.ifPresent(cb ->
cb.onSuccess(convertToSendResult(event)));
break;
}
- retryTimes++;
- log.warn("{} failed to send record to {}, retry times =
{}, failed record {}",
- this, event.getSubject(), retryTimes, connectRecord);
+ throw new EventMeshException("failed to send record.");
} catch (Throwable t) {
retryTimes++;
log.error("{} failed to send record to {}, retry times =
{}, failed record {}, throw {}",
this, event.getSubject(), retryTimes, connectRecord,
t.getMessage());
+ callback.ifPresent(cb ->
cb.onException(convertToExceptionContext(event, t)));
}
}
@@ -237,6 +245,21 @@ public class SourceWorker implements ConnectorWorker {
.build();
}
+ private SendResult convertToSendResult(CloudEvent event) {
+ SendResult result = new SendResult();
+ result.setMessageId(event.getId());
+ result.setTopic(event.getSubject());
+ return result;
+ }
+
+ private SendExcepionContext convertToExceptionContext(CloudEvent event,
Throwable cause) {
+ SendExcepionContext exceptionContext = new SendExcepionContext();
+ exceptionContext.setTopic(event.getId());
+ exceptionContext.setMessageId(event.getId());
+ exceptionContext.setCause(cause);
+ return exceptionContext;
+ }
+
@Override
public void stop() {
log.info("source worker stopping");
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
similarity index 54%
copy from
eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
copy to
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
index 37c0b291b..0311ceaef 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
@@ -15,13 +15,38 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.spring.source;
+package org.apache.eventmesh.openconnect.api.callback;
-/**
- * Operations for sending messages.
- */
-public interface MessageSendingOperations {
+public class SendExcepionContext {
+
+ private String messageId;
+ private String topic;
+ private Throwable cause;
+
+ public SendExcepionContext() {
+ }
+
+ public String getMessageId() {
+ return this.messageId;
+ }
+
+ public void setMessageId(String messageId) {
+ this.messageId = messageId;
+ }
+
+ public String getTopic() {
+ return this.topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
- void send(Object message);
+ public Throwable getCause() {
+ return this.cause;
+ }
+ public void setCause(Throwable cause) {
+ this.cause = cause;
+ }
}
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
similarity index 77%
copy from
eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
copy to
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
index 37c0b291b..fd6baba7e 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.spring.source;
+package org.apache.eventmesh.openconnect.api.callback;
/**
- * Operations for sending messages.
+ * Message sending callback interface.
*/
-public interface MessageSendingOperations {
+public interface SendMessageCallback {
- void send(Object message);
+ void onSuccess(SendResult sendResult);
+ void onException(SendExcepionContext sendExcepionContext);
}
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
similarity index 57%
copy from
eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
copy to
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
index 37c0b291b..8cd861f6d 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
@@ -15,13 +15,33 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.spring.source;
+package org.apache.eventmesh.openconnect.api.callback;
-/**
- * Operations for sending messages.
- */
-public interface MessageSendingOperations {
+public class SendResult {
+
+ private String messageId;
+ private String topic;
+
+ public SendResult() {
+ }
+
+ public String getMessageId() {
+ return this.messageId;
+ }
+
+ public void setMessageId(String messageId) {
+ this.messageId = messageId;
+ }
+
+ public String getTopic() {
+ return this.topic;
+ }
- void send(Object message);
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+ public String toString() {
+ return "SendResult[topic=" + this.topic + ", messageId=" +
this.messageId + ']';
+ }
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index 13c97344d..7766162e5 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -87,7 +87,7 @@ public class ConnectRecord {
}
}
- public void addExtension(String key, String value) {
+ public void addExtension(String key, Object value) {
if (this.extensions == null) {
this.extensions = new DefaultKeyValue();
}
@@ -101,6 +101,13 @@ public class ConnectRecord {
return this.extensions.getString(key);
}
+ public Object getExtensionObj(String key) {
+ if (this.extensions == null) {
+ return null;
+ }
+ return this.extensions.getObject(key);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
index 847e222c2..39f65e86b 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
@@ -23,14 +23,14 @@ import java.util.concurrent.ConcurrentHashMap;
public class DefaultKeyValue implements KeyValue {
- private Map<String, String> properties = new ConcurrentHashMap<>();
+ private Map<String, Object> properties = new ConcurrentHashMap<>();
@Override
public boolean getBoolean(String key) {
if (!properties.containsKey(key)) {
return false;
}
- return Boolean.parseBoolean(properties.get(key));
+ return Boolean.parseBoolean(String.valueOf(properties.get(key)));
}
@Override
@@ -43,7 +43,7 @@ public class DefaultKeyValue implements KeyValue {
if (!properties.containsKey(key)) {
return 0;
}
- return Short.parseShort(properties.get(key));
+ return Short.parseShort(String.valueOf(properties.get(key)));
}
@Override
@@ -64,7 +64,7 @@ public class DefaultKeyValue implements KeyValue {
}
public DefaultKeyValue() {
- properties = new ConcurrentHashMap<String, String>();
+ properties = new ConcurrentHashMap<String, Object>();
}
@Override
@@ -86,8 +86,8 @@ public class DefaultKeyValue implements KeyValue {
}
@Override
- public KeyValue put(String key, String value) {
- properties.put(key, String.valueOf(value));
+ public KeyValue put(String key, Object value) {
+ properties.put(key, value);
return this;
}
@@ -96,7 +96,7 @@ public class DefaultKeyValue implements KeyValue {
if (!properties.containsKey(key)) {
return 0;
}
- return Integer.parseInt(properties.get(key));
+ return Integer.parseInt(String.valueOf(properties.get(key)));
}
@Override
@@ -109,7 +109,7 @@ public class DefaultKeyValue implements KeyValue {
if (!properties.containsKey(key)) {
return 0;
}
- return Long.parseLong(properties.get(key));
+ return Long.parseLong(String.valueOf(properties.get(key)));
}
@Override
@@ -122,7 +122,7 @@ public class DefaultKeyValue implements KeyValue {
if (!properties.containsKey(key)) {
return 0;
}
- return Double.parseDouble(properties.get(key));
+ return Double.parseDouble(String.valueOf(properties.get(key)));
}
@Override
@@ -131,10 +131,15 @@ public class DefaultKeyValue implements KeyValue {
}
@Override
- public String getString(String key) {
+ public Object getObject(String key) {
return properties.get(key);
}
+ @Override
+ public String getString(String key) {
+ return String.valueOf(properties.get(key));
+ }
+
@Override
public String getString(final String key, final String defaultValue) {
return properties.containsKey(key) ? getString(key) : defaultValue;
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
index a5e566a1e..9cc8893a0 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
@@ -34,7 +34,7 @@ public interface KeyValue {
KeyValue put(String key, double value);
- KeyValue put(String key, String value);
+ KeyValue put(String key, Object value);
boolean getBoolean(String key);
@@ -60,6 +60,8 @@ public interface KeyValue {
String getString(String key, String defaultValue);
+ Object getObject(String key);
+
Set<String> keySet();
boolean containsKey(String key);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
index 911e87f4b..f6ae561c7 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
@@ -297,8 +297,8 @@ public class AbstractTCPServer extends
AbstractRemotingServer {
private boolean isNeedTrace(Command cmd) {
return eventMeshTCPConfiguration.isEventMeshServerTraceEnable()
&& (Command.REQUEST_TO_SERVER == cmd
- || Command.ASYNC_MESSAGE_TO_SERVER == cmd
- || Command.BROADCAST_MESSAGE_TO_SERVER == cmd);
+ || Command.ASYNC_MESSAGE_TO_SERVER == cmd
+ || Command.BROADCAST_MESSAGE_TO_SERVER == cmd);
}
private void writeToClient(Command cmd, Package pkg,
ChannelHandlerContext ctx, Exception e) {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
index 34aca9907..3360027a6 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java
@@ -392,9 +392,9 @@ public final class HttpClientGroupMapping {
for (final String unSubTopic : unSubTopicList) {
isChange = isChange
|| removeSubscriptionByTopic(consumerGroup,
- unSubscribeUrl,
- unSubscribeRequestHeader.getIdc(),
- unSubTopic);
+ unSubscribeUrl,
+ unSubscribeRequestHeader.getIdc(),
+ unSubTopic);
}
} finally {
READ_WRITE_LOCK.writeLock().unlock();
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java
index e146871ad..0554197bb 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java
@@ -150,6 +150,6 @@ public class CreateTopicProcessor implements
AsyncHttpProcessor {
@Override
public String[] paths() {
- return new String[] {RequestURI.CREATE_TOPIC.getRequestURI()};
+ return new String[]{RequestURI.CREATE_TOPIC.getRequestURI()};
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java
index 6ad858b89..417397721 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java
@@ -173,6 +173,6 @@ public class DeleteTopicProcessor implements
AsyncHttpProcessor {
@Override
public String[] paths() {
- return new String[] {RequestURI.DELETE_TOPIC.getRequestURI()};
+ return new String[]{RequestURI.DELETE_TOPIC.getRequestURI()};
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
index eb03c8dc8..7e9086d71 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java
@@ -189,7 +189,7 @@ public class LocalSubscribeEventProcessor extends
AbstractEventProcessor {
@Override
public String[] paths() {
- return new String[] {RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
+ return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
}
private ClientInfo getClientInfo(final HttpEventWrapper requestWrapper) {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
index aede5ebab..0160d36e3 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java
@@ -193,7 +193,7 @@ public class LocalUnSubscribeEventProcessor extends
AbstractEventProcessor {
} catch (Exception e) {
LogUtils.error(log,
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
- + "|topic={}|url={}", System.currentTimeMillis() -
startTime,
+ + "|topic={}|url={}", System.currentTimeMillis() -
startTime,
JsonUtils.toJSONString(unSubTopicList),
unSubscribeUrl, e);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR,
responseHeaderMap,
responseBodyMap, null);
@@ -215,7 +215,7 @@ public class LocalUnSubscribeEventProcessor extends
AbstractEventProcessor {
.removeIf(s -> StringUtils.equals(consumerGroup, s));
} catch (Exception e) {
LogUtils.error(log,
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
- + "|topic={}|url={}", System.currentTimeMillis() -
startTime,
+ + "|topic={}|url={}", System.currentTimeMillis() -
startTime,
JsonUtils.toJSONString(unSubTopicList),
unSubscribeUrl, e);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR,
responseHeaderMap,
responseBodyMap, null);
@@ -229,7 +229,7 @@ public class LocalUnSubscribeEventProcessor extends
AbstractEventProcessor {
@Override
public String[] paths() {
- return new String[] {RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()};
+ return new String[]{RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()};
}
private void registerClient(final HttpEventWrapper requestWrapper, final
String consumerGroup, final List<String> topicList, final String url) {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java
index 97629d489..ea0149444 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java
@@ -113,6 +113,6 @@ public class QuerySubscriptionProcessor implements
AsyncHttpProcessor {
@Override
public String[] paths() {
- return new String[] {RequestURI.SUBSCRIPTION_QUERY.getRequestURI()};
+ return new String[]{RequestURI.SUBSCRIPTION_QUERY.getRequestURI()};
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index 6b0d178ec..22e83c1cf 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -284,6 +284,6 @@ public class SendAsyncEventProcessor implements
AsyncHttpProcessor {
@Override
public String[] paths() {
- return new String[] {RequestURI.PUBLISH.getRequestURI()};
+ return new String[]{RequestURI.PUBLISH.getRequestURI()};
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
index 405f8ed38..909872038 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
@@ -324,7 +324,7 @@ public class SendAsyncRemoteEventProcessor implements
AsyncHttpProcessor {
@Override
public String[] paths() {
- return new String[] {RequestURI.PUBLISH_BRIDGE.getRequestURI()};
+ return new String[]{RequestURI.PUBLISH_BRIDGE.getRequestURI()};
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index ee1a1d834..e8e712aef 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -218,7 +218,7 @@ public class SendSyncMessageProcessor implements
HttpRequestProcessor {
@Override
public void onSuccess(final CloudEvent event) {
LogUtils.info(log,
"message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
- + "|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime,
+ + "|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime,
topic, bizNo, uniqueId);
try {
@@ -266,7 +266,7 @@ public class SendSyncMessageProcessor implements
HttpRequestProcessor {
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
LogUtils.error(log,
"message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
- + "|bizSeqNo={}|uniqueId={}",
+ + "|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime,
topic, bizNo, uniqueId, e);
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index f2bfeb1c6..971631689 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -188,7 +188,7 @@ public class SubscribeProcessor implements
HttpRequestProcessor {
SubscribeResponseBody.class);
final long endTime = System.currentTimeMillis();
LogUtils.error(log,
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}"
- + "|bizSeqNo={}|uniqueId={}",
+ + "|bizSeqNo={}|uniqueId={}",
endTime - startTime,
JsonUtils.toJSONString(subscribeRequestBody.getTopics()),
subscribeRequestBody.getUrl(), e);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 747db0c9e..c60a01f5c 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -241,7 +241,7 @@ public class AsyncHTTPPushRequest extends
AbstractHTTPPushRequest {
} else {
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHttpPushMsgFailed();
LogUtils.info(MESSAGE_LOGGER,
"message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
- + "|uniqueId={}|cost={}",
+ + "|uniqueId={}|cost={}",
currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(),
handleMsgContext.getUniqueId(), cost);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index 7b95c4fbe..1985e4c51 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -274,7 +274,7 @@ public class EventMeshUtil {
public static void printState(final ThreadPoolExecutor
scheduledExecutorService) {
LogUtils.info(log, "{} [{} {} {} {}]", ((EventMeshThreadFactory)
scheduledExecutorService.getThreadFactory())
- .getThreadNamePrefix(),
scheduledExecutorService.getQueue().size(),
scheduledExecutorService.getPoolSize(),
+ .getThreadNamePrefix(),
scheduledExecutorService.getQueue().size(),
scheduledExecutorService.getPoolSize(),
scheduledExecutorService.getActiveCount(),
scheduledExecutorService.getCompletedTaskCount());
}
diff --git
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
index 282979315..27385d119 100644
---
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
+++
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
@@ -54,7 +54,7 @@ public class SubStreamHandler<T> extends Thread implements
Serializable {
private final ReceiveMsgHook<T> listener;
public SubStreamHandler(final ConsumerServiceStub consumerAsyncClient,
final EventMeshGrpcClientConfig clientConfig,
- final ReceiveMsgHook<T> listener) {
+ final ReceiveMsgHook<T> listener) {
this.consumerAsyncClient = consumerAsyncClient;
this.clientConfig = clientConfig;
this.listener = listener;
diff --git
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
index d8cab6452..c07abe2a5 100644
---
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
+++
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
@@ -66,8 +66,8 @@ public class EventMeshHttpConsumer extends AbstractHttpClient
implements AutoClo
this(eventMeshHttpClientConfig, null);
}
- public EventMeshHttpConsumer(final EventMeshHttpClientConfig
eventMeshHttpClientConfig, final ThreadPoolExecutor customExecutor)
- throws EventMeshException {
+ public EventMeshHttpConsumer(final EventMeshHttpClientConfig
eventMeshHttpClientConfig,
+ final ThreadPoolExecutor customExecutor) throws EventMeshException {
super(eventMeshHttpClientConfig);
this.consumeExecutor = Optional.ofNullable(customExecutor).orElseGet(
() ->
ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpClientConfig.getConsumeThreadCore(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]