This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 5d8e0f261 [ISSUE #4619]Fix Grpc request reply can not receive reply
message (#4620)
5d8e0f261 is described below
commit 5d8e0f261373704b4f5bfec3b52ffdf2faa13048
Author: mxsm <[email protected]>
AuthorDate: Thu Dec 7 10:26:20 2023 +0800
[ISSUE #4619]Fix Grpc request reply can not receive reply message (#4620)
* [ISSUE #4619]Fix Grpc request reply can not receive reply message
* fix codestyle
* format codestyle
---
.../common/protocol/grpc/common/ProtocolKey.java | 2 ++
.../protocol/grpc/common/SubscriptionReply.java | 2 ++
.../protocol/grpc/service/ConsumerService.java | 26 ++++++++++------------
.../client/grpc/consumer/SubStreamHandler.java | 6 +++--
4 files changed, 20 insertions(+), 16 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java
index f45333cbf..b4d48c9b7 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java
@@ -47,6 +47,8 @@ public class ProtocolKey {
public static final String GRPC_RESPONSE_MESSAGE = "response_message";
public static final String GRPC_RESPONSE_TIME = "time";
+ public static final String SUB_MESSAGE_TYPE = "submessagetype";
+
/**
* CloudEvents spec
*
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java
index dc2894620..da40a0363 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java
@@ -31,6 +31,8 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
public class SubscriptionReply {
+ public static final String TYPE = "subscription_reply";
+
private String producerGroup;
private String topic;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java
index 1b8b028d4..8852c1b40 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java
@@ -17,12 +17,13 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.service;
-import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
+import
org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent.CloudEventAttributeValue;
import
org.apache.eventmesh.common.protocol.grpc.cloudevents.ConsumerServiceGrpc;
import
org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
+import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
-import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.common.protocol.grpc.common.SubscriptionReply;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import
org.apache.eventmesh.runtime.core.protocol.grpc.processor.ReplyMessageProcessor;
@@ -30,15 +31,13 @@ import
org.apache.eventmesh.runtime.core.protocol.grpc.processor.SubscribeProces
import
org.apache.eventmesh.runtime.core.protocol.grpc.processor.SubscribeStreamProcessor;
import
org.apache.eventmesh.runtime.core.protocol.grpc.processor.UnsubscribeProcessor;
-import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
-import java.util.Set;
+import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import io.grpc.stub.StreamObserver;
-import com.fasterxml.jackson.core.type.TypeReference;
-
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -86,19 +85,18 @@ public class ConsumerService extends
ConsumerServiceGrpc.ConsumerServiceImplBase
@Override
public void onNext(CloudEvent subscription) {
- Set<SubscriptionItem> subscriptionItems =
JsonUtils.parseTypeReferenceObject(subscription.getTextData(),
- new TypeReference<Set<SubscriptionItem>>() {
- });
- if (CollectionUtils.isNotEmpty(subscriptionItems)) {
+ final String subMessageType =
Optional.ofNullable(subscription.getAttributesMap().get(ProtocolKey.SUB_MESSAGE_TYPE))
+
.orElse(CloudEventAttributeValue.newBuilder().build()).getCeString();
+ if (StringUtils.equals(subMessageType,
SubscriptionReply.TYPE)) {
+ log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
"reply-to-server", EventMeshConstants.PROTOCOL_GRPC,
+ EventMeshCloudEventUtils.getIp(subscription),
eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());
+ handleSubscribeReply(subscription, emitter);
+ } else {
log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
"subscribeStream", EventMeshConstants.PROTOCOL_GRPC,
EventMeshCloudEventUtils.getIp(subscription),
eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());
eventMeshGrpcServer.getMetricsMonitor().recordReceiveMsgFromClient();
handleSubscriptionStream(subscription, emitter);
- } else {
- log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
"reply-to-server", EventMeshConstants.PROTOCOL_GRPC,
- EventMeshCloudEventUtils.getIp(subscription),
eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());
- handleSubscribeReply(subscription, emitter);
}
}
diff --git
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
index 27385d119..5f1ee67ca 100644
---
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
+++
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
@@ -128,13 +128,15 @@ public class SubStreamHandler<T> extends Thread
implements Serializable {
Map<String, String> prop = new HashMap<>();
Map<String, CloudEventAttributeValue> reqMessageMap =
reqMessage.getAttributesMap();
reqMessageMap.entrySet().forEach(entry -> prop.put(entry.getKey(),
entry.getValue().getCeString()));
- Map<String, CloudEventAttributeValue> cloudEventMap =
reqMessage.getAttributesMap();
+ Map<String, CloudEventAttributeValue> cloudEventMap =
cloudEvent.getAttributesMap();
cloudEventMap.entrySet().forEach(entry -> prop.put(entry.getKey(),
entry.getValue().getCeString()));
subscriptionReply.putAllProperties(prop);
- return
CloudEvent.newBuilder().putAllAttributes(cloudEvent.getAttributesMap())
+ return
CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessageMap)
.putAttributes(ProtocolKey.DATA_CONTENT_TYPE,
CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build())
+ //Indicate that it is a subscription response
+ .putAttributes(ProtocolKey.SUB_MESSAGE_TYPE,
CloudEventAttributeValue.newBuilder().setCeString(SubscriptionReply.TYPE).build())
.setTextData(JsonUtils.toJSONString(subscriptionReply)).build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]