shibd commented on code in PR #25270:
URL: https://github.com/apache/pulsar/pull/25270#discussion_r2923101849


##########
pip/pip-456.md:
##########
@@ -0,0 +1,190 @@
+# PIP-456: Add enableReadingMarkerMessage option to RawReader to allow reading 
marker messages
+
+# Background knowledge
+
+In Apache Pulsar, **marker messages** are internal messages used by the broker 
for various operations such as compaction, replication, and transactions. These 
markers are not meant to be visible to end users in normal consumer reads.
+
+The broker filters out marker messages before delivering them to consumers. 
The filtering logic is implemented in `AbstractBaseDispatcher.java`, which 
checks if a message is a marker type and decides whether to deliver it to the 
subscription.
+
+**RawReader** is a low-level reader API that allows reading raw messages 
directly from a topic without any client-side filtering or processing. It's 
useful for administrative tasks, debugging, and implementing custom message 
processing pipelines.
+
+**Subscription properties** are key-value pairs that can be passed when 
creating a subscription via the `CommandSubscription` protocol message. These 
properties are stored with the subscription and can be used to control 
subscription behavior.
+
+# Motivation
+
+When using RawReader to read messages for specific use cases like:
+- Compaction verification and monitoring
+- Custom data analysis pipelines
+- Debugging and troubleshooting
+
+Users want to read **all** messages from the topic, including marker messages, 
so they can handle marker types on the reader side. Currently, the broker 
filters out these marker messages before delivering them to any consumer, 
including RawReader.
+
+This feature is particularly useful for:
+1. **Compaction monitoring**: Users want to verify compaction results by 
reading all marker messages
+2. **Custom processing**: Users need access to all message types for their 
specific use cases
+3. **Debugging**: Developers need to inspect marker messages for 
troubleshooting
+
+# Goals
+
+## In Scope
+
+- Add a new option `enableReadingMarkerMessage` to RawReader API
+- Pass the option to broker via subscription properties (using key 
`enableReadingMarkerMessage`)
+- Modify broker to check subscription properties and skip marker filtering 
when enabled
+
+## Out of Scope
+
+- Changing the default behavior for existing subscriptions
+- Adding marker message handling on the client side
+- Supporting marker message reading for regular consumers
+
+# High Level Design
+
+The solution adds a new option in RawReader that sets a subscription property 
when creating the subscription. The broker checks this property to determine 
whether to skip filtering marker messages.
+
+```
+┌─────────────┐     ┌─────────────┐     ┌─────────────┐
+│   Client    │────▶│   Broker    │────▶│  Dispatcher │
+│  (RawReader)│     │             │     │             │
+└─────────────┘     └─────────────┘     └─────────────┘
+      │                                       │
+      │ subscriptionProperty:                 │
+      │   enableReadingMarkerMessages: true                    │
+      │                                       ▼
+      │                              ┌───────────────────┐
+      │                              │ Check subscription │
+      │                              │ properties for     │
+      │                              │ "enableReadingMarkerMessages"       │
+      │                              └───────────────────┘
+      │                                         │
+      ▼                                         ▼
+┌───────────────────┐              ┌───────────────────┐
+│ Subscription with │              │  Deliver marker    │
+│ enableReadingMarkerMessages=true   │              │  messages          │
+│ receives markers  │              └───────────────────┘
+└───────────────────┘
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Client-side Changes (RawReader)
+
+Add a new option to RawReader to enable reading marker messages. This can be 
done by setting the subscription property `enableReadingMarkerMessages` when 
creating the consumer.
+
+The option should be added to `ConsumerConfigurationData` and propagated to 
the subscription when the RawReader creates a consumer.
+
+Example usage:
+```java
+ConsumerConfigurationData<byte[]> conf = new ConsumerConfigurationData<>();
+conf.getTopicNames().add(topic);
+conf.setSubscriptionName(subscription);
+conf.setEnableReadingMarkerMessage(true);
+
+var reader = RawReader.create(client, conf, true, true).get();
+```
+
+In the ConsumerConfigurationData:
+
+```java
+    private boolean enableReadingMarkerMessages;
+
+    public Map<String, String> getSubscriptionProperties() {
+        if (this.subscriptionProperties == null) {
+            this.subscriptionProperties = new TreeMap<>();
+        }
+        if (this.enableReadingMarkerMessages) {
+            subscriptionProperties.put("enableReadingMarkerMessages", 
String.valueOf(enableReadingMarkerMessages));
+        }
+        return subscriptionProperties;
+    }
+```
+
+### Broker-side Changes
+
+The broker already has logic to check subscription properties in 
`AbstractBaseDispatcher.java`:
+
+```java
+// Existing code in isMarkerAllowedDeliveryToSubscription
+if 
(!subscription.getSubscriptionProperties().getOrDefault("enableReadingMarkerMessages",
 "false")) {
+    // ... skip deliver the marker message to the consumer
+}
+```
+
+### Configuration
+
+No new broker configuration is required. The feature is controlled entirely 
via the client-side option.
+
+## Public-facing Changes
+
+### Public API
+
+| API | Description |
+|-----|-------------|
+| `ConsumerConfigurationData.setEnableReadingMarkerMessage(boolean)` | Set 
whether to read marker messages |

Review Comment:
   I'm wondering whether adding `enableReadingMarkerMessages` to 
`ConsumerConfigurationData` is the right approach. This field would become part 
of the `public API surface`, but it has no meaningful use case for **regular 
consumers** — it's only relevant for `RawReader`.
   
   To avoid leaking an internal-only concern into the general consumer API, I'd 
suggest keeping it out of ConsumerConfigurationData entirely and instead 
exposing it directly on RawReader:
   
   
   To avoid leaking an internal-only concern into the general consumer API, I'd 
suggest keeping it out of ConsumerConfigurationData entirely and instead 
exposing it directly on RawReader:
   
   
   ```java
   // RawReader.java
   public static CompletableFuture<RawReader> create(PulsarClient client, 
String topic, String subscription,
       boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors,
       boolean enableReadingMarkerMessages);
   ```
   
   
   Then set the subscription property internally inside RawReaderImpl:
   
   ```java
   public RawReaderImpl(PulsarClientImpl client, String topic, String 
subscription,
                        CompletableFuture<Consumer<byte[]>> consumerFuture,
                        boolean createTopicIfDoesNotExist, boolean 
retryOnRecoverableErrors,
                        boolean enableReadingMarkerMessages) {
       consumerConfiguration = new ConsumerConfigurationData<>();
       // ...
       if (enableReadingMarkerMessages) {
           consumerConfiguration.getSubscriptionProperties()
               .put("enableReadingMarkerMessages", "true");
       }
       consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture,
           createTopicIfDoesNotExist, retryOnRecoverableErrors);
   }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to