This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git
The following commit(s) were added to refs/heads/main by this push:
new 82ac4506e7e docs: fix Java code example compilation errors (closes
apache/pulsar#23246) (#1134)
82ac4506e7e is described below
commit 82ac4506e7e3ba4ec1f67de288982ccfbd2a59f7
Author: Mukunda Rao Katta <[email protected]>
AuthorDate: Sun May 3 23:00:58 2026 -0700
docs: fix Java code example compilation errors (closes apache/pulsar#23246)
(#1134)
- client-libraries/java-use.md: add missing semicolon after MessageListener
lambda assignment, switch raw Consumer/Message/Reader to
Consumer<byte[]>/Message<byte[]>/Reader<byte[]> so the snippets compile without
raw-type warnings.
- client-libraries/consumers.md: add missing semicolons after .subscribe()
in the Exclusive/Failover/Shared/Key_Shared examples; switch raw Consumer to
Consumer<byte[]>; fix conumser typo and stray double space in inline comment.
- client-libraries/producers.md: update the MessageRouter example to
implement the non-deprecated choosePartition(Message<?>, TopicMetadata)
overload; fix the ProducerInterceptor snippet (missing () after new, missing
close() implementation, beforeSend had no return value, raw types) so it
compiles against the current
org.apache.pulsar.client.api.interceptor.ProducerInterceptor interface.
Signed-off-by: MukundaKatta <[email protected]>
---
client-libraries/consumers.md | 32 +++++++++++++++----------------
client-libraries/java-use.md | 14 +++++++-------
client-libraries/producers.md | 44 +++++++++++++++++++++++++------------------
3 files changed, 49 insertions(+), 41 deletions(-)
diff --git a/client-libraries/consumers.md b/client-libraries/consumers.md
index dd57582de72..84a8b9e427c 100644
--- a/client-libraries/consumers.md
+++ b/client-libraries/consumers.md
@@ -79,11 +79,11 @@ Create a new consumer and subscribe with the `Exclusive`
subscription type.
<TabItem value="Java">
```java
-Consumer consumer = client.newConsumer()
+Consumer<byte[]> consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
- .subscribe()
+ .subscribe();
```
</TabItem>
@@ -117,18 +117,18 @@ Create new consumers and subscribe with the `Failover`
subscription type.
<TabItem value="Java">
```java
-Consumer consumer1 = client.newConsumer()
+Consumer<byte[]> consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
- .subscribe()
-Consumer consumer2 = client.newConsumer()
+ .subscribe();
+Consumer<byte[]> consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
- .subscribe()
-//conumser1 is the active consumer, consumer2 is the standby consumer.
-//consumer1 receives 5 messages and then crashes, consumer2 takes over as an
active consumer.
+ .subscribe();
+//consumer1 is the active consumer, consumer2 is the standby consumer.
+//consumer1 receives 5 messages and then crashes, consumer2 takes over as an
active consumer.
```
</TabItem>
@@ -184,17 +184,17 @@ Create new consumers and subscribe with `Shared`
subscription type.
<TabItem value="Java">
```java
-Consumer consumer1 = client.newConsumer()
+Consumer<byte[]> consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
- .subscribe()
+ .subscribe();
-Consumer consumer2 = client.newConsumer()
+Consumer<byte[]> consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
- .subscribe()
+ .subscribe();
//Both consumer1 and consumer2 are active consumers.
```
@@ -253,17 +253,17 @@ When using Key_Shared subscriptions, producers **must**
either **disable batchin
<TabItem value="Java">
```java
-Consumer consumer1 = client.newConsumer()
+Consumer<byte[]> consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
- .subscribe()
+ .subscribe();
-Consumer consumer2 = client.newConsumer()
+Consumer<byte[]> consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
- .subscribe()
+ .subscribe();
//Both consumer1 and consumer2 are active consumers.
```
diff --git a/client-libraries/java-use.md b/client-libraries/java-use.md
index 07ed45bcb95..f048608b957 100644
--- a/client-libraries/java-use.md
+++ b/client-libraries/java-use.md
@@ -54,7 +54,7 @@ In Pulsar, consumers subscribe to topics and handle messages
that producers publ
Once you've instantiated a
[PulsarClient](@pulsar:javadoc:client@/org/apache/pulsar/client/api/PulsarClient)
object, you can create a
[Consumer](@pulsar:javadoc:client@/org/apache/pulsar/client/api/Consumer) by
specifying a [topic](pathname:///docs/reference-terminology#topic) and a
[subscription](pathname:///docs/concepts-messaging#subscription-types).
```java
-Consumer consumer = client.newConsumer()
+Consumer<byte[]> consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
@@ -65,7 +65,7 @@ The `subscribe` method will auto-subscribe the consumer to
the specified topic a
```java
while (true) {
// Wait for a message
- Message msg = consumer.receive();
+ Message<byte[]> msg = consumer.receive();
try {
// Do something with the message
@@ -83,16 +83,16 @@ while (true) {
If you don't want to block your main thread but constantly listen for new
messages, consider using a `MessageListener`. The `MessageListener` will use a
thread pool inside the PulsarClient. You can set the number of threads to use
for message listeners in the ClientBuilder.
```java
-MessageListener myMessageListener = (consumer, msg) -> {
+MessageListener<byte[]> myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
-}
+};
-Consumer consumer = client.newConsumer()
+Consumer<byte[]> consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.messageListener(myMessageListener)
@@ -108,13 +108,13 @@ The following is an example.
```java
byte[] msgIdBytes = // Some message ID byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
-Reader reader = pulsarClient.newReader()
+Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
while (true) {
- Message message = reader.readNext();
+ Message<byte[]> message = reader.readNext();
// Process message
}
```
diff --git a/client-libraries/producers.md b/client-libraries/producers.md
index e54fc8f31e9..34c33b9d23f 100644
--- a/client-libraries/producers.md
+++ b/client-libraries/producers.md
@@ -411,11 +411,11 @@ The following is an example:
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Go","value":"Go"},{"label":"Python","value":"Python"}]}>
<TabItem value="Java">
-To use a custom message router, you need to provide an implementation of the
[MessageRouter](@pulsar:javadoc:client@/org/apache/pulsar/client/api/MessageRouter)
interface, which has just one `choosePartition` method:
+To use a custom message router, you need to provide an implementation of the
[MessageRouter](@pulsar:javadoc:client@/org/apache/pulsar/client/api/MessageRouter)
interface. Implement the `choosePartition(Message<?>, TopicMetadata)` method
(the single-argument overload is deprecated since 1.22.0):
```java
public interface MessageRouter extends Serializable {
- int choosePartition(Message msg);
+ int choosePartition(Message<?> msg, TopicMetadata metadata);
}
```
@@ -423,7 +423,8 @@ The following router routes every message to partition 10:
```java
public class AlwaysTenRouter implements MessageRouter {
- public int choosePartition(Message msg) {
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 10;
}
}
@@ -627,21 +628,28 @@ To intercept messages, you can add a
`ProducerInterceptor` or multiple ones when
```java
Producer<byte[]> producer = client.newProducer()
.topic(topic)
- .intercept(new ProducerInterceptor {
- @Override
- boolean eligible(Message message) {
- return true; // process all messages
- }
-
- @Override
- Message beforeSend(Producer producer, Message message) {
- // user-defined processing logic
- }
-
- @Override
- void onSendAcknowledgement(Producer producer, Message
message, MessageId msgId, Throwable exception) {
- // user-defined processing logic
- }
+ .intercept(new ProducerInterceptor() {
+ @Override
+ public void close() {
+ // release any resources held by the interceptor
+ }
+
+ @Override
+ public boolean eligible(Message<?> message) {
+ return true; // process all messages
+ }
+
+ @Override
+ public Message<?> beforeSend(Producer<?> producer, Message<?>
message) {
+ // user-defined processing logic; return the (possibly
modified) message
+ return message;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer<?> producer, Message<?>
message,
+ MessageId msgId, Throwable
exception) {
+ // user-defined processing logic
+ }
})
.create();
```