EronWright opened a new issue #12267:
URL: https://github.com/apache/pulsar/issues/12267
- **Status:** Draft
- **Author:** Eron Wright
- **Pull Request:**
- **Mailing List discussion:** (discussion) (vote)
- **Release:**
## Motivation
Unified stream processing frameworks such as Apache Flink rely on event-time
watermarks to perform time-based computations. Watermarks facilitate forward
progress in the processing of unbounded streams, and may be understood as an
assertion made by the producer about the progression of event time.
Pulsar has a limited concept of event time today: the `eventTime` field of a
message. This field (or a timestamp extracted from the message body) is
sometimes used by the consumer to generate watermarks, e.g. using the bounded
out-of-orderness heuristic. This approach is highly sensitive to Pulsar’s
system internals, such as the order in which messages happen to be delivered,
especially in historical processing and when working with partitioned topics.
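For context, the bounded out-of-orderness heuristic can be sketched as a small consumer-side helper. The class below is hypothetical (not a Pulsar or Flink API) and only loosely follows the style of heuristic Flink offers. The guessed watermark trails the highest event time observed so far by a fixed bound.

```java
// Hypothetical consumer-side heuristic, shown only to illustrate the idea.
final class BoundedOutOfOrdernessWatermarks {
    private final long maxOutOfOrdernessMillis;
    private long maxEventTimeSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Observe the event time of a message in the order it is read from the topic. */
    void onEvent(long eventTime) {
        maxEventTimeSeen = Math.max(maxEventTimeSeen, eventTime);
    }

    /** Guess the watermark: the highest event time seen, minus the allowed disorder. */
    long currentWatermark() {
        if (maxEventTimeSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing observed yet
        }
        return maxEventTimeSeen - maxOutOfOrdernessMillis - 1;
    }
}
```

The guess depends only on the order in which messages are read. If a consumer drains the backlog of one partition (say, up to t=100,000) before older messages from another partition (t=60,000) arrive, those older messages are treated as late even though no producer was actually late.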
## Goal
The proposal is to enhance Pulsar to broker watermarks from producers to
consumers as an alternative to heuristics-based watermarks. Watermarks are a
new type of message, generated by producers, aggregated by brokers, and
delivered by subscriptions, with special semantics for routing and dispatching.
The intention is that all routing and subscription modes be supported. This
means that numerous producers may emit watermarks for a given topic, and that
each watermark is broadcast to all partitions of the topic. The broker
aggregates these watermarks and dispatches to subscribers a watermark that is
the minimum across all producers for the current position in the topic.
advance as messages are _acknowledged_, to support consumer failover and
various subscription modes. Transactional messaging should be fully supported.
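The aggregation rule can be sketched with an in-memory map keyed by producer name. This is a minimal, hypothetical model (class and method names are illustrative): the effective watermark is the minimum over producers that have sent a watermark and have not been marked idle.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical model of the broker-side aggregation rule; not Pulsar code.
final class ProducerWatermarks {
    // Latest asserted watermark per active producer, keyed by producer name.
    private final Map<String, Long> latestByProducer = new HashMap<>();

    /** A producer asserts that its subsequent messages have eventTime >= timestamp. */
    void set(String producerName, long timestamp) {
        latestByProducer.put(producerName, timestamp);
    }

    /** An idle producer no longer holds back the overall watermark. */
    void markIdle(String producerName) {
        latestByProducer.remove(producerName);
    }

    /** The effective watermark is the minimum across active producers, if any. */
    OptionalLong effective() {
        return latestByProducer.values().stream().mapToLong(Long::longValue).min();
    }
}
```

A single slow producer therefore holds back the watermark for the whole topic until it advances or is marked idle.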
## API Changes
### Producer API
The producer emits watermarks via a new method on the `Producer` interface.
By sending a watermark, the producer asserts that the event time of any
subsequent message is at least the specified time (see the `eventTime` method
of `TypedMessageBuilder<T>`).
```java
package org.apache.pulsar.client.api;

import java.io.Closeable;

public interface Producer<T> extends Closeable {
    // ... existing methods ...

    /**
     * Create a new watermark builder.
     *
     * @return a watermark builder that can be used to construct the
     *         watermark to be sent through this producer.
     */
    WatermarkBuilder newWatermark();
}
```
Watermarks may also be sent transactionally:
```java
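import org.apache.pulsar.client.api.transaction.Transaction;

public interface Producer<T> extends Closeable {
    /**
     * Create a new watermark builder with transaction.
     *
     * @param txn the transaction in which the watermark is sent
     * @return a watermark builder bound to the given transaction
     */
    WatermarkBuilder newWatermark(Transaction txn);
}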
```
The builder allows for an event time to be set on the watermark, and/or for
the producer to be idled (taken out of consideration for watermarking purposes).
```java
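package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

public interface WatermarkBuilder extends Serializable {
    /**
     * Set the event time for a given watermark.
     *
     * @param timestamp the event time asserted by the watermark
     * @return the watermark builder instance
     */
    WatermarkBuilder eventTime(long timestamp);

    /**
     * Marks the producer as idle.
     *
     * @return the watermark builder instance
     */
    WatermarkBuilder markIdle();

    /**
     * Send the watermark asynchronously.
     */
    CompletableFuture<WatermarkId> sendAsync();

    /**
     * Send the watermark synchronously.
     */
    WatermarkId send() throws PulsarClientException;
}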
```
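The producer's obligation can be illustrated with a small client-side check. This is a hypothetical helper, not part of the proposed API. It assumes that each assertion holds for all later messages, so a later, lower watermark does not relax an earlier one.

```java
// Hypothetical producer-side guard: records the watermarks this producer has
// asserted and checks outgoing event times against them.
final class WatermarkContract {
    private long assertedWatermark = Long.MIN_VALUE;

    /** Record an assertion; assumed: a lower later value cannot weaken an earlier promise. */
    void onWatermarkSent(long timestamp) {
        assertedWatermark = Math.max(assertedWatermark, timestamp);
    }

    /** True if a message with this event time honors every watermark sent so far. */
    boolean honors(long eventTime) {
        return eventTime >= assertedWatermark;
    }
}
```

An application could call `honors` before setting `eventTime` on an outgoing message. A message that fails the check would reach consumers as a late message.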
### Consumer API
The consumer opts into watermarking using a new method on the
`ConsumerBuilder`. The opt-in is warranted because watermarking introduces
minor semantic changes to the receive methods.
```java
public interface ConsumerBuilder<T> extends Cloneable {
    /**
     * Enable or disable receiving watermarks.
     *
     * @param watermarkingEnabled true if watermarks should be delivered.
     * @return the consumer builder instance
     */
    ConsumerBuilder<T> enableWatermarking(boolean watermarkingEnabled);
}
```
The consumer receives watermarks using a new method on the `MessageListener`
interface.
```java
package org.apache.pulsar.client.api;

import java.io.Serializable;

public interface MessageListener<T> extends Serializable {
    // ... existing methods ...

    /**
     * This method is called whenever a new watermark is received.
     *
     * <p>Watermarks are guaranteed to be delivered in order (with respect
     * to messages) and from the same thread for a single consumer.
     *
     * <p>This method will only be called once for each watermark.
     *
     * @param consumer
     *            the consumer that received the watermark
     * @param watermark
     *            the watermark object
     */
    default void receivedWatermark(Consumer<T> consumer, Watermark watermark) {
        // no-op
    }
}
```
The consumer may also obtain the latest watermark via the `Consumer`
interface. This method should be called after `receive` or `receiveAsync`
completes. To accelerate the receipt of watermarks, any outstanding
`receiveAsync` call is automatically completed with a null message.
Applications that prefer the synchronous approach should use `receive` with a
timeout.
```java
public interface Consumer<T> extends Closeable {
    /**
     * @return The latest watermark, or null if watermarking is not enabled
     *         or a watermark has not been received.
     */
    Watermark getLastWatermark();
}
```
The consumer receives watermarks on the same thread as ordinary messages.
The consumer may expect that any subsequent message will have an event
timestamp of at least the watermark value. If the expectation is violated, it
is due to a false assertion made by a producer. The application should treat
such messages as true late messages.
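As an illustration of how an application might use these semantics, the sketch below counts messages in tumbling event-time windows, closes a window once the watermark passes its end, and diverts late messages. It is a hypothetical, self-contained model: `onMessage` and `onWatermark` stand in for the `received` and proposed `receivedWatermark` callbacks, and event times are plain `long` milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consumer-side logic; counts events per tumbling event-time window.
final class TumblingWindowCounter {
    private final long windowSizeMillis;
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>(); // start -> count
    private final List<Long> lateEventTimes = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    TumblingWindowCounter(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    void onMessage(long eventTime) {
        if (eventTime < watermark) {
            // A producer's assertion was false: treat this as a true late message.
            lateEventTimes.add(eventTime);
            return;
        }
        long start = eventTime - Math.floorMod(eventTime, windowSizeMillis);
        openWindows.merge(start, 1, Integer::sum);
    }

    /** Returns (start -> count) for every window whose end is at or before the watermark. */
    Map<Long, Integer> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        Map<Long, Integer> closed = new TreeMap<>();
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + windowSizeMillis <= watermark) {
            Map.Entry<Long, Integer> e = openWindows.pollFirstEntry();
            closed.put(e.getKey(), e.getValue());
        }
        return closed;
    }

    List<Long> lateEventTimes() {
        return lateEventTimes;
    }
}
```

A window `[start, start + size)` can be closed once `start + size <= watermark`, because every subsequent message is expected to carry an event time of at least the watermark.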
## Implementation
### Protocol
Watermarks sent to the broker are persisted into the managed ledger as
“marker” messages (using the existing `markerType` metadata field). Several
variants are defined below. Note that marker messages aren't delivered
directly to consumers.
Watermarks sent by the broker to the consumer aren’t messages; they are
neither acknowledged nor assigned a `MessageId`.
#### Marker: Set Watermark
A message with marker type `W_SET` records a given producer’s watermark
assertion, with the corresponding timestamp stored in the `eventTime` metadata
field, and producer name stored in the `producerName` metadata field.
Once a watermark from a given producer is stored in the ledger, the producer
becomes an active member of the set of producers to consider when materializing
an overall watermark (based on the minimum across active producers). There is
no explicit way for a producer to join the set aside from sending an initial
watermark. To maximize correctness, producers should eagerly send an initial
watermark (e.g. during an initialization phase).
#### Marker: Idle Producer
A message with marker type `W_IDLE` removes a given producer from the set of
producers to consider when materializing an overall watermark, with producer
name stored in the `producerName` metadata field.
#### Marker: Watermark Snapshot
A message with marker type `W_SNAPSHOT` records a snapshot of the latest
watermark information across all active producers. Because watermarks may be
sent transactionally, the snapshot also stores uncommitted watermarks
(organized by transaction ID).
#### Command: SendWatermark
The dispatcher uses a new command, `SendWatermark`, to send a watermark to a
consumer. The watermark simply consists of an `eventTime` field.
Watermark commands are sent when a consumer first attaches to a subscription
(if a watermark is available), and when the watermark changes.
### Broker
The broker materializes watermarks for consumers based on watermark messages
in the ledger. The broker aggregates the watermarks sent by the producers to
determine the effective watermark for a given offset in a topic or partition.
#### Watermark Cursor
Consumers use subscriptions to coordinate message delivery according to a
subscription mode and to track message acknowledgement. For each subscription,
the broker uses a secondary managed cursor to materialize the effective (i.e.
minimum) watermark across all producers. The watermark tracks _acknowledged
messages_, so that the watermark doesn't advance prematurely for any one
consumer. This allows the system to work well with all subscription types, for
example in failover mode where un-acked messages from one consumer may be
redirected to another consumer. Implementation-wise, the watermark cursor
tracks the _mark-delete point_ of the subscription's main cursor.
The watermark cursor is durable or non-durable, consistent with the
durability of the main cursor.
The mark-delete point of the watermark cursor tracks the latest watermark
snapshot.
#### Watermark Snapshots
The broker must be able to efficiently materialize a watermark from a given
topic position, to support subscriptions where the initial position is
specified, to support seeking, etc. It would be impractical to scan the ledger
from the beginning to gather the watermark messages. Also, watermark messages
aren’t retained indefinitely.
To solve this requirement, the broker periodically writes a snapshot message
to the ledger. The snapshot consists of the latest watermark keyed by producer
name, and a table of outstanding transactions, with each transaction consisting
of the latest watermark keyed by producer name.
New policy elements are defined to control snapshotting, e.g. limiting the
amount of message data between snapshots.
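A policy that limits the amount of message data between snapshots could be evaluated as each entry is written. The sketch below assumes a single hypothetical byte-count limit; the actual policy elements are not specified in this proposal.

```java
// Hypothetical snapshot trigger; the byte-based limit is an illustrative policy,
// not an actual Pulsar policy element.
final class SnapshotTrigger {
    private final long maxBytesBetweenSnapshots;
    private long bytesSinceLastSnapshot = 0;

    SnapshotTrigger(long maxBytesBetweenSnapshots) {
        this.maxBytesBetweenSnapshots = maxBytesBetweenSnapshots;
    }

    /** Account for an entry written to the ledger; returns true when a snapshot is due. */
    boolean onEntryWritten(long entrySizeBytes) {
        bytesSinceLastSnapshot += entrySizeBytes;
        if (bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
            bytesSinceLastSnapshot = 0; // the caller writes a W_SNAPSHOT marker now
            return true;
        }
        return false;
    }
}
```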
#### Watermark Generator
Within the broker, the watermark cursor is encapsulated in a
`WatermarkGenerator` that accepts tracking updates from the subscription.
```java
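package org.apache.pulsar.broker.service.eventtime;

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;

public interface WatermarkGenerator {
    /**
     * Get the current watermark.
     *
     * @return the current watermark, or null if no watermark is available
     */
    Long getWatermark();

    /**
     * Advance the tracking position of the watermark generator.
     *
     * @param position the new tracking position
     */
    CompletableFuture<Void> seek(Position position);

    /**
     * Register a listener for changes to the watermark.
     */
    void setListener(WatermarkGeneratorListener listener);
}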
```
The generator materializes a watermark corresponding to a given tracking
position, with the semantic that the watermark is a lower bound on the event
timestamp of any message subsequent to that position in the ledger. A managed
cursor is used for this purpose. The generator reads the ledger from the
earliest available message or from the latest watermark snapshot, vending
watermarks to the registered listener as watermark messages are observed.
The generator is careful not to read past the tracking position. As the
tracking position advances, reading continues.
The generator is also responsible for generating snapshots and for advancing
the mark-delete position of the watermark cursor such that the latest snapshot
is retained. When a subscription is created or reset by a seek, the generator
must read backwards from the tracking position to locate the most recent
snapshot.
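The read path described above (restore from the most recent snapshot at or before the tracking position, then replay watermark markers forward without passing the tracking position) can be modeled in memory. This is a hypothetical sketch: ledger positions are list indexes, entry types mirror the `W_SET`, `W_IDLE`, and `W_SNAPSHOT` markers, transactions are ignored, and a real generator would use a managed cursor with asynchronous reads.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical in-memory model of the watermark generator's read path.
final class GeneratorModel {
    enum Type { DATA, W_SET, W_IDLE, W_SNAPSHOT }

    static final class LedgerEntry {
        final Type type;
        final String producerName;        // for W_SET and W_IDLE
        final long eventTime;             // for W_SET
        final Map<String, Long> snapshot; // for W_SNAPSHOT

        LedgerEntry(Type type, String producerName, long eventTime, Map<String, Long> snapshot) {
            this.type = type;
            this.producerName = producerName;
            this.eventTime = eventTime;
            this.snapshot = snapshot;
        }
    }

    private final List<LedgerEntry> ledger;
    private final Map<String, Long> active = new HashMap<>();
    private int nextToRead;

    /** Position the generator at a tracking position, e.g. on create or seek. */
    GeneratorModel(List<LedgerEntry> ledger, int trackingPosition) {
        this.ledger = ledger;
        int start = 0; // no snapshot found: start from the earliest entry
        // Read backwards to the most recent snapshot at or before the tracking position.
        for (int i = Math.min(trackingPosition, ledger.size() - 1); i >= 0; i--) {
            if (ledger.get(i).type == Type.W_SNAPSHOT) {
                active.putAll(ledger.get(i).snapshot);
                start = i + 1;
                break;
            }
        }
        nextToRead = start;
        advanceTo(trackingPosition);
    }

    /** Replay markers up to and including the tracking position, never past it. */
    void advanceTo(int trackingPosition) {
        int last = Math.min(trackingPosition, ledger.size() - 1);
        for (; nextToRead <= last; nextToRead++) {
            LedgerEntry e = ledger.get(nextToRead);
            if (e.type == Type.W_SET) {
                active.put(e.producerName, e.eventTime);
            } else if (e.type == Type.W_IDLE) {
                active.remove(e.producerName);
            }
        }
    }

    /** Minimum across active producers, or empty if none. */
    OptionalLong watermark() {
        return active.values().stream().mapToLong(Long::longValue).min();
    }
}
```

Here the tracking position plays the role of the subscription's mark-delete point, so markers that follow unacknowledged messages are not applied until those messages are acknowledged.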
The generator supports transactions. If an uncommitted watermark is
encountered, the generator holds it in state until the transaction is
committed. This state is also snapshotted.
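The transactional bookkeeping could look like the following sketch. It is hypothetical: transaction IDs are plain strings, only `W_SET` markers are modeled, and discarding pending watermarks on abort is an assumption not spelled out in this proposal. The `snapshot()` method copies both the committed table and the per-transaction table, matching the snapshot contents described earlier.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical model of transactional watermark state held by the generator.
final class TransactionalWatermarkState {
    static final class Snapshot {
        final Map<String, Long> committed;
        final Map<String, Map<String, Long>> pendingByTxn;

        Snapshot(Map<String, Long> committed, Map<String, Map<String, Long>> pendingByTxn) {
            this.committed = committed;
            this.pendingByTxn = pendingByTxn;
        }
    }

    private final Map<String, Long> committed = new HashMap<>();
    private final Map<String, Map<String, Long>> pendingByTxn = new HashMap<>();

    /** A W_SET marker written inside a transaction is held until the outcome is known. */
    void onUncommittedSet(String txnId, String producerName, long eventTime) {
        pendingByTxn.computeIfAbsent(txnId, k -> new HashMap<>()).put(producerName, eventTime);
    }

    void onCommit(String txnId) {
        Map<String, Long> pending = pendingByTxn.remove(txnId);
        if (pending != null) {
            committed.putAll(pending);
        }
    }

    /** Assumed behavior: an aborted transaction's watermarks are discarded. */
    void onAbort(String txnId) {
        pendingByTxn.remove(txnId);
    }

    /** Only committed watermarks contribute to the effective watermark. */
    OptionalLong watermark() {
        return committed.values().stream().mapToLong(Long::longValue).min();
    }

    /** Deep copy of both tables, i.e. what a W_SNAPSHOT marker would record. */
    Snapshot snapshot() {
        Map<String, Map<String, Long>> pendingCopy = new HashMap<>();
        pendingByTxn.forEach((txn, marks) -> pendingCopy.put(txn, new HashMap<>(marks)));
        return new Snapshot(new HashMap<>(committed), pendingCopy);
    }
}
```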
#### Dispatcher
The dispatcher for a given subscription listens to the watermark generator.
The dispatcher forwards watermarks to all consumers and handles sending the
latest watermark to any new consumer. The latest watermark may be dispatched
without considering unacknowledged messages, and may use broadcast semantics in
all cases.
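The dispatcher's fan-out behavior can be sketched as follows. This is a hypothetical model in which each `LongConsumer` stands in for sending a `SendWatermark` command to one attached consumer. It sends the latest watermark to newly attached consumers and broadcasts only when the value changes, without regard to unacknowledged messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical dispatcher-side fan-out of watermarks from the generator.
final class WatermarkFanOut {
    private final List<LongConsumer> consumers = new ArrayList<>();
    private Long latest; // null until the generator reports a watermark

    /** New consumers immediately receive the latest watermark, if one exists. */
    void addConsumer(LongConsumer consumer) {
        consumers.add(consumer);
        if (latest != null) {
            consumer.accept(latest);
        }
    }

    void removeConsumer(LongConsumer consumer) {
        consumers.remove(consumer);
    }

    /** Generator listener callback: broadcast to all consumers only when the value changes. */
    void onWatermarkChanged(long watermark) {
        if (latest != null && latest == watermark) {
            return;
        }
        latest = watermark;
        for (LongConsumer c : consumers) {
            c.accept(watermark);
        }
    }
}
```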
## Rejected Alternatives