This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6786a2f Topic compaction documentation (#1466)
6786a2f is described below
commit 6786a2f69e5ccd8e0800a9a62288c8418e1478ef
Author: Luc Perkins <[email protected]>
AuthorDate: Wed May 30 11:31:39 2018 -0700
Topic compaction documentation (#1466)
* add deduplication diagram
* add CLI docs to YAML file
* add docs for new broker.conf settings
* add link to streamlio blog post and remove errant TODOs
* add more to theory section
* add producer idempotency section
* add message deduplication cookbook
* add multiple new sections to cookbook
* add baseline notes to C&A doc plus a cookbook
* add missing flags to pulsar command docs
* begin adding admin docs to cookbook
* add basic stock ticker example and mention
* add empty message example
* more details in client doc
* more material for cookbook
* finish draft of Java client doc
* change admon text
* begin revamping concepts section
* add C+A section on Pulsar messages
* finish section on messages and add popover definition
* fix reader example
* complete description of sequence ID
* fix remaining reader examples in C+A doc
* finish conceptual docs for compaction
* add note to CLI docs
* begin addressing @ivankelly comments
* change header level
* provide clarification around BK ledger
---
site/_data/cli/pulsar-admin.yaml | 8 +-
site/_data/cli/pulsar.yaml | 5 +-
site/_data/popovers.yaml | 2 +-
site/_data/sidebar.yaml | 4 +-
site/docs/latest/cookbooks/compaction.md | 107 +++++++++++++++++++++
.../docs/latest/cookbooks/message-deduplication.md | 4 +-
.../getting-started/ConceptsAndArchitecture.md | 69 +++++++++++--
7 files changed, 184 insertions(+), 15 deletions(-)
diff --git a/site/_data/cli/pulsar-admin.yaml b/site/_data/cli/pulsar-admin.yaml
index 2180619..110be5c 100644
--- a/site/_data/cli/pulsar-admin.yaml
+++ b/site/_data/cli/pulsar-admin.yaml
@@ -683,4 +683,10 @@ commands:
- flags: -b, --bundle
description: A bundle of the form `{start-boundary}_{end_boundary}`.
- flags: -n, --namespace
- description: The namespace as `tenant/namespace`, for example
`my-tenant/my-ns`.
+ description: The namespace as `property/cluster/namespace`, for example
`my-prop/my-cluster/my-ns`.
+- name: topics
+ description: Operations related to Pulsar topics of all kinds (both
persistent and non-persistent)
+ subcommands:
+ - name: compact
+ description: Runs a compaction operation on the specified topic
+ argument: topic-name
diff --git a/site/_data/cli/pulsar.yaml b/site/_data/cli/pulsar.yaml
index dd252a7..9f08852 100644
--- a/site/_data/cli/pulsar.yaml
+++ b/site/_data/cli/pulsar.yaml
@@ -58,13 +58,16 @@ commands:
description: Run a BookKeeper autorecovery daemon on the same host as the
Pulsar broker
default: 'false'
- name: compact-topic
- description: Run compaction against a Pulsar topic
+ description: Run compaction against a Pulsar topic (in a new process)
example: |
pulsar compact-topic \
--topic topic-to-compact
options:
- flags: -t, --topic
description: The Pulsar topic that you would like to compact
+ - flags: -c, --broker-conf
+ description: Configuration file for the broker
+ default: ${pulsarDirectory}/conf/broker.conf
- name: discovery
description: Run a discovery server
example: |
diff --git a/site/_data/popovers.yaml b/site/_data/popovers.yaml
index f04df85..c8cf38c 100644
--- a/site/_data/popovers.yaml
+++ b/site/_data/popovers.yaml
@@ -63,7 +63,7 @@ ledger:
def: An append-only data structure in BookKeeper that is used to
persistently store messages in Pulsar topics.
message:
q: What is a message in Pulsar?
- def: A
+ def: Messages are the basic "unit" of Pulsar. They're what producers publish
to topics and what consumers then consume from topics.
multi-tenancy:
q: What is multi-tenancy?
def: The ability to isolate namespaces, specify quotas, and configure
authentication and authorization on a per-property basis.
diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml
index 04d68aa..014c2dd 100644
--- a/site/_data/sidebar.yaml
+++ b/site/_data/sidebar.yaml
@@ -134,7 +134,9 @@ groups:
- title: Cookbooks
dir: cookbooks
docs:
- - title: Message deduplication
+ - title: Topic compaction
+ endpoint: compaction
+ - title: Managing message deduplication
endpoint: message-deduplication
- title: Non-persistent messaging
endpoint: non-persistent-messaging
diff --git a/site/docs/latest/cookbooks/compaction.md b/site/docs/latest/cookbooks/compaction.md
new file mode 100644
index 0000000..261327e
--- /dev/null
+++ b/site/docs/latest/cookbooks/compaction.md
@@ -0,0 +1,107 @@
+---
+title: Topic compaction cookbook
+tags: [admin, clients, compaction]
+---
+
+Pulsar's [topic
compaction](../../getting-started/ConceptsAndArchitecture#compaction) feature
enables you to create **compacted** topics in which older, "obscured" entries
are pruned from the topic, allowing for faster reads through the topic's
history (which messages are deemed obscured/outdated/irrelevant will depend on
your use case).
+
+To use compaction:
+
+* You need to give messages keys, as topic compaction in Pulsar takes place on
a *per-key basis* (i.e. messages are compacted based on their key). For a stock
ticker use case, the stock symbol---e.g. `AAPL` or `GOOG`---could serve as the
key (more on this [below](#when)). Messages without keys will be left alone by
the compaction process.
+* You must manually [trigger](#trigger) compaction using the Pulsar
administrative API. This will both run a compaction operation *and* mark the
topic as a compacted topic.
+* Your {% popover consumers %} must be [configured](#config) to read from
compacted topics ([Java consumers](#java), for example, have a `readCompacted`
setting that must be set to `true`). If this configuration is not set,
consumers will still be able to read from the non-compacted topic.
+
+## When should I use compacted topics? {#when}
+
+The classic example of a topic that could benefit from compaction would be a
stock ticker topic through which {% popover consumers %} can access up-to-date
values for specific stocks. On a stock ticker topic you only care about the
most recent value of each stock; "historical values" don't matter, so there's
no need to read through outdated data when processing a topic's messages.
+
+{% include admonition.html type="info" content="For topics where older values
are important, for example when you need to process a long series of messages
in order, many of which have the same key, compaction is unnecessary and could
possibly even be harmful." %}
+
+{% include admonition.html type="warning" content="Compaction only works on
topics where each message has a key (as in the stock ticker example, where the
stock symbol serves as the key). Keys can be thought of as the axis along which
compaction is applied." %}
+
+## Triggering compaction {#trigger}
+
+In order to run compaction on a topic, you need to use the [`topics
compact`](../../CliTools#pulsar-admin-topics-compact) command for the
[`pulsar-admin`](../../CliTools#pulsar-admin) CLI tool. Here's an example:
+
+```bash
+$ bin/pulsar-admin topics compact \
+ persistent://my-tenant/my-namespace/my-topic
+```
+
+The `pulsar-admin` tool runs compaction via the Pulsar [REST
API](../../reference/RestApi). To run compaction in its own dedicated process,
i.e. *not* through the REST API, you can use the [`pulsar
compact-topic`](../../CliTools#pulsar-compact-topic) command. Here's an example:
+
+```bash
+$ bin/pulsar compact-topic \
+ --topic persistent://my-tenant/my-namespace/my-topic
+```
+
+The `pulsar compact-topic` command communicates with
[ZooKeeper](https://zookeeper.apache.org) directly. In order to establish
communication with ZooKeeper, though, the `pulsar` CLI tool will need to have a
valid [broker configuration](../../Configuration#broker). You can either supply
a proper configuration in `conf/broker.conf` or specify a non-default location
for the configuration:
+
+```bash
+$ bin/pulsar compact-topic \
+ --broker-conf /path/to/broker.conf \
+ --topic persistent://my-tenant/my-namespace/my-topic
+
+# If the configuration is in conf/broker.conf
+$ bin/pulsar compact-topic \
+ --topic persistent://my-tenant/my-namespace/my-topic
+```
+
+### When should I trigger compaction?
+
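+How often you [trigger compaction](#trigger) will vary widely based on the use case. If you want reads from a compacted topic to be as fast as possible, run compaction frequently: messages published since the last compaction run haven't been compacted yet, so consumers still have to read through them.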
+
+{% include admonition.html type="warning" title="No automatic compaction"
content="Currently, all topic compaction in Pulsar must be initiated manually
via the [CLI](#trigger) or [REST API](../../reference/RestApi)." %}
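Since compaction isn't triggered automatically, one option is to invoke the CLI on a schedule. The sketch below is not a Pulsar feature: `PeriodicCompaction` and the `pulsarHome` directory are hypothetical, and it simply runs the `bin/pulsar-admin topics compact` command shown above at a fixed rate using the JDK's `ScheduledExecutorService`. Because that command goes through the REST API, the sketch assumes its exit code only reflects whether the trigger request was accepted, not when compaction finished.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that shells out to pulsar-admin on a fixed schedule.
final class PeriodicCompaction {

    // Builds the command from the docs: bin/pulsar-admin topics compact <topic>
    static List<String> compactCommand(String pulsarHome, String topic) {
        return Arrays.asList(pulsarHome + "/bin/pulsar-admin", "topics", "compact", topic);
    }

    // Runs the command once and returns the CLI's exit code
    // (assumed to reflect only whether the trigger request succeeded).
    static int runOnce(String pulsarHome, String topic)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(compactCommand(pulsarHome, topic))
                .inheritIO()
                .start();
        return process.waitFor();
    }

    // Triggers compaction every periodMinutes minutes until the returned
    // scheduler is shut down. A single thread means runs never overlap.
    static ScheduledExecutorService schedule(String pulsarHome, String topic, long periodMinutes) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                int exitCode = runOnce(pulsarHome, topic);
                if (exitCode != 0) {
                    System.err.println("compaction trigger exited with code " + exitCode);
                }
            } catch (IOException e) {
                System.err.println("could not run pulsar-admin: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, periodMinutes, TimeUnit.MINUTES);
        return scheduler;
    }
}
```

For example, `PeriodicCompaction.schedule("/opt/pulsar", "persistent://my-tenant/my-namespace/my-topic", 30)` would trigger compaction every 30 minutes, and calling `shutdown()` on the returned scheduler stops it. With `scheduleAtFixedRate`, a run that takes longer than the period delays the next one rather than overlapping with it.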
+
+## Consumer configuration {#config}
+
+Pulsar consumers and readers need to be configured to read from compacted topics. The sections below show you how to enable compacted topic reads for Pulsar's language clients.
+
+{% include admonition.html type="warning" title="Java only"
content="Currently, only [Java](#java) clients can consume messages from
compacted topics." %}
+
+### Java
+
+In order to read from a compacted topic using a Java consumer, the
`readCompacted` parameter must be set to `true`. Here's an example consumer for
a compacted topic:
+
+```java
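+import org.apache.pulsar.client.api.Consumer;
+
+// `client` is an existing PulsarClient instance
+Consumer<byte[]> compactedTopicConsumer = client.newConsumer()
+ .topic("some-compacted-topic")
+ .subscriptionName("some-subscription")
+ .readCompacted(true)
+ .subscribe();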
+```
+
+As mentioned above, topic compaction in Pulsar works on a *per-key basis*.
That means that messages that you produce on compacted topics need to have keys
(the content of the key will depend on your use case). Messages that don't have
keys will be ignored by the compaction process. Here's an example Pulsar
message with a key:
+
+```java
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+
+// someByteArray is a placeholder for your message payload (a byte[])
+Message<byte[]> msg = MessageBuilder.create()
+ .setContent(someByteArray)
+ .setKey("some-key")
+ .build();
+```
+
+The example below shows a message with a key being produced on a compacted
Pulsar topic:
+
+```java
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .build();
+
+Producer<byte[]> compactedTopicProducer = client.newProducer()
+ .topic("some-compacted-topic")
+ .create();
+
+// someByteArray is a placeholder for your message payload (a byte[])
+Message<byte[]> msg = MessageBuilder.create()
+ .setContent(someByteArray)
+ .setKey("some-key")
+ .build();
+
+compactedTopicProducer.send(msg);
+```
\ No newline at end of file
diff --git a/site/docs/latest/cookbooks/message-deduplication.md b/site/docs/latest/cookbooks/message-deduplication.md
index e63e0d5..9a63264 100644
--- a/site/docs/latest/cookbooks/message-deduplication.md
+++ b/site/docs/latest/cookbooks/message-deduplication.md
@@ -28,8 +28,6 @@ Parameter | Description | Default
`brokerDeduplicationEntriesInterval` | The number of entries after which a
deduplication informational snapshot is taken. A larger interval will lead to
fewer snapshots being taken, though this would also lengthen the topic recovery
time (the time required for entries published after the snapshot to be
replayed). | `1000`
`brokerDeduplicationProducerInactivityTimeoutMinutes` | The time of inactivity
(in minutes) after which the broker will discard deduplication information
related to a disconnected producer. | `360` (6 hours)
-Any configuration changes you make won't take effect until you re-start the
broker.
-
### Setting the broker-level default {#default}
By default, message deduplication is *disabled* on all Pulsar namespaces. To
enable it by default on all namespaces, set the `brokerDeduplicationEnabled`
parameter to `true` and re-start the broker.
@@ -98,7 +96,7 @@ producer = client.create_producer(
send_timeout_millis=0)
```
-## C++ clients {#cpp}
+### C++ clients {#cpp}
To enable message deduplication on a [C++
producer](../../clients/Cpp#producer), set the producer name using
`producer_name` and the timeout to 0 using `send_timeout_millis`. Here's an
example:
diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
index 1fe8776..6fcad59 100644
--- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
+++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -38,6 +38,22 @@ Pulsar's key features include:
* Multiple [subscription modes](#subscription-modes) for {% popover topics %}
([exclusive](#exclusive), [shared](#shared), and [failover](#failover))
* Guaranteed message delivery with [persistent message
storage](#persistent-storage) provided by [Apache
BookKeeper](http://bookkeeper.apache.org/)
+## Messages
+
+Messages are the basic "unit" of Pulsar. They're what {% popover producers %}
publish to {% popover topics %} and what {% popover consumers %} then consume
from topics (and {% popover acknowledge %} when the message has been
processed). Messages are the analogue of letters in a postal service system.
+
+Component | Purpose
+:---------|:-------
+Value / data payload | The data carried by the message. All Pulsar messages carry raw bytes, although message data can also conform to data [schemas](#schema-registry)
+Key | Messages can optionally be tagged with keys, which can be useful for things like [topic compaction](#compaction)
+Properties | An optional key/value map of user-defined properties
+Producer name | The name of the {% popover producer %} that produced the message (producers are automatically given default names, but you can apply your own explicitly as well)
+Sequence ID | Each Pulsar message belongs to an ordered sequence on its {% popover topic %}. A message's sequence ID is its position in that sequence.
+Publish time | The timestamp of when the message was published (automatically applied by the {% popover producer %})
+Event time | An optional timestamp that applications can attach to the message representing when something happened, e.g. when the message was processed. The event time of a message is 0 if none is explicitly set.
+
+{% include admonition.html type="info" content="For a more in-depth breakdown
of Pulsar message contents, see the documentation on Pulsar's [binary
protocol](../../reference/BinaryProtocol)." %}
+
## Producers, consumers, topics, and subscriptions
Pulsar is built on the
[publish-subscribe](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern)
pattern, aka {% popover pub-sub %}. In this pattern, [producers](#producers)
publish messages to [topics](#topics). [Consumers](#consumers) can then
[subscribe](#subscription-modes) to those topics, process incoming messages,
and send an {% popover acknowledgement %} when processing is complete.
@@ -492,11 +508,11 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
-String topic = "persistent://public/default/reader-api-test";
-MessageId id = MessageId.earliest;
-
// Create a reader on a topic and for a specific message (and onward)
-Reader reader = pulsarClient.createReader(topic, id, new
ReaderConfiguration());
+Reader<byte[]> reader = pulsarClient.newReader()
+ .topic("reader-api-test")
+ .startMessageId(MessageId.earliest)
+ .create();
while (true) {
Message message = reader.readNext();
@@ -508,8 +524,10 @@ while (true) {
To create a reader that will read from the latest available message:
```java
-MessageId id = MessageId.latest;
-Reader reader = pulsarClient.createReader(topic, id, new
ReaderConfiguration());
+Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(MessageId.latest)
+ .create();
```
To create a reader that will read from some message between earliest and
latest:
@@ -517,9 +535,44 @@ To create a reader that will read from some message
between earliest and latest:
```java
byte[] msgIdBytes = // Some byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
-Reader reader = pulsarClient.createReader(topic, id, new
ReaderConfiguration());
+Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(id)
+ .create();
```
+## Topic compaction {#compaction}
+
+Pulsar was built with highly scalable [persistent
storage](#persistent-storage) of message data as a primary objective. Pulsar {%
popover topics %} enable you to persistently store as many unacknowledged
messages as you need while preserving message ordering. By default, Pulsar
stores *all* unacknowledged/unprocessed messages produced on a topic.
Accumulating many unacknowledged messages on a topic is necessary for many
Pulsar use cases but it can also be very time intensive for Pulsar { [...]
+
+{% include admonition.html type="success" content="For a more practical guide
to topic compaction, see the [Topic compaction
cookbook](../../cookbooks/compaction)." %}
+
+For some use cases consumers don't need a complete "image" of the topic log.
They may only need a few values to construct a more "shallow" image of the log,
perhaps even just the most recent value. For these kinds of use cases Pulsar
offers **topic compaction**. When you run compaction on a topic, Pulsar goes
through a topic's backlog and removes messages that are *obscured* by later
messages, i.e. it goes through the topic on a per-key basis and leaves only the
most recent message assoc [...]
+
+Pulsar's topic compaction feature:
+
+* Allows for much more efficient "rewind" through topic logs
+* Applies only to [persistent topics](#persistent-storage)
+* Is triggered manually via the command line or REST API. See the [Topic compaction cookbook](../../cookbooks/compaction)
+* Is conceptually and operationally distinct from [retention and
expiry](#message-retention-and-expiry). Topic compaction *does*, however,
respect retention. If retention has removed a message from the message backlog
of a topic, the message will also not be readable from the compacted topic
ledger.
+
+{% include admonition.html type="info" title="Topic compaction example: the
stock ticker"
+ content="An example use case for a compacted Pulsar topic would be a stock
ticker topic. On a stock ticker topic, each message bears a timestamped dollar
value for stocks for purchase (with the message key holding the stock symbol,
e.g. `AAPL` or `GOOG`). With a stock ticker you may care only about the most
recent value(s) of the stock and have no interest in historical data (i.e. you
don't need to construct a complete image of the topic's sequence of messages
per key). Compaction wou [...]
+
+### How topic compaction works
+
+When topic compaction is triggered [via the CLI](../../cookbooks/compaction), Pulsar will iterate over the entire topic from beginning to end. For each key that it encounters, the {% popover broker %} responsible for the topic will keep a record of the latest occurrence of that key. When this first pass is finished, the broker will create a [BookKeeper ledger](#ledgers) to store the compacted topic.
+
+After that, the broker will make a second pass through each message on the topic. For each message, if it is the latest occurrence of its key, then the message's data payload, message ID, and metadata will be written to the new BookKeeper ledger (the one created at the end of the first pass). If it isn't the latest occurrence of its key, the message will be skipped and left alone. If any given message has an empty payload, it will be skipped and considered delet [...]
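The two passes described above can be sketched in plain Java. This is not Pulsar's implementation: `TopicMessage` and `TwoPassCompaction` are hypothetical names, message IDs are modeled as increasing `long` values, and the topic is an in-memory list rather than a BookKeeper ledger. The sketch also assumes that keyless messages are passed through to the compacted output unchanged, which is one reading of "left alone".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message on a topic (not a Pulsar client type).
final class TopicMessage {
    final long id;        // stands in for the message ID; larger means later
    final String key;     // null means the message has no key
    final byte[] payload; // an empty array marks the key as deleted

    TopicMessage(long id, String key, byte[] payload) {
        this.id = id;
        this.key = key;
        this.payload = payload;
    }
}

// Hypothetical two-pass compactor over an in-memory list of messages.
final class TwoPassCompaction {

    // Returns the messages that would be written to the compacted ledger, in topic order.
    static List<TopicMessage> compact(List<TopicMessage> topic) {
        // Pass 1: record the ID of the latest occurrence of each key.
        Map<String, Long> latestIdByKey = new HashMap<>();
        for (TopicMessage m : topic) {
            if (m.key != null) {
                latestIdByKey.put(m.key, m.id);
            }
        }

        // Pass 2: keep each message that is the latest for its key, unless its
        // payload is empty (the key is then treated as deleted).
        List<TopicMessage> compacted = new ArrayList<>();
        for (TopicMessage m : topic) {
            if (m.key == null) {
                compacted.add(m); // assumption: keyless messages pass through unchanged
                continue;
            }
            long latest = latestIdByKey.get(m.key);
            if (m.id == latest && m.payload.length > 0) {
                compacted.add(m);
            }
        }
        return compacted;
    }
}
```

For example, given the messages `AAPL`=a1, `GOOG`=g1, a keyless message, `AAPL`=a2, and an empty-payload `GOOG` message (in that order), `compact` keeps only the keyless message and `AAPL`=a2. `GOOG` disappears entirely because its latest message has an empty payload.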
+
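+{% include admonition.html type="info" title="Compaction leaves the original topic intact" content="Compaction doesn't delete anything from the original topic. Compacted messages are written to a separate BookKeeper ledger, and consumers that don't enable compacted reads continue to read the full, uncompacted topic." %}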
+
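+In addition to performing compaction, Pulsar {% popover brokers %} listen for changes on each topic's metadata. When a compaction run completes, the ID of the compacted ledger and the message ID of the last compacted message (the topic's *compaction horizon*) are written to the topic's metadata. If the compacted ledger for the topic changes:
+
+* Clients (consumers and readers) that have compacted reads enabled will attempt to read messages from the topic and either:
+  * Read from the topic like normal (if the message ID they're reading from is greater than or equal to the compaction horizon) or
+  * Read from the compacted ledger and then continue from the topic after the compaction horizon (if the message ID they're reading from is lower than the compaction horizon)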
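The read decision above can be modeled the same way. In this sketch the names are hypothetical, `long` values stand in for message IDs, and the compaction horizon is assumed to be the ID of the last message the compaction run covered. A reader starting below the horizon is served from the compacted ledger and then continues with messages published after the horizon; a reader starting at or beyond the horizon reads the topic normally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a compacted read; long values stand in for message IDs.
final class CompactedRead {

    // Returns the IDs a read-compacted client would receive when it starts at
    // startId, given the compaction horizon, the compacted ledger, and the full topic.
    static List<Long> read(long startId, long horizon,
                           List<Long> compactedLedger, List<Long> topic) {
        List<Long> delivered = new ArrayList<>();
        if (startId >= horizon) {
            // At or past the horizon: read from the topic like normal.
            for (long id : topic) {
                if (id >= startId) {
                    delivered.add(id);
                }
            }
            return delivered;
        }
        // Below the horizon: serve the compacted ledger from startId onward...
        for (long id : compactedLedger) {
            if (id >= startId) {
                delivered.add(id);
            }
        }
        // ...then continue with the messages published after the horizon.
        for (long id : topic) {
            if (id > horizon) {
                delivered.add(id);
            }
        }
        return delivered;
    }
}
```

For a topic with IDs 1 through 6, a compacted ledger holding IDs 3 and 4, and a horizon of 5, a reader starting at ID 0 receives 3, 4, 6, while a reader starting at 5 receives 5 and 6.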
+
## Schema registry
Type safety is extremely important in any application built around a message
bus like Pulsar. {% popover Producers %} and {% popover consumers %} need some
kind of mechanism for coordinating types at the {% popover topic %} level lest
a wide variety of potential problems arise (for example serialization and
deserialization issues). Applications typically adopt one of two basic
approaches to type safety in messaging:
@@ -589,4 +642,4 @@ For usage instructions, see the documentation for your
preferred client library:
* [Java](../../clients/Java#schemas)
-{% include admonition.html type="success" content="Support for other schema
formats will be added in future releases of Pulsar." %}
\ No newline at end of file
+{% include admonition.html type="success" content="Support for other schema
formats will be added in future releases of Pulsar." %}