This is an automated email from the ASF dual-hosted git repository.
urfree pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git
The following commit(s) were added to refs/heads/main by this push:
new 528bb68988e Docs sync done from apache/pulsar(#4378856)
528bb68988e is described below
commit 528bb68988eb314d80b62e4769a7d752372ba60b
Author: Pulsar Site Updater <[email protected]>
AuthorDate: Wed Aug 31 12:01:48 2022 +0000
Docs sync done from apache/pulsar(#4378856)
---
site2/website-next/docs/client-libraries-java.md | 101 +++++++++++++++++++++
.../version-2.10.x/client-libraries-java.md | 101 +++++++++++++++++++++
.../version-2.9.x/client-libraries-java.md | 101 +++++++++++++++++++++
3 files changed, 303 insertions(+)
diff --git a/site2/website-next/docs/client-libraries-java.md b/site2/website-next/docs/client-libraries-java.md
index e4978f01037..3c02e4edcf8 100644
--- a/site2/website-next/docs/client-libraries-java.md
+++ b/site2/website-next/docs/client-libraries-java.md
@@ -727,6 +727,47 @@ Producer<byte[]> producer = client.newProducer()
By default, producer chunks the large message based on max message size
(`maxMessageSize`) configured at broker (eg: 5MB). However, client can also
configure max chunked size using producer configuration `chunkMaxMessageSize`.
> **Note:** To enable chunking, you need to disable batching
> (`enableBatching`=`false`) concurrently.
+### Intercept messages
+
+`ProducerInterceptor`s intercept and possibly mutate messages received by the producer before they are published to the brokers.
+
+The interface has three main events:
+* `eligible` checks if the interceptor can be applied to the message.
+* `beforeSend` is triggered before the producer sends the message to the broker. You can modify messages within this event.
+* `onSendAcknowledgement` is triggered when the message is acknowledged by the broker or the send fails.
+
+To intercept messages, you can add one or more `ProducerInterceptor`s when creating a `Producer` as follows.
+
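+```java
+
+// ProducerInterceptor here is org.apache.pulsar.client.api.interceptor.ProducerInterceptor
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .intercept(new ProducerInterceptor() {
+            @Override
+            public void close() {
+                // release any resources held by the interceptor
+            }
+
+            @Override
+            public boolean eligible(Message message) {
+                return true;  // process all messages
+            }
+
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                // user-defined processing logic
+                return message;  // return the (possibly modified) message
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+                // user-defined processing logic
+            }
+        })
+        .create();
+
+```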
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
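The ordering and eligibility rules described above can be illustrated without a broker. The following is a minimal sketch in plain Java, not the Pulsar client API: `MiniInterceptor`, `applyAll`, and `sampleChain` are hypothetical names introduced only for this illustration, and messages are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

public class InterceptorOrderSketch {

    /** Hypothetical stand-in for an interceptor; this is NOT the Pulsar interface. */
    interface MiniInterceptor {
        boolean eligible(String message);

        String beforeSend(String message);
    }

    /** Runs every eligible interceptor in registration order and returns the final message. */
    static String applyAll(List<MiniInterceptor> interceptors, String message) {
        String current = message;
        for (MiniInterceptor interceptor : interceptors) {
            if (interceptor.eligible(current)) {
                current = interceptor.beforeSend(current);
            }
        }
        return current;
    }

    /** Builds a chain of two interceptors: the first appends "-a", the second appends "-b". */
    static List<MiniInterceptor> sampleChain() {
        List<MiniInterceptor> chain = new ArrayList<>();
        chain.add(new MiniInterceptor() {
            @Override
            public boolean eligible(String message) {
                return true; // applies to all messages
            }

            @Override
            public String beforeSend(String message) {
                return message + "-a";
            }
        });
        chain.add(new MiniInterceptor() {
            @Override
            public boolean eligible(String message) {
                return !message.startsWith("skip"); // ignores messages prefixed with "skip"
            }

            @Override
            public String beforeSend(String message) {
                return message + "-b";
            }
        });
        return chain;
    }

    public static void main(String[] args) {
        System.out.println(applyAll(sampleChain(), "msg"));  // prints "msg-a-b"
        System.out.println(applyAll(sampleChain(), "skip")); // prints "skip-a"
    }
}
```

In this sketch the second interceptor sees the output of the first, which is why the registration order matters when more than one interceptor modifies a message.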
## Consumer
In Pulsar, consumers subscribe to topics and handle messages that producers
publish to those topics. You can instantiate a new
[consumer](reference-terminology.md#consumer) by first instantiating a
{@inject:
javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object
and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
@@ -1298,6 +1339,66 @@ If the message key is not specified, messages without key are dispatched to one
:::
+### Intercept messages
+
+`ConsumerInterceptor`s intercept and possibly mutate messages received by the consumer.
+
+The interface has six main events:
+* `beforeConsume` is triggered before the message is returned by `receive()` or `receiveAsync()`. You can modify messages within this event.
+* `onAcknowledge` is triggered before the consumer sends the acknowledgement to the broker.
+* `onAcknowledgeCumulative` is triggered before the consumer sends the cumulative acknowledgement to the broker.
+* `onNegativeAcksSend` is triggered when a redelivery from a negative acknowledgement occurs.
+* `onAckTimeoutSend` is triggered when a redelivery from an acknowledgement timeout occurs.
+* `onPartitionsChange` is triggered when the number of partitions of a partitioned topic changes.
+
+To intercept messages, you can add one or more `ConsumerInterceptor`s when creating a `Consumer` as follows.
+
+```java
+
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .intercept(new ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+                // release any resources held by the interceptor
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                // user-defined processing logic
+                return message;  // return the (possibly modified) message
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onPartitionsChange(String topicName, int partitions) {
+                // user-defined processing logic
+            }
+        })
+        .subscribe();
+
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
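One common use for the consumer-side events listed above is collecting simple metrics. The following is a minimal sketch in plain Java, not the Pulsar client API: `MiniConsumerEvents`, `CountingEvents`, and `simulate` are hypothetical names, only three of the six events are modeled, and messages and message IDs are plain strings.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ConsumerEventCounterSketch {

    /** Hypothetical subset of the consumer events; this is NOT the Pulsar interface. */
    interface MiniConsumerEvents {
        String beforeConsume(String message);

        void onAcknowledge(String messageId);

        void onNegativeAcksSend(Set<String> messageIds);
    }

    /** Counts how often each event fires, keyed by a short label. */
    static class CountingEvents implements MiniConsumerEvents {
        private final Map<String, Integer> counts = new LinkedHashMap<>();

        private void bump(String event, int by) {
            counts.merge(event, by, Integer::sum);
        }

        @Override
        public String beforeConsume(String message) {
            bump("consumed", 1);
            return message; // pass the message through unchanged
        }

        @Override
        public void onAcknowledge(String messageId) {
            bump("acked", 1);
        }

        @Override
        public void onNegativeAcksSend(Set<String> messageIds) {
            bump("redelivered", messageIds.size());
        }

        Map<String, Integer> counts() {
            return counts;
        }
    }

    /** Simulates three received messages, one acknowledgement, and two negative acknowledgements. */
    static Map<String, Integer> simulate() {
        CountingEvents events = new CountingEvents();
        events.beforeConsume("m1");
        events.beforeConsume("m2");
        events.beforeConsume("m3");
        events.onAcknowledge("id-1");
        events.onNegativeAcksSend(new HashSet<>(Arrays.asList("id-2", "id-3")));
        return events.counts();
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints "{consumed=3, acked=1, redelivered=2}"
    }
}
```

A real interceptor would implement every method of `ConsumerInterceptor`, as in the example above; the counting logic would then live in the corresponding callbacks.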
## Reader
With the [reader interface](concepts-clients.md#reader-interface), Pulsar
clients can "manually position" themselves within a topic and reading all
messages from a specified message onward. The Pulsar API for Java enables you
to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader}
objects by specifying a topic and a {@inject:
javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId}.
diff --git a/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md b/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md
index 0b402f1cc45..5721bf31215 100644
--- a/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md
+++ b/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md
@@ -652,6 +652,47 @@ Producer<byte[]> producer = client.newProducer()
By default, producer chunks the large message based on max message size
(`maxMessageSize`) configured at broker (eg: 5MB). However, client can also
configure max chunked size using producer configuration `chunkMaxMessageSize`.
> **Note:** To enable chunking, you need to disable batching
> (`enableBatching`=`false`) concurrently.
+### Intercept messages
+
+`ProducerInterceptor`s intercept and possibly mutate messages received by the producer before they are published to the brokers.
+
+The interface has three main events:
+* `eligible` checks if the interceptor can be applied to the message.
+* `beforeSend` is triggered before the producer sends the message to the broker. You can modify messages within this event.
+* `onSendAcknowledgement` is triggered when the message is acknowledged by the broker or the send fails.
+
+To intercept messages, you can add one or more `ProducerInterceptor`s when creating a `Producer` as follows.
+
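+```java
+
+// ProducerInterceptor here is org.apache.pulsar.client.api.interceptor.ProducerInterceptor
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .intercept(new ProducerInterceptor() {
+            @Override
+            public void close() {
+                // release any resources held by the interceptor
+            }
+
+            @Override
+            public boolean eligible(Message message) {
+                return true;  // process all messages
+            }
+
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                // user-defined processing logic
+                return message;  // return the (possibly modified) message
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+                // user-defined processing logic
+            }
+        })
+        .create();
+
+```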
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
## Consumer
In Pulsar, consumers subscribe to topics and handle messages that producers
publish to those topics. You can instantiate a new
[consumer](reference-terminology.md#consumer) by first instantiating a
{@inject:
javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object
and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
@@ -1222,6 +1263,66 @@ If the message key is not specified, messages without key are dispatched to one
:::
+### Intercept messages
+
+`ConsumerInterceptor`s intercept and possibly mutate messages received by the consumer.
+
+The interface has six main events:
+* `beforeConsume` is triggered before the message is returned by `receive()` or `receiveAsync()`. You can modify messages within this event.
+* `onAcknowledge` is triggered before the consumer sends the acknowledgement to the broker.
+* `onAcknowledgeCumulative` is triggered before the consumer sends the cumulative acknowledgement to the broker.
+* `onNegativeAcksSend` is triggered when a redelivery from a negative acknowledgement occurs.
+* `onAckTimeoutSend` is triggered when a redelivery from an acknowledgement timeout occurs.
+* `onPartitionsChange` is triggered when the number of partitions of a partitioned topic changes.
+
+To intercept messages, you can add one or more `ConsumerInterceptor`s when creating a `Consumer` as follows.
+
+```java
+
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .intercept(new ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+                // release any resources held by the interceptor
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                // user-defined processing logic
+                return message;  // return the (possibly modified) message
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onPartitionsChange(String topicName, int partitions) {
+                // user-defined processing logic
+            }
+        })
+        .subscribe();
+
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
## Reader
With the [reader interface](concepts-clients.md#reader-interface), Pulsar
clients can "manually position" themselves within a topic and reading all
messages from a specified message onward. The Pulsar API for Java enables you
to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader}
objects by specifying a topic and a {@inject:
javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId}.
diff --git a/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md b/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md
index 067a3a10de1..edd60396213 100644
--- a/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md
+++ b/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md
@@ -275,6 +275,47 @@ producer.newMessage()
You can terminate the builder chain with `sendAsync()` and get a future return.
+### Intercept messages
+
+`ProducerInterceptor`s intercept and possibly mutate messages received by the producer before they are published to the brokers.
+
+The interface has three main events:
+* `eligible` checks if the interceptor can be applied to the message.
+* `beforeSend` is triggered before the producer sends the message to the broker. You can modify messages within this event.
+* `onSendAcknowledgement` is triggered when the message is acknowledged by the broker or the send fails.
+
+To intercept messages, you can add one or more `ProducerInterceptor`s when creating a `Producer` as follows.
+
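+```java
+
+// ProducerInterceptor here is org.apache.pulsar.client.api.interceptor.ProducerInterceptor
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .intercept(new ProducerInterceptor() {
+            @Override
+            public void close() {
+                // release any resources held by the interceptor
+            }
+
+            @Override
+            public boolean eligible(Message message) {
+                return true;  // process all messages
+            }
+
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                // user-defined processing logic
+                return message;  // return the (possibly modified) message
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+                // user-defined processing logic
+            }
+        })
+        .create();
+
+```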
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
## Consumer
In Pulsar, consumers subscribe to topics and handle messages that producers
publish to those topics. You can instantiate a new
[consumer](reference-terminology.md#consumer) by first instantiating a
{@inject:
javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object
and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
@@ -765,6 +806,66 @@ If the message key is not specified, messages without key are dispatched to one
:::
+### Intercept messages
+
+`ConsumerInterceptor`s intercept and possibly mutate messages received by the consumer.
+
+The interface has six main events:
+* `beforeConsume` is triggered before the message is returned by `receive()` or `receiveAsync()`. You can modify messages within this event.
+* `onAcknowledge` is triggered before the consumer sends the acknowledgement to the broker.
+* `onAcknowledgeCumulative` is triggered before the consumer sends the cumulative acknowledgement to the broker.
+* `onNegativeAcksSend` is triggered when a redelivery from a negative acknowledgement occurs.
+* `onAckTimeoutSend` is triggered when a redelivery from an acknowledgement timeout occurs.
+* `onPartitionsChange` is triggered when the number of partitions of a partitioned topic changes.
+
+To intercept messages, you can add one or more `ConsumerInterceptor`s when creating a `Consumer` as follows.
+
+```java
+
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .intercept(new ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+                // release any resources held by the interceptor
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                // user-defined processing logic
+                return message;  // return the (possibly modified) message
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onPartitionsChange(String topicName, int partitions) {
+                // user-defined processing logic
+            }
+        })
+        .subscribe();
+
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
## Reader
With the [reader interface](concepts-clients.md#reader-interface), Pulsar
clients can "manually position" themselves within a topic and reading all
messages from a specified message onward. The Pulsar API for Java enables you
to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader}
objects by specifying a topic and a {@inject:
javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId}.