9997766 opened a new issue, #4804:
URL: https://github.com/apache/eventmesh/issues/4804

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/eventmesh/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Environment
   
   Linux
   
   ### EventMesh version
   
   1.10.0
   
   ### What happened
   
   There is a problem in the SubStreamHandler class. gRPC streaming is two-way communication, and I have integrated the gRPC pub and sub functions in my project.
   The following method throws an exception, and the exception repeats every 30 seconds:
   
   
   
    private void senderOnNext(final CloudEvent subscription) {
           try {
               synchronized (sender) {
                   sender.onNext(subscription);
               }
           } catch (Exception e) {
               log.error("StreamObserver Error onNext", e);
           }
       }
   
   
   I analyzed the cause: when the exception is thrown, onCompleted should be called to close the sender, but that is not done here. As a result, gRPC treats the connection as still open and keeps throwing exceptions in a loop.
   
   I tried the change below, and it solved the problem.
   Could you fix this in the code soon? I am using the project, and this problem is blocking me.
   
   
   
   @Slf4j
   public class SubStreamHandler<T> extends Thread implements Serializable {
   
   ...
       public SubStreamHandler(final ConsumerServiceStub consumerAsyncClient, 
final EventMeshGrpcClientConfig clientConfig,
                               final ReceiveMsgHook<T> listener) {
           this.consumerAsyncClient = consumerAsyncClient;
           this.clientConfig = clientConfig;
           this.listener = listener;
       }
   
       public void sendSubscription(final CloudEvent subscription) {
           synchronized (this) {
               if (this.sender == null) {
                   this.sender = 
consumerAsyncClient.subscribeStream(createReceiver());
               }
           }
           senderOnNext(subscription);
       }
   
       private StreamObserver<CloudEvent> createReceiver() {
           return new StreamObserver<CloudEvent>() {
   
               @Override
               public void onNext(final CloudEvent message) {
                   T msg = 
EventMeshCloudEventBuilder.buildMessageFromEventMeshCloudEvent(message, 
listener.getProtocolType());
                   if (msg instanceof Set) {
                       log.info("Received message from Server:{}", message);
                   } else {
                       log.info("Received message from Server.|seq={}|uniqueId={}|",
                               EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message));
                       CloudEvent streamReply = null;
                       try {
                           Optional<T> reply = listener.handle(msg);
                           if (reply.isPresent()) {
                               streamReply = buildReplyMessage(message, 
reply.get());
                           }
                       } catch (Exception e) {
                           log.error("Error in handling reply message.|seq={}|uniqueId={}|",
                                   EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message), e);
                       }
                       if (streamReply != null) {
                           log.info("Sending reply message to Server.|seq={}|uniqueId={}|",
                                   EventMeshCloudEventUtils.getSeqNum(streamReply), EventMeshCloudEventUtils.getUniqueId(streamReply));
                           senderOnNext(streamReply);
                       }
                   }
               }
   
               @Override
               public void onError(final Throwable t) {
                   log.error("Received Server side error", t);
                   close(); // added
               }
   
               @Override
               public void onCompleted() {
                   log.info("Finished receiving messages from server.");
                   close();
               }
           };
       }
   
       private CloudEvent buildReplyMessage(final CloudEvent reqMessage, final 
T replyMessage) {
           final CloudEvent cloudEvent = 
EventMeshCloudEventBuilder.buildEventMeshCloudEvent(replyMessage,
                   clientConfig, listener.getProtocolType());
   
           return 
CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessage.getAttributesMap()).putAllAttributes(cloudEvent.getAttributesMap())
                   .putAttributes(ProtocolKey.DATA_CONTENT_TYPE,
                           
CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build())
                   // Indicate that it is a subscription response
                   .putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, 
CloudEventAttributeValue.newBuilder().setCeString(ProtocolKey.SUB_REPLY_MESSAGE).build())
                   .build();
       }
   
       @Override
       public void run() {
           try {
               latch.await();
           } catch (InterruptedException e) {
               log.error("SubStreamHandler Thread interrupted", e);
               Thread.currentThread().interrupt();
           }
       }
   
       public void close() {
           if (this.sender != null) {
               senderOnComplete();
           }
   
           latch.countDown();
   
           log.info("SubStreamHandler closed.");
       }
   
       private void senderOnNext(final CloudEvent subscription) {
           try {
               synchronized (sender) {
                   sender.onNext(subscription);
               }
           } catch (Exception e) {
               log.error("StreamObserver Error onNext", e);
               close(); // added
           }
       }
   
       private void senderOnComplete() {
           try {
               synchronized (sender) {
                   sender.onCompleted();
                   sender = null; // added
               }
           } catch (Exception e) {
               log.error("StreamObserver Error onComplete", e);
           }
       }
   }
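
The idea behind the change above (close the outbound stream once when a send fails, and forget the reference so it is not reused) can be sketched in isolation. This is a minimal sketch under stated assumptions, not EventMesh code: `Observer` is a stand-in for `io.grpc.stub.StreamObserver` so that the example runs without gRPC, and `GuardedSender` is a hypothetical helper name. It locks on a dedicated object rather than on the sender itself, because the sender reference is the thing being reset to null.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for io.grpc.stub.StreamObserver so the sketch runs without gRPC on the classpath.
interface Observer<T> {
    void onNext(T value);
    void onCompleted();
}

// Hypothetical helper (not part of EventMesh): owns the outbound stream and drops it after a failure.
class GuardedSender<T> {
    private final Object lock = new Object();
    private Observer<T> sender;

    // Installs a freshly opened stream (the role played by subscribeStream(...) in the issue).
    void attach(Observer<T> newSender) {
        synchronized (lock) {
            sender = newSender;
        }
    }

    boolean isOpen() {
        synchronized (lock) {
            return sender != null;
        }
    }

    // Returns true if the value was handed to the stream, false if no usable stream exists.
    boolean send(T value) {
        synchronized (lock) {
            if (sender == null) {
                return false;
            }
            try {
                sender.onNext(value);
                return true;
            } catch (RuntimeException e) {
                closeLocked(); // close once instead of failing again on every later send
                return false;
            }
        }
    }

    void close() {
        synchronized (lock) {
            closeLocked();
        }
    }

    // Caller must hold the lock. Clears the reference even if onCompleted() itself throws.
    private void closeLocked() {
        Observer<T> current = sender;
        sender = null;
        if (current != null) {
            try {
                current.onCompleted();
            } catch (RuntimeException ignored) {
                // The stream is already unusable; nothing more to clean up.
            }
        }
    }
}

class GuardedSenderDemo {
    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Observer<String> broken = new Observer<String>() {
            @Override
            public void onNext(String value) {
                calls.add("onNext");
                throw new IllegalStateException("stream is broken");
            }

            @Override
            public void onCompleted() {
                calls.add("onCompleted");
            }
        };

        GuardedSender<String> guarded = new GuardedSender<>();
        guarded.attach(broken);
        System.out.println(guarded.send("subscribe")); // prints false
        System.out.println(guarded.send("subscribe")); // prints false, broken stream is not touched again
        System.out.println(guarded.isOpen());          // prints false
        System.out.println(calls);                     // prints [onNext, onCompleted]
    }
}
```

After a failed send, the caller would need to open a new stream and call `attach` again before sending; in the handler above that role is played by the `sender == null` check in `sendSubscription`.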
   
   
   ### How to reproduce
   
   When an exception occurs (for example, a gRPC timeout), this method causes an endless loop of errors:
    private void senderOnNext(final CloudEvent subscription) {
           try {
               synchronized (sender) {
                   sender.onNext(subscription);
               }
           } catch (Exception e) {
               log.error("StreamObserver Error onNext", e);
           }
       }
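
The failure mode can be simulated without a server. The following is only a sketch under assumptions: `SimpleObserver` stands in for `io.grpc.stub.StreamObserver`, the lambda that always throws stands in for a stream that has already timed out, and the loop stands in for whatever periodically resends on the stream. Because the catch block keeps the same sender, every round fails again.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for io.grpc.stub.StreamObserver; only the method used here is modelled.
interface SimpleObserver<T> {
    void onNext(T value);
}

// Mirrors the shape of senderOnNext from the issue: the error is caught, the sender is kept.
class UnguardedHandler {
    final AtomicInteger errors = new AtomicInteger();
    private final SimpleObserver<String> sender;

    UnguardedHandler(SimpleObserver<String> sender) {
        this.sender = sender;
    }

    void senderOnNext(String subscription) {
        try {
            synchronized (sender) {
                sender.onNext(subscription);
            }
        } catch (Exception e) {
            errors.incrementAndGet(); // the original logs here and carries on
        }
    }
}

class ReproDemo {
    // Simulates `rounds` periodic sends on a stream that has already failed.
    static int run(int rounds) {
        SimpleObserver<String> timedOut = value -> {
            throw new IllegalStateException("simulated gRPC timeout");
        };
        UnguardedHandler handler = new UnguardedHandler(timedOut);
        for (int i = 0; i < rounds; i++) {
            handler.senderOnNext("subscription");
        }
        return handler.errors.get();
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints 5: one error per send, with no recovery
    }
}
```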
   
   
   ### Debug logs
   
   ```Java
   no
   ```
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [ ] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct) *


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


