This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d8e0f261 [ISSUE #4619]Fix Grpc request reply can not receive reply 
message (#4620)
5d8e0f261 is described below

commit 5d8e0f261373704b4f5bfec3b52ffdf2faa13048
Author: mxsm <[email protected]>
AuthorDate: Thu Dec 7 10:26:20 2023 +0800

    [ISSUE #4619]Fix Grpc request reply can not receive reply message (#4620)
    
    * [ISSUE #4619]Fix Grpc request reply can not receive reply message
    
    * fix codestyle
    
    * format codestyle
---
 .../common/protocol/grpc/common/ProtocolKey.java   |  2 ++
 .../protocol/grpc/common/SubscriptionReply.java    |  2 ++
 .../protocol/grpc/service/ConsumerService.java     | 26 ++++++++++------------
 .../client/grpc/consumer/SubStreamHandler.java     |  6 +++--
 4 files changed, 20 insertions(+), 16 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java
index f45333cbf..b4d48c9b7 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java
@@ -47,6 +47,8 @@ public class ProtocolKey {
     public static final String GRPC_RESPONSE_MESSAGE = "response_message";
     public static final String GRPC_RESPONSE_TIME = "time";
 
+    public static final String SUB_MESSAGE_TYPE = "submessagetype";
+
     /**
      * CloudEvents spec
      *
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java
index dc2894620..da40a0363 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java
@@ -31,6 +31,8 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 public class SubscriptionReply {
 
+    public static final String TYPE = "subscription_reply";
+
     private String producerGroup;
 
     private String topic;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java
index 1b8b028d4..8852c1b40 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java
@@ -17,12 +17,13 @@
 
 package org.apache.eventmesh.runtime.core.protocol.grpc.service;
 
-import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
+import 
org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent.CloudEventAttributeValue;
 import 
org.apache.eventmesh.common.protocol.grpc.cloudevents.ConsumerServiceGrpc;
 import 
org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
+import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey;
 import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
-import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.common.protocol.grpc.common.SubscriptionReply;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.processor.ReplyMessageProcessor;
@@ -30,15 +31,13 @@ import 
org.apache.eventmesh.runtime.core.protocol.grpc.processor.SubscribeProces
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.processor.SubscribeStreamProcessor;
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.processor.UnsubscribeProcessor;
 
-import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 
-import java.util.Set;
+import java.util.Optional;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import io.grpc.stub.StreamObserver;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -86,19 +85,18 @@ public class ConsumerService extends 
ConsumerServiceGrpc.ConsumerServiceImplBase
 
             @Override
             public void onNext(CloudEvent subscription) {
-                Set<SubscriptionItem> subscriptionItems = 
JsonUtils.parseTypeReferenceObject(subscription.getTextData(),
-                    new TypeReference<Set<SubscriptionItem>>() {
-                    });
-                if (CollectionUtils.isNotEmpty(subscriptionItems)) {
+                final String subMessageType = 
Optional.ofNullable(subscription.getAttributesMap().get(ProtocolKey.SUB_MESSAGE_TYPE))
+                    
.orElse(CloudEventAttributeValue.newBuilder().build()).getCeString();
+                if (StringUtils.equals(subMessageType, 
SubscriptionReply.TYPE)) {
+                    log.info("cmd={}|{}|client2eventMesh|from={}|to={}", 
"reply-to-server", EventMeshConstants.PROTOCOL_GRPC,
+                        EventMeshCloudEventUtils.getIp(subscription), 
eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());
+                    handleSubscribeReply(subscription, emitter);
+                } else {
                     log.info("cmd={}|{}|client2eventMesh|from={}|to={}", 
"subscribeStream", EventMeshConstants.PROTOCOL_GRPC,
                         EventMeshCloudEventUtils.getIp(subscription), 
eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());
 
                     
eventMeshGrpcServer.getMetricsMonitor().recordReceiveMsgFromClient();
                     handleSubscriptionStream(subscription, emitter);
-                } else {
-                    log.info("cmd={}|{}|client2eventMesh|from={}|to={}", 
"reply-to-server", EventMeshConstants.PROTOCOL_GRPC,
-                        EventMeshCloudEventUtils.getIp(subscription), 
eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());
-                    handleSubscribeReply(subscription, emitter);
                 }
             }
 
diff --git 
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
 
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
index 27385d119..5f1ee67ca 100644
--- 
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
+++ 
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
@@ -128,13 +128,15 @@ public class SubStreamHandler<T> extends Thread 
implements Serializable {
         Map<String, String> prop = new HashMap<>();
         Map<String, CloudEventAttributeValue> reqMessageMap = 
reqMessage.getAttributesMap();
         reqMessageMap.entrySet().forEach(entry -> prop.put(entry.getKey(), 
entry.getValue().getCeString()));
-        Map<String, CloudEventAttributeValue> cloudEventMap = 
reqMessage.getAttributesMap();
+        Map<String, CloudEventAttributeValue> cloudEventMap = 
cloudEvent.getAttributesMap();
         cloudEventMap.entrySet().forEach(entry -> prop.put(entry.getKey(), 
entry.getValue().getCeString()));
         subscriptionReply.putAllProperties(prop);
 
-        return 
CloudEvent.newBuilder().putAllAttributes(cloudEvent.getAttributesMap())
+        return 
CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessageMap)
             .putAttributes(ProtocolKey.DATA_CONTENT_TYPE,
                 
CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build())
+            //Indicate that it is a subscription response
+            .putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, 
CloudEventAttributeValue.newBuilder().setCeString(SubscriptionReply.TYPE).build())
             .setTextData(JsonUtils.toJSONString(subscriptionReply)).build();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to