shibd commented on code in PR #25270:
URL: https://github.com/apache/pulsar/pull/25270#discussion_r2923101849
##########
pip/pip-456.md:
##########
@@ -0,0 +1,190 @@
+# PIP-456: Add enableReadingMarkerMessage option to RawReader to allow reading marker messages
+
+# Background knowledge
+
+In Apache Pulsar, **marker messages** are internal messages used by the broker for various operations such as compaction, replication, and transactions. These markers are not meant to be visible to end users in normal consumer reads.
+
+The broker filters out marker messages before delivering them to consumers. The filtering logic is implemented in `AbstractBaseDispatcher.java`, which checks if a message is a marker type and decides whether to deliver it to the subscription.
+
+**RawReader** is a low-level reader API that allows reading raw messages directly from a topic without any client-side filtering or processing. It's useful for administrative tasks, debugging, and implementing custom message processing pipelines.
+
+**Subscription properties** are key-value pairs that can be passed when creating a subscription via the `CommandSubscription` protocol message. These properties are stored with the subscription and can be used to control subscription behavior.
+
+# Motivation
+
+When using RawReader to read messages for specific use cases like:
+- Compaction verification and monitoring
+- Custom data analysis pipelines
+- Debugging and troubleshooting
+
+Users want to read **all** messages from the topic, including marker messages, so they can handle marker types on the reader side. Currently, the broker filters out these marker messages before delivering them to any consumer, including RawReader.
+
+This feature is particularly useful for:
+1. **Compaction monitoring**: Users want to verify compaction results by reading all marker messages
+2. **Custom processing**: Users need access to all message types for their specific use cases
+3. **Debugging**: Developers need to inspect marker messages for troubleshooting
+
+# Goals
+
+## In Scope
+
+- Add a new option `enableReadingMarkerMessages` to the RawReader API
+- Pass the option to the broker via subscription properties (using the key `enableReadingMarkerMessages`)
+- Modify the broker to check subscription properties and skip marker filtering when enabled
+
+## Out of Scope
+
+- Changing the default behavior for existing subscriptions
+- Adding marker message handling on the client side
+- Supporting marker message reading for regular consumers
+
+# High Level Design
+
+The solution adds a new option to RawReader that sets a subscription property when creating the subscription. The broker checks this property to determine whether to skip filtering marker messages.
+
+```
+┌─────────────┐      ┌─────────────┐      ┌──────────────┐
+│   Client    │─────▶│   Broker    │─────▶│  Dispatcher  │
+│ (RawReader) │      │             │      │              │
+└─────────────┘      └─────────────┘      └──────────────┘
+       │                                         │
+       │ subscription property:                  │
+       │ enableReadingMarkerMessages=true        ▼
+       │                         ┌───────────────────────────────┐
+       │                         │ Check subscription properties │
+       │                         │ for the key                   │
+       │                         │ "enableReadingMarkerMessages" │
+       │                         └───────────────────────────────┘
+       │                                         │
+       ▼                                         ▼
+┌──────────────────────────────────┐  ┌─────────────────────┐
+│ Subscription with                │  │ Deliver marker      │
+│ enableReadingMarkerMessages=true │  │ messages            │
+│ receives markers                 │  └─────────────────────┘
+└──────────────────────────────────┘
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Client-side Changes (RawReader)
+
+Add a new option to RawReader to enable reading marker messages. This can be done by setting the subscription property `enableReadingMarkerMessages` when creating the consumer.
+
+The option should be added to `ConsumerConfigurationData` and propagated to the subscription when the RawReader creates a consumer.
+
+Example usage:
+```java
+ConsumerConfigurationData<byte[]> conf = new ConsumerConfigurationData<>();
+conf.getTopicNames().add(topic);
+conf.setSubscriptionName(subscription);
+conf.setEnableReadingMarkerMessages(true);
+
+var reader = RawReader.create(client, conf, true, true).get();
+```
+
+In the ConsumerConfigurationData:
+
+```java
+    private boolean enableReadingMarkerMessages;
+
+    public Map<String, String> getSubscriptionProperties() {
+        if (this.subscriptionProperties == null) {
+            this.subscriptionProperties = new TreeMap<>();
+        }
+        // Only add the property when the option is enabled; otherwise the
+        // broker keeps its default behavior and filters marker messages.
+        if (this.enableReadingMarkerMessages) {
+            subscriptionProperties.put("enableReadingMarkerMessages",
+                    String.valueOf(enableReadingMarkerMessages));
+        }
+        return subscriptionProperties;
+    }
+```
+
+### Broker-side Changes
+
+The broker already has logic to check subscription properties in `AbstractBaseDispatcher.java`:
+
+```java
+// Existing code in isMarkerAllowedDeliveryToSubscription
+if (!Boolean.parseBoolean(subscription.getSubscriptionProperties()
+        .getOrDefault("enableReadingMarkerMessages", "false"))) {
+    // ... skip delivering the marker message to the consumer
+}
+```
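A minimal standalone sketch of the check above, runnable without a broker. It assumes the property key `enableReadingMarkerMessages` and treats any value other than `"true"` (case-insensitive) as disabled, which is how `Boolean.parseBoolean` behaves. `MarkerFilterSketch`, `Entry`, `isMarkerDeliveryEnabled`, and `deliverableEntries` are hypothetical names, not Pulsar's dispatcher API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Standalone sketch of the broker-side marker check described above.
// This is NOT Pulsar's AbstractBaseDispatcher; all names here are hypothetical.
class MarkerFilterSketch {

    // Subscription property key the client sets to opt in (assumed, per the proposal).
    static final String ENABLE_READING_MARKER_MESSAGES = "enableReadingMarkerMessages";

    // Minimal stand-in for a dispatched entry: an id plus whether it carries a marker type.
    static final class Entry {
        final long id;
        final boolean isMarker;

        Entry(long id, boolean isMarker) {
            this.id = id;
            this.isMarker = isMarker;
        }
    }

    // True only if the subscription property is present and parses as "true".
    static boolean isMarkerDeliveryEnabled(Map<String, String> subscriptionProperties) {
        if (subscriptionProperties == null) {
            return false;
        }
        return Boolean.parseBoolean(
                subscriptionProperties.getOrDefault(ENABLE_READING_MARKER_MESSAGES, "false"));
    }

    // Drops marker entries unless the subscription opted in; normal entries always pass.
    static List<Entry> deliverableEntries(List<Entry> entries,
                                          Map<String, String> subscriptionProperties) {
        boolean markersEnabled = isMarkerDeliveryEnabled(subscriptionProperties);
        List<Entry> result = new ArrayList<>();
        for (Entry entry : entries) {
            if (entry.isMarker && !markersEnabled) {
                continue; // skip delivering the marker message to the consumer
            }
            result.add(entry);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> batch = List.of(new Entry(1, false), new Entry(2, true), new Entry(3, false));
        // Default subscription: the marker (id 2) is filtered out.
        System.out.println(deliverableEntries(batch, Map.of()).size()); // prints 2
        // Opted-in subscription: all three entries are delivered.
        System.out.println(deliverableEntries(batch,
                Map.of(ENABLE_READING_MARKER_MESSAGES, "true")).size()); // prints 3
    }
}
```

Parsing with `Boolean.parseBoolean` keeps the default safe: a missing, empty, or misspelled value falls back to filtering markers, so existing subscriptions keep their current behavior.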
+
+### Configuration
+
+No new broker configuration is required. The feature is controlled entirely via the client-side option.
+
+## Public-facing Changes
+
+### Public API
+
+| API | Description |
+|-----|-------------|
+| `ConsumerConfigurationData.setEnableReadingMarkerMessages(boolean)` | Set whether to read marker messages |
Review Comment:
I'm wondering whether adding `enableReadingMarkerMessages` to
`ConsumerConfigurationData` is the right approach. This field would become part
of the `public API surface`, but it has no meaningful use case for **regular
consumers** — it's only relevant for `RawReader`.
To avoid leaking an internal-only concern into the general consumer API, I'd
suggest keeping it out of ConsumerConfigurationData entirely and instead
exposing it directly on RawReader:
```java
// RawReader.java
public static CompletableFuture<RawReader> create(PulsarClient client,
        String topic, String subscription,
        boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors,
        boolean enableReadingMarkerMessages);
```
Then set the subscription property internally inside RawReaderImpl:
```java
public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
        CompletableFuture<Consumer<byte[]>> consumerFuture,
        boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors,
        boolean enableReadingMarkerMessages) {
    consumerConfiguration = new ConsumerConfigurationData<>();
    // ...
    if (enableReadingMarkerMessages) {
        consumerConfiguration.getSubscriptionProperties()
                .put("enableReadingMarkerMessages", "true");
    }
    consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture,
            createTopicIfDoesNotExist, retryOnRecoverableErrors);
}
```
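The suggestion above boils down to translating a RawReader-only flag into a subscription property without adding a field to `ConsumerConfigurationData`. A minimal sketch of that translation, assuming the same `enableReadingMarkerMessages` key; `RawReaderPropertiesSketch` and `subscriptionPropertiesFor` are hypothetical names, and the real logic would sit inside the `RawReaderImpl` constructor.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper mirroring the suggested RawReaderImpl logic: the flag stays a
// RawReader-only parameter and is only translated into a subscription property here.
class RawReaderPropertiesSketch {

    // Assumed property key, matching the one the broker checks in the proposal.
    static final String ENABLE_READING_MARKER_MESSAGES = "enableReadingMarkerMessages";

    // Returns the subscription properties a RawReader-created consumer would send.
    // Works on a copy, so a caller-supplied (possibly immutable) map is never mutated.
    static Map<String, String> subscriptionPropertiesFor(Map<String, String> existing,
                                                         boolean enableReadingMarkerMessages) {
        Map<String, String> props = new TreeMap<>();
        if (existing != null) {
            props.putAll(existing);
        }
        if (enableReadingMarkerMessages) {
            props.put(ENABLE_READING_MARKER_MESSAGES, "true");
        }
        return Collections.unmodifiableMap(props);
    }

    public static void main(String[] args) {
        // prints {enableReadingMarkerMessages=true, owner=ops}
        System.out.println(subscriptionPropertiesFor(Map.of("owner", "ops"), true));
        // prints {}
        System.out.println(subscriptionPropertiesFor(null, false));
    }
}
```

Returning a fresh copy also avoids the side effect in the PIP's `getSubscriptionProperties()` draft, which writes into the stored map every time the getter is called.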
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.