This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 63f016825a6 [fix][client] Fix ConsumerBuilderImpl#subscribe silent
stuck when using pulsar-client:3.0.x with jackson-annotations prior to 2.12.0
(#21985)
63f016825a6 is described below
commit 63f016825a6c833e5a59041902711aced082415d
Author: 萧易客 <[email protected]>
AuthorDate: Tue Jan 30 19:20:29 2024 +0800
[fix][client] Fix ConsumerBuilderImpl#subscribe silent stuck when using
pulsar-client:3.0.x with jackson-annotations prior to 2.12.0 (#21985)
### Motivation
`pulsar-client` 3.0.x requires `jackson-annotations:2.12.0` or later, and
the same requirement applies to 3.1.x and 3.2.x. With an older version,
`ConsumerBuilderImpl#subscribe` may hang without reporting any error.
### Modifications
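Replace the `whenComplete` callback in
`MultiTopicsConsumerImpl#subscribeTopicPartitions` with a chain of
`thenAccept` and `exceptionally`, so that a failure thrown inside the
callback also completes `subscribeResult` exceptionally instead of being
lost. The modification is otherwise harmless.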
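
The difference between the two callback shapes can be reproduced outside Pulsar. The following is a minimal sketch, not the client code: `CallbackFailureDemo`, `failingStep`, and the simulated `NoSuchMethodError` are hypothetical stand-ins, assuming the hang comes from an error thrown inside the callback. With `whenComplete`, the thrown error ends up in the stage that `whenComplete` returns, which nobody observes, so the result future never completes. With `thenAccept` followed by `exceptionally`, the same error reaches the `exceptionally` handler.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackFailureDemo {

    // Hypothetical stand-in for a step that fails at runtime,
    // for example because of an incompatible dependency version.
    static void failingStep(String schema) {
        throw new NoSuchMethodError("simulated missing method");
    }

    // Old shape: the error thrown inside the callback is captured by the
    // stage returned from whenComplete, which is discarded here.
    static CompletableFuture<Void> withWhenComplete() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        CompletableFuture.completedFuture("schema").whenComplete((schema, cause) -> {
            if (cause == null) {
                failingStep(schema);   // throws, so result is never completed
                result.complete(null);
            } else {
                result.completeExceptionally(cause);
            }
        });
        return result;
    }

    // New shape: an error thrown in thenAccept fails that stage, and the
    // exceptionally handler forwards it to the result future.
    static CompletableFuture<Void> withThenAcceptExceptionally() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        CompletableFuture.completedFuture("schema")
                .thenAccept(schema -> {
                    failingStep(schema);
                    result.complete(null);
                })
                .exceptionally(cause -> {
                    result.completeExceptionally(cause);
                    return null;
                });
        return result;
    }

    public static void main(String[] args) {
        // prints "whenComplete done: false"
        System.out.println("whenComplete done: " + withWhenComplete().isDone());
        // prints "thenAccept/exceptionally failed: true"
        System.out.println("thenAccept/exceptionally failed: "
                + withThenAcceptExceptionally().isCompletedExceptionally());
    }
}
```

A caller blocking on the first future would wait forever, which matches the silent hang described in the motivation. A caller of the second future gets the failure.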
---
.../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6ba3aaaaa46..0e346d4ea6a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -999,13 +999,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
             boolean createIfDoesNotExist) {
-        client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((schema, cause) -> {
-            if (null == cause) {
-                doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist);
-            } else {
-                subscribeResult.completeExceptionally(cause);
-            }
-        });
+        client.preProcessSchemaBeforeSubscribe(client, schema, topicName)
+                .thenAccept(schema -> {
+                    doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist);
+                }).exceptionally(cause -> {
+                    subscribeResult.completeExceptionally(cause);
+                    return null;
+                });
     }
     private void doSubscribeTopicPartitions(Schema<T> schema,