This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 611500987 [ISSUE #4643]Fix Grpc client can't reconnection when runtime
after crashing the runtime and restarting it (#4644)
611500987 is described below
commit 611500987f25b208f413d3e3387bf912aa081ed1
Author: mxsm <[email protected]>
AuthorDate: Thu Dec 14 15:29:26 2023 +0800
[ISSUE #4643]Fix Grpc client can't reconnection when runtime after crashing
the runtime and restarting it (#4644)
* [ISSUE #4643]Fix Grpc client can't reconnection when runtime after
crashing the runtime and restarting it.
* Delete unnecessary code
---
.../eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java | 10 ++++------
.../eventmesh/client/grpc/consumer/SubStreamHandler.java | 3 +--
2 files changed, 5 insertions(+), 8 deletions(-)
diff --git
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
index 83a33317e..868191d99 100644
---
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
+++
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
@@ -85,7 +85,8 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
}
public void init() {
- this.channel =
ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(),
clientConfig.getServerPort()).usePlaintext().build();
+ this.channel =
ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(),
clientConfig.getServerPort()).usePlaintext()
+ .build();
this.consumerClient = ConsumerServiceGrpc.newBlockingStub(channel);
this.consumerAsyncClient = ConsumerServiceGrpc.newStub(channel);
this.heartbeatClient = HeartbeatServiceGrpc.newBlockingStub(channel);
@@ -246,13 +247,10 @@ public class EventMeshGrpcConsumer implements
AutoCloseable {
return;
}
- final Map<String, List<SubscriptionItem>> subscriptionGroup =
- subscriptionMap.values().stream()
- .collect(Collectors.groupingBy(SubscriptionInfo::getUrl,
- mapping(SubscriptionInfo::getSubscriptionItem, toList())));
+ final Map<String, List<SubscriptionItem>> subscriptionGroup =
subscriptionMap.values().stream()
+ .collect(Collectors.groupingBy(SubscriptionInfo::getUrl,
mapping(SubscriptionInfo::getSubscriptionItem, toList())));
subscriptionGroup.forEach((url, items) -> {
- // Subscription subscription = buildSubscription(items, url);
CloudEvent subscription =
EventMeshCloudEventBuilder.buildEventSubscription(clientConfig,
EventMeshProtocolType.EVENT_MESH_MESSAGE, url,
items);
subStreamHandler.sendSubscription(subscription);
diff --git
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
index 0d4e7fd94..c678f4166 100644
---
a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
+++
b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java
@@ -99,8 +99,7 @@ public class SubStreamHandler<T> extends Thread implements
Serializable {
@Override
public void onError(final Throwable t) {
- LogUtils.error(log, "Received Server side error", t);
- close();
+ log.error("Received Server side error", t);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]