This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d4ab528  [website][upgrade]feat: docs migration - 2.7.1 / Cookbooks 
(#12678)
d4ab528 is described below

commit d4ab528add2b195504e693c1d5cd05fccdc00fc2
Author: Li Li <[email protected]>
AuthorDate: Wed Nov 10 12:26:33 2021 +0800

    [website][upgrade]feat: docs migration - 2.7.1 / Cookbooks (#12678)
    
    * [website][upgrade]feat: docs migration - 2.7.1 / Admin API
    
    Signed-off-by: LiLi <[email protected]>
    
    * [website][upgrade]feat: docs migration - 2.7.1 / Adaptors
    
    Signed-off-by: LiLi <[email protected]>
    
    * [website][upgrade]feat: docs migration - 2.7.1 / Cookbooks
    
    Signed-off-by: LiLi <[email protected]>
---
 .../version-2.7.1/cookbooks-bookkeepermetadata.md  |  25 ++
 .../version-2.7.1/cookbooks-compaction.md          | 146 +++++++
 .../version-2.7.1/cookbooks-deduplication.md       | 159 ++++++++
 .../version-2.7.1/cookbooks-encryption.md          | 188 +++++++++
 .../version-2.7.1/cookbooks-message-queue.md       | 130 +++++++
 .../version-2.7.1/cookbooks-non-persistent.md      |  67 ++++
 .../version-2.7.1/cookbooks-retention-expiry.md    | 421 +++++++++++++++++++++
 .../versioned_sidebars/version-2.7.1-sidebars.json |  34 ++
 8 files changed, 1170 insertions(+)

diff --git 
a/site2/website-next/versioned_docs/version-2.7.1/cookbooks-bookkeepermetadata.md
 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-bookkeepermetadata.md
new file mode 100644
index 0000000..ee8df2b
--- /dev/null
+++ 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-bookkeepermetadata.md
@@ -0,0 +1,25 @@
+---
+id: cookbooks-bookkeepermetadata
+title: BookKeeper Ledger Metadata
+original_id: cookbooks-bookkeepermetadata
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+Pulsar stores data in BookKeeper ledgers. You can understand the contents of a ledger by inspecting the metadata attached to it.
+Such metadata is stored in ZooKeeper and is readable using the BookKeeper APIs.
+
+Description of current metadata:
+
+| Scope  | Metadata name | Metadata value |
+| ------------- | ------------- | ------------- |
+| All ledgers  | application  | 'pulsar' |
+| All ledgers  | component  | 'managed-ledger', 'schema', 'compacted-topic' |
+| Managed ledgers | pulsar/managed-ledger | name of the ledger |
+| Cursor | pulsar/cursor | name of the cursor |
+| Compacted topic | pulsar/compactedTopic | name of the original topic |
+| Compacted topic | pulsar/compactedTo | id of the last compacted message |
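+
+If you want to inspect this metadata yourself, the custom metadata can be read back with the BookKeeper client. The following Java sketch is illustrative only: the ledger ID and ZooKeeper address are hypothetical, and the digest type depends on your broker configuration (managed ledgers use an empty password by default).
+
+```java
+
+import java.util.Map;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+// Hypothetical values: take the ledger ID from a topic's internal stats
+long ledgerId = 12345L;
+BookKeeper bk = new BookKeeper("localhost:2181");
+
+// Managed ledgers are created with an empty password; CRC32C is an assumption
+LedgerHandle lh = bk.openLedgerNoRecovery(
+        ledgerId, BookKeeper.DigestType.CRC32C, "".getBytes());
+
+// Print the custom metadata attached by Pulsar (application, component, ...)
+for (Map.Entry<String, byte[]> e : lh.getCustomMetadata().entrySet()) {
+    System.out.println(e.getKey() + " = " + new String(e.getValue()));
+}
+
+lh.close();
+bk.close();
+
+```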
+
+
diff --git 
a/site2/website-next/versioned_docs/version-2.7.1/cookbooks-compaction.md 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-compaction.md
new file mode 100644
index 0000000..ebcf4ba
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-compaction.md
@@ -0,0 +1,146 @@
+---
+id: cookbooks-compaction
+title: Topic compaction
+sidebar_label: "Topic compaction"
+original_id: cookbooks-compaction
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+Pulsar's [topic compaction](concepts-topic-compaction.md#compaction) feature 
enables you to create **compacted** topics in which older, "obscured" entries 
are pruned from the topic, allowing for faster reads through the topic's 
history (which messages are deemed obscured/outdated/irrelevant will depend on 
your use case).
+
+To use compaction:
+
+* You need to give messages keys, as topic compaction in Pulsar takes place on 
a *per-key basis* (i.e. messages are compacted based on their key). For a stock 
ticker use case, the stock symbol---e.g. `AAPL` or `GOOG`---could serve as the 
key (more on this [below](#when-should-i-use-compacted-topics)). Messages 
without keys will be left alone by the compaction process.
+* Compaction can be configured to run 
[automatically](#configuring-compaction-to-run-automatically), or you can 
manually [trigger](#triggering-compaction-manually) compaction using the Pulsar 
administrative API.
+* Your consumers must be [configured](#consumer-configuration) to read from 
compacted topics ([Java consumers](#java), for example, have a `readCompacted` 
setting that must be set to `true`). If this configuration is not set, 
consumers will still be able to read from the non-compacted topic.
+
+
+> Compaction only works on messages that have keys (as in the stock ticker 
example the stock symbol serves as the key for each message). Keys can thus be 
thought of as the axis along which compaction is applied. Messages that don't 
have keys are simply ignored by compaction.
+
+## When should I use compacted topics?
+
+The classic example of a topic that could benefit from compaction would be a 
stock ticker topic through which consumers can access up-to-date values for 
specific stocks. Imagine a scenario in which messages carrying stock value data 
use the stock symbol as the key (`GOOG`, `AAPL`, `TWTR`, etc.). Compacting this 
topic would give consumers on the topic two options:
+
+* They can read from the "original," non-compacted topic in case they need 
access to "historical" values, i.e. the entirety of the topic's messages.
+* They can read from the compacted topic if they only want to see the most 
up-to-date messages.
+
+Thus, if you're using a Pulsar topic called `stock-values`, some consumers could have access to all messages in the topic (perhaps because they're performing some kind of number crunching of all values in the last hour) while the consumers used to power the real-time stock ticker only see the compacted topic (and thus aren't forced to process outdated messages). Which variant of the topic any given consumer pulls messages from is determined by the consumer's [configuration](#consumer-configuration).
+
+> One of the benefits of compaction in Pulsar is that you aren't forced to 
choose between compacted and non-compacted topics, as the compaction process 
leaves the original topic as-is and essentially adds an alternate topic. In 
other words, you can run compaction on a topic and consumers that need access 
to the non-compacted version of the topic will not be adversely affected.
+
+
+## Configuring compaction to run automatically
+
+Tenant administrators can configure a policy for compaction at the namespace 
level. The policy specifies how large the topic backlog can grow before 
compaction is triggered.
+
+For example, to trigger compaction when the backlog reaches 100MB:
+
+```bash
+
+$ bin/pulsar-admin namespaces set-compaction-threshold \
+  --threshold 100M my-tenant/my-namespace
+
+```
+
+Configuring the compaction threshold on a namespace will apply to all topics 
within that namespace.
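+
+If you manage namespaces programmatically, the same threshold can be set with the Java admin client. A minimal sketch, assuming an initialized `PulsarAdmin` instance named `admin`:
+
+```java
+
+// 100 MB threshold, expressed in bytes
+admin.namespaces().setCompactionThreshold("my-tenant/my-namespace", 100L * 1024 * 1024);
+
+```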
+
+## Triggering compaction manually
+
+In order to run compaction on a topic, you need to use the [`topics 
compact`](reference-pulsar-admin.md#topics-compact) command for the 
[`pulsar-admin`](reference-pulsar-admin) CLI tool. Here's an example:
+
+```bash
+
+$ bin/pulsar-admin topics compact \
+  persistent://my-tenant/my-namespace/my-topic
+
+```
+
+The `pulsar-admin` tool runs compaction via the Pulsar {@inject: rest:REST:/} 
API. To run compaction in its own dedicated process, i.e. *not* through the 
REST API, you can use the [`pulsar 
compact-topic`](reference-cli-tools.md#pulsar-compact-topic) command. Here's an 
example:
+
+```bash
+
+$ bin/pulsar compact-topic \
+  --topic persistent://my-tenant-namespace/my-topic
+
+```
+
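+You can also trigger and monitor compaction programmatically through the admin API. A minimal sketch, assuming an initialized `PulsarAdmin` instance named `admin`:
+
+```java
+
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+
+// Ask the broker to start compaction for the topic
+admin.topics().triggerCompaction(topic);
+
+// Poll the long-running status (NOT_RUN, RUNNING, SUCCESS, or ERROR)
+System.out.println(admin.topics().compactionStatus(topic).status);
+
+```
+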
+> Running compaction in its own process is recommended when you want to avoid 
interfering with the broker's performance. Broker performance should only be 
affected, however, when running compaction on topics with a large keyspace (i.e 
when there are many keys on the topic). The first phase of the compaction 
process keeps a copy of each key in the topic, which can create memory pressure 
as the number of keys grows. Using the `pulsar-admin topics compact` command to 
run compaction through  [...]
+
+The `pulsar compact-topic` command communicates with 
[ZooKeeper](https://zookeeper.apache.org) directly. In order to establish 
communication with ZooKeeper, though, the `pulsar` CLI tool will need to have a 
valid [broker configuration](reference-configuration.md#broker). You can either 
supply a proper configuration in `conf/broker.conf` or specify a non-default 
location for the configuration:
+
+```bash
+
+$ bin/pulsar compact-topic \
+  --broker-conf /path/to/broker.conf \
+  --topic persistent://my-tenant/my-namespace/my-topic
+
+# If the configuration is in conf/broker.conf
+$ bin/pulsar compact-topic \
+  --topic persistent://my-tenant/my-namespace/my-topic
+
+```
+
+#### When should I trigger compaction?
+
+How often you [trigger compaction](#triggering-compaction-manually) will vary 
widely based on the use case. If you want a compacted topic to be extremely 
speedy on read, then you should run compaction fairly frequently.
+
+## Consumer configuration
+
+Pulsar consumers and readers need to be configured to read from compacted topics. The sections below show you how to enable compacted topic reads for Pulsar's language clients.
+
+### Java
+
+In order to read from a compacted topic using a Java consumer, the 
`readCompacted` parameter must be set to `true`. Here's an example consumer for 
a compacted topic:
+
+```java
+
+Consumer<byte[]> compactedTopicConsumer = client.newConsumer()
+        .topic("some-compacted-topic")
+        .readCompacted(true)
+        .subscribe();
+
+```
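+
+Readers can also read compacted topics. A minimal sketch, assuming the same `client` as above:
+
+```java
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+
+Reader<byte[]> compactedTopicReader = client.newReader()
+        .topic("some-compacted-topic")
+        .startMessageId(MessageId.earliest)
+        .readCompacted(true)
+        .create();
+
+```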
+
+As mentioned above, topic compaction in Pulsar works on a *per-key basis*. 
That means that messages that you produce on compacted topics need to have keys 
(the content of the key will depend on your use case). Messages that don't have 
keys will be ignored by the compaction process. Here's an example Pulsar 
message with a key:
+
+```java
+
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+
+// Build a keyed message from an existing Producer<byte[]> named "producer"
+// (the pre-2.0 MessageBuilder API no longer exists in the 2.x client)
+TypedMessageBuilder<byte[]> msg = producer.newMessage()
+        .key("some-key")
+        .value(someByteArray);
+
+```
+
+The example below shows a message with a key being produced on a compacted 
Pulsar topic:
+
+```java
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+
+Producer<byte[]> compactedTopicProducer = client.newProducer()
+        .topic("some-compacted-topic")
+        .create();
+
+compactedTopicProducer.newMessage()
+        .key("some-key")
+        .value(someByteArray)
+        .send();
+
+```
+
diff --git 
a/site2/website-next/versioned_docs/version-2.7.1/cookbooks-deduplication.md 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-deduplication.md
new file mode 100644
index 0000000..332228c
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-deduplication.md
@@ -0,0 +1,159 @@
+---
+id: cookbooks-deduplication
+title: Message deduplication
+sidebar_label: "Message deduplication"
+original_id: cookbooks-deduplication
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+When **message deduplication** is enabled, each message produced on a Pulsar topic is persisted to disk *only once*, even if the message is produced more than once. Message deduplication is handled automatically on the server side.
+
+To use message deduplication in Pulsar, you need to configure your Pulsar 
brokers and clients.
+
+## How it works
+
+You can enable or disable message deduplication at the namespace level or the topic level. By default, it is disabled on all namespaces and topics. You can enable it in the following ways:
+
+* Enable deduplication for all namespaces/topics at the broker-level.
+* Enable deduplication for a specific namespace with the `pulsar-admin 
namespaces` interface.
+* Enable deduplication for a specific topic with the `pulsar-admin topics` 
interface.
+
+## Configure message deduplication
+
+You can configure message deduplication in Pulsar using the 
[`broker.conf`](reference-configuration.md#broker) configuration file. The 
following deduplication-related parameters are available.
+
+Parameter | Description | Default
+:---------|:------------|:-------
+`brokerDeduplicationEnabled` | Sets the default behavior for message 
deduplication in the Pulsar broker. If it is set to `true`, message 
deduplication is enabled on all namespaces/topics. If it is set to `false`, you 
have to enable or disable deduplication at the namespace level or the topic 
level. | `false`
+`brokerDeduplicationMaxNumberOfProducers` | The maximum number of producers 
for which information is stored for deduplication purposes. | `10000`
+`brokerDeduplicationEntriesInterval` | The number of entries after which a 
deduplication informational snapshot is taken. A larger interval leads to fewer 
snapshots being taken, though this lengthens the topic recovery time (the time 
required for entries published after the snapshot to be replayed). | `1000`
+`brokerDeduplicationProducerInactivityTimeoutMinutes` | The time of inactivity 
(in minutes) after which the broker discards deduplication information related 
to a disconnected producer. | `360` (6 hours)
+
+### Set default value at the broker-level
+
+By default, message deduplication is *disabled* on all Pulsar namespaces/topics. To enable it on all namespaces/topics, set the `brokerDeduplicationEnabled` parameter to `true` and restart the broker.
+
+Regardless of the `brokerDeduplicationEnabled` default, enabling or disabling deduplication via the Pulsar admin CLI overrides the broker-level setting for the given namespace or topic.
+
+### Enable message deduplication
+
+Though message deduplication is disabled by default at the broker level, you 
can enable message deduplication for a specific namespace or topic using the 
[`pulsar-admin namespaces 
set-deduplication`](reference-pulsar-admin.md#namespace-set-deduplication) or 
the [`pulsar-admin topics 
set-deduplication`](reference-pulsar-admin.md#topic-set-deduplication) command. 
You can use the `--enable`/`-e` flag and specify the namespace/topic. 
+
+The following example shows how to enable message deduplication at the 
namespace level.
+
+```bash
+
+$ bin/pulsar-admin namespaces set-deduplication \
+  public/default \
+  --enable # or just -e
+
+```
+
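+The same setting is also available programmatically. A minimal sketch with the Java admin client, assuming an initialized `PulsarAdmin` instance named `admin` (pass `false` to disable):
+
+```java
+
+admin.namespaces().setDeduplicationStatus("public/default", true);
+
+```
+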
+### Disable message deduplication
+
+Even if you enable message deduplication at the broker level, you can disable message deduplication for a specific namespace or topic using the [`pulsar-admin namespaces set-deduplication`](reference-pulsar-admin.md#namespace-set-deduplication) or the [`pulsar-admin topics set-deduplication`](reference-pulsar-admin.md#topic-set-deduplication) command. Use the `--disable`/`-d` flag and specify the namespace/topic.
+
+The following example shows how to disable message deduplication at the 
namespace level.
+
+```bash
+
+$ bin/pulsar-admin namespaces set-deduplication \
+  public/default \
+  --disable # or just -d
+
+```
+
+## Pulsar clients
+
+If you enable message deduplication in Pulsar brokers, you need to complete the following tasks for your client producers:
+
+1. Specify a name for the producer.
+1. Set the message timeout to `0` (namely, no timeout).
+
+The instructions for Java, Python, and C++ clients are different.
+
+<Tabs 
+  defaultValue="Java clients"
+  values={[
+  {
+    "label": "Java clients",
+    "value": "Java clients"
+  },
+  {
+    "label": "Python clients",
+    "value": "Python clients"
+  },
+  {
+    "label": "C++ clients",
+    "value": "C++ clients"
+  }
+]}>
+<TabItem value="Java clients">
+
+To enable message deduplication on a [Java 
producer](client-libraries-java.md#producers), set the producer name using the 
`producerName` setter, and set the timeout to `0` using the `sendTimeout` 
setter. 
+
+```java
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import java.util.concurrent.TimeUnit;
+
+PulsarClient pulsarClient = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+Producer producer = pulsarClient.newProducer()
+        .producerName("producer-1")
+        .topic("persistent://public/default/topic-1")
+        .sendTimeout(0, TimeUnit.SECONDS)
+        .create();
+
+```
+
+</TabItem>
+<TabItem value="Python clients">
+
+To enable message deduplication on a [Python 
producer](client-libraries-python.md#producers), set the producer name using 
`producer_name`, and set the timeout to `0` using `send_timeout_millis`. 
+
+```python
+
+import pulsar
+
+client = pulsar.Client("pulsar://localhost:6650")
+producer = client.create_producer(
+    "persistent://public/default/topic-1",
+    producer_name="producer-1",
+    send_timeout_millis=0)
+
+```
+
+</TabItem>
+<TabItem value="C++ clients">
+
+To enable message deduplication on a [C++ producer](client-libraries-cpp.md#producer), set the producer name using `setProducerName`, and set the timeout to `0` using `setSendTimeout`.
+
+```cpp
+
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+std::string serviceUrl = "pulsar://localhost:6650";
+std::string topic = "persistent://some-tenant/ns1/topic-1";
+std::string producerName = "producer-1";
+
+Client client(serviceUrl);
+
+ProducerConfiguration producerConfig;
+producerConfig.setSendTimeout(0);
+producerConfig.setProducerName(producerName);
+
+Producer producer;
+
+Result result = client.createProducer(topic, producerConfig, producer);
+
+```
+
+</TabItem>
+
+</Tabs>
\ No newline at end of file
diff --git 
a/site2/website-next/versioned_docs/version-2.7.1/cookbooks-encryption.md 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-encryption.md
new file mode 100644
index 0000000..c782e69
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-encryption.md
@@ -0,0 +1,188 @@
+---
+id: cookbooks-encryption
+title: Pulsar Encryption
+sidebar_label: "Encryption"
+original_id: cookbooks-encryption
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+Pulsar encryption allows applications to encrypt messages at the producer and decrypt them at the consumer. Encryption is performed using the public/private key pair configured by the application. Encrypted messages can only be decrypted by consumers with a valid key.
+
+## Asymmetric and symmetric encryption
+
+Pulsar uses a dynamically generated symmetric AES key to encrypt messages (data). The AES key (the data key) is encrypted using an application-provided ECDSA/RSA key pair; as a result, there is no need to share the secret with everyone.
+
+A key is a public/private key pair used for encryption/decryption. The producer key is the public key of the pair, and the consumer key is the private key.
+
+The application configures the producer with the public key. This key is used to encrypt the AES data key. The encrypted data key is sent as part of the message header. Only entities with the private key (in this case the consumer) are able to decrypt the data key, which is used to decrypt the message.
+
+A message can be encrypted with more than one key. Any one of the keys used for encrypting the message is sufficient to decrypt it.
+
+Pulsar does not store the encryption key anywhere in the Pulsar service. If you lose or delete the private key, your message is irretrievably lost and unrecoverable.
+
+## Producer
+![alt text](/assets/pulsar-encryption-producer.jpg "Pulsar Encryption 
Producer")
+
+## Consumer
+![alt text](/assets/pulsar-encryption-consumer.jpg "Pulsar Encryption 
Consumer")
+
+## Getting started
+
+1. Create your ECDSA or RSA public/private key pair.
+
+```shell
+
+openssl ecparam -name secp521r1 -genkey -param_enc explicit -out 
test_ecdsa_privkey.pem
+openssl ec -in test_ecdsa_privkey.pem -pubout -outform pkcs8 -out 
test_ecdsa_pubkey.pem
+
+```
+
+2. Add the public and private keys to your key management system, and configure your producers to retrieve public keys and your consumers to retrieve private keys.
+3. Implement the `CryptoKeyReader::getPublicKey()` interface for the producer and the `CryptoKeyReader::getPrivateKey()` interface for the consumer; the Pulsar client invokes these to load the keys.
+4. Add the encryption key to the producer builder: `addEncryptionKey("myapp.key")`.
+5. Add the `CryptoKeyReader` implementation to the producer/consumer builder: `cryptoKeyReader(keyReader)`.
+6. Sample producer application:
+
+```java
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+class RawFileKeyReader implements CryptoKeyReader {
+
+    String publicKeyFile = "";
+    String privateKeyFile = "";
+
+    RawFileKeyReader(String pubKeyFile, String privKeyFile) {
+        publicKeyFile = pubKeyFile;
+        privateKeyFile = privKeyFile;
+    }
+
+    @Override
+    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> 
keyMeta) {
+        EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+        try {
+            keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
+        } catch (IOException e) {
+            System.out.println("ERROR: Failed to read public key from file " + 
publicKeyFile);
+            e.printStackTrace();
+        }
+        return keyInfo;
+    }
+
+    @Override
+    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> 
keyMeta) {
+        EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+        try {
+            keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
+        } catch (IOException e) {
+            System.out.println("ERROR: Failed to read private key from file " 
+ privateKeyFile);
+            e.printStackTrace();
+        }
+        return keyInfo;
+    }
+}
+PulsarClient pulsarClient = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+
+Producer<byte[]> producer = pulsarClient.newProducer()
+        .topic("persistent://my-tenant/my-ns/my-topic")
+        .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
+        .addEncryptionKey("myappkey")
+        .create();
+
+for (int i = 0; i < 10; i++) {
+    producer.send("my-message".getBytes());
+}
+
+pulsarClient.close();
+
+```
+
+7. Sample consumer application:
+
+```java
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+
+class RawFileKeyReader implements CryptoKeyReader {
+
+    String publicKeyFile = "";
+    String privateKeyFile = "";
+
+    RawFileKeyReader(String pubKeyFile, String privKeyFile) {
+        publicKeyFile = pubKeyFile;
+        privateKeyFile = privKeyFile;
+    }
+
+    @Override
+    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> 
keyMeta) {
+        EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+        try {
+            keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
+        } catch (IOException e) {
+            System.out.println("ERROR: Failed to read public key from file " + 
publicKeyFile);
+            e.printStackTrace();
+        }
+        return keyInfo;
+    }
+
+    @Override
+    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> 
keyMeta) {
+        EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+        try {
+            keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
+        } catch (IOException e) {
+            System.out.println("ERROR: Failed to read private key from file " 
+ privateKeyFile);
+            e.printStackTrace();
+        }
+        return keyInfo;
+    }
+}
+
+PulsarClient pulsarClient = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+
+Consumer<byte[]> consumer = pulsarClient.newConsumer()
+        .topic("persistent://my-tenant/my-ns/my-topic")
+        .subscriptionName("my-subscriber-name")
+        .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
+        .subscribe();
+
+Message<byte[]> msg = null;
+
+for (int i = 0; i < 10; i++) {
+    msg = consumer.receive();
+    // do something
+    System.out.println("Received: " + new String(msg.getData()));
+}
+
+// Acknowledge the consumption of all messages at once
+consumer.acknowledgeCumulative(msg);
+pulsarClient.close();
+
+```
+
+## Key rotation
+Pulsar generates a new AES data key every 4 hours or after a certain number of messages are published. The producer automatically fetches the asymmetric public key every 4 hours by calling `CryptoKeyReader::getPublicKey()` to retrieve the latest version.
+
+## Enabling encryption at the producer application
+If you produce messages that are consumed across application boundaries, you need to ensure that consumers in other applications have access to one of the private keys that can decrypt the messages. This can be done in two ways:
+1. The consumer application provides you access to its public key, which you add to your producer keys.
+1. You grant access to one of the private keys from the pairs used by the producer.
+
+In some cases, the producer may want to encrypt the messages with multiple keys. For this, add all such keys to the configuration. A consumer is able to decrypt the message as long as it has access to at least one of the keys.
+
+For example, if messages need to be encrypted using two keys, `myapp.messagekey1` and `myapp.messagekey2`:
+
+```java
+
+conf.addEncryptionKey("myapp.messagekey1");
+conf.addEncryptionKey("myapp.messagekey2");
+
+```
+
+## Decrypting encrypted messages at the consumer application
+Consumers require access to one of the private keys to decrypt messages produced by the producer. If you would like to receive encrypted messages, create a public/private key pair and give your public key to the producer application so that it can encrypt messages with it.
+
+## Handling failures
+* Producer/consumer loses access to the key
+  * The producer action fails and indicates the cause of the failure. The application can choose to proceed and send unencrypted messages in such cases; call `cryptoFailureAction(ProducerCryptoFailureAction)` on the producer builder to control this behavior. The default behavior is to fail the request.
+  * If consumption fails due to a decryption failure or missing keys, the application can choose to consume the encrypted message or discard it; call `cryptoFailureAction(ConsumerCryptoFailureAction)` on the consumer builder to control this behavior. The default behavior is to fail the request. The application will never be able to decrypt the messages if the private key is permanently lost.
+* Batch messaging
+  * If decryption fails and the message contains batched messages, the client cannot retrieve the individual messages in the batch, so message consumption fails even if the failure action is set to `CONSUME`.
+* If decryption fails, message consumption stops, and the application will notice backlog growth in addition to decryption failure messages in the client log. If the application does not have access to the private key to decrypt the message, the only option is to skip or discard the backlogged messages.
+
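+The failure actions can be set on the client builders. A minimal sketch, assuming the `pulsarClient` and `RawFileKeyReader` from the samples above:
+
+```java
+
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+
+// Publish unencrypted messages when encryption fails (default: FAIL)
+Producer<byte[]> producer = pulsarClient.newProducer()
+        .topic("persistent://my-tenant/my-ns/my-topic")
+        .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
+        .addEncryptionKey("myappkey")
+        .cryptoFailureAction(ProducerCryptoFailureAction.SEND)
+        .create();
+
+// Deliver the still-encrypted payload instead of failing (default: FAIL)
+Consumer<byte[]> consumer = pulsarClient.newConsumer()
+        .topic("persistent://my-tenant/my-ns/my-topic")
+        .subscriptionName("my-subscriber-name")
+        .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
+        .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+        .subscribe();
+
+```
+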
diff --git 
a/site2/website-next/versioned_docs/version-2.7.1/cookbooks-message-queue.md 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-message-queue.md
new file mode 100644
index 0000000..f30da44
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-message-queue.md
@@ -0,0 +1,130 @@
+---
+id: cookbooks-message-queue
+title: Using Pulsar as a message queue
+sidebar_label: "Message queue"
+original_id: cookbooks-message-queue
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+Message queues are essential components of many large-scale data 
architectures. If every single work object that passes through your system 
absolutely *must* be processed in spite of the slowness or downright failure of 
this or that system component, there's a good chance that you'll need a message 
queue to step in and ensure that unprocessed data is retained---with correct 
ordering---until the required actions are taken.
+
+Pulsar is a great choice for a message queue because:
+
+* it was built with [persistent message 
storage](concepts-architecture-overview.md#persistent-storage) in mind
+* it offers automatic load balancing across 
[consumers](reference-terminology.md#consumer) for messages on a topic (or 
custom load balancing if you wish)
+
+> You can use the same Pulsar installation to act as a real-time message bus 
and as a message queue if you wish (or just one or the other). You can set 
aside some topics for real-time purposes and other topics for message queue 
purposes (or use specific namespaces for either purpose if you wish).
+
+
+# Client configuration changes
+
+To use a Pulsar [topic](reference-terminology.md#topic) as a message queue, 
you should distribute the receiver load on that topic across several consumers 
(the optimal number of consumers will depend on the load). Each consumer must:
+
+* Establish a [shared subscription](concepts-messaging.md#shared) and use the 
same subscription name as the other consumers (otherwise the subscription is 
not shared and the consumers can't act as a processing ensemble)
+* If you'd like to have tight control over message dispatching across 
consumers, set the consumers' **receiver queue** size very low (potentially 
even to 0 if necessary). Each Pulsar 
[consumer](reference-terminology.md#consumer) has a receiver queue that 
determines how many messages the consumer will attempt to fetch at a time. A 
receiver queue of 1000 (the default), for example, means that the consumer will 
attempt to process 1000 messages from the topic's backlog upon connection. 
Setti [...]
+
+   The downside to restricting the receiver queue size of consumers is that it limits the potential throughput of those consumers, and it cannot be used with [partitioned topics](reference-terminology.md#partitioned-topic). Whether the performance/control trade-off is worthwhile will depend on your use case.
+
+## Java clients
+
+Here's an example Java consumer configuration that uses a shared subscription:
+
+```java
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+String SERVICE_URL = "pulsar://localhost:6650";
+String TOPIC = "persistent://public/default/mq-topic-1";
+String subscription = "sub-1";
+
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl(SERVICE_URL)
+        .build();
+
+Consumer consumer = client.newConsumer()
+        .topic(TOPIC)
+        .subscriptionName(subscription)
+        .subscriptionType(SubscriptionType.Shared)
+        // If you'd like to restrict the receiver queue size
+        .receiverQueueSize(10)
+        .subscribe();
+
+```
+
+## Python clients
+
+Here's an example Python consumer configuration that uses a shared 
subscription:
+
+```python
+
+from pulsar import Client, ConsumerType
+
+SERVICE_URL = "pulsar://localhost:6650"
+TOPIC = "persistent://public/default/mq-topic-1"
+SUBSCRIPTION = "sub-1"
+
+client = Client(SERVICE_URL)
+consumer = client.subscribe(
+    TOPIC,
+    SUBSCRIPTION,
+    # If you'd like to restrict the receiver queue size
+    receiver_queue_size=10,
+    consumer_type=ConsumerType.Shared)
+
+```
+
+## C++ clients
+
+Here's an example C++ consumer configuration that uses a shared subscription:
+
+```cpp
+
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+std::string serviceUrl = "pulsar://localhost:6650";
+std::string topic = "persistent://public/defaultmq-topic-1";
+std::string subscription = "sub-1";
+
+Client client(serviceUrl);
+
+ConsumerConfiguration consumerConfig;
+consumerConfig.setConsumerType(ConsumerShared);
+// If you'd like to restrict the receiver queue size
+consumerConfig.setReceiverQueueSize(10);
+
+Consumer consumer;
+
+Result result = client.subscribe(topic, subscription, consumerConfig, 
consumer);
+
+```
+
+## Go clients
+
+Here is an example of a Go consumer configuration that uses a shared subscription:
+
+```go
+
+import "github.com/apache/pulsar-client-go/pulsar"
+client, err := pulsar.NewClient(pulsar.ClientOptions{
+    URL: "pulsar://localhost:6650",
+})
+if err != nil {
+    log.Fatal(err)
+}
+consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+    Topic:             "persistent://public/default/mq-topic-1",
+    SubscriptionName:  "sub-1",
+    Type:              pulsar.Shared,
+    ReceiverQueueSize: 10, // If you'd like to restrict the receiver queue size
+})
+if err != nil {
+    log.Fatal(err)
+}
+
+```
+
diff --git 
a/site2/website-next/versioned_docs/version-2.7.1/cookbooks-non-persistent.md 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-non-persistent.md
new file mode 100644
index 0000000..f10d837
--- /dev/null
+++ 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-non-persistent.md
@@ -0,0 +1,67 @@
+---
+id: cookbooks-non-persistent
+title: Non-persistent messaging
+sidebar_label: "Non-persistent messaging"
+original_id: cookbooks-non-persistent
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+**Non-persistent topics** are Pulsar topics in which message data is *never* 
[persistently stored](concepts-architecture-overview.md#persistent-storage) and 
kept only in memory. This cookbook provides:
+
+* A basic [conceptual overview](#overview) of non-persistent topics
+* Information about [configurable parameters](#configuration) related to 
non-persistent topics
+* A guide to the [CLI interface](#cli) for managing non-persistent topics
+
+## Overview
+
+By default, Pulsar persistently stores *all* unacknowledged messages on multiple [BookKeeper](concepts-architecture-overview.md#persistent-storage) bookies (storage nodes). Data for messages on persistent topics can thus survive broker restarts and subscriber failover.
+
+Pulsar also, however, supports **non-persistent topics**, which are topics on 
which messages are *never* persisted to disk and live only in memory. When 
using non-persistent delivery, killing a Pulsar 
[broker](reference-terminology.md#broker) or disconnecting a subscriber to a 
topic means that all in-transit messages are lost on that (non-persistent) 
topic, meaning that clients may see message loss.
+
+Non-persistent topics have names of this form (note the `non-persistent` in 
the name):
+
+```http
+
+non-persistent://tenant/namespace/topic
+
+```
+
+> For more high-level information about non-persistent topics, see the 
[Concepts and Architecture](concepts-messaging.md#non-persistent-topics) 
documentation.
+
+## Using
+
+> In order to use non-persistent topics, they must be [enabled](#enabling) in 
your Pulsar broker configuration.
+
+In order to use non-persistent topics, you only need to differentiate them by 
name when interacting with them. This [`pulsar-client 
produce`](reference-cli-tools.md#pulsar-client-produce) command, for example, 
would produce one message on a non-persistent topic in a standalone cluster:
+
+```bash
+
+$ bin/pulsar-client produce non-persistent://public/default/example-np-topic \
+  --num-produce 1 \
+  --messages "This message will be stored only in memory"
+
+```
+
+> For a more thorough guide to non-persistent topics from an administrative 
perspective, see the [Non-persistent topics](admin-api-topics) guide.
+
+## Enabling
+
+To enable non-persistent topics in a Pulsar broker, the [`enableNonPersistentTopics`](reference-configuration.md#broker-enableNonPersistentTopics) parameter must be set to `true`. This is the default, so you won't need to take any action to enable non-persistent messaging.
+
+
+> #### Configuration for standalone mode
+> If you're running Pulsar in standalone mode, the same configurable 
parameters are available but in the 
[`standalone.conf`](reference-configuration.md#standalone) configuration file. 
+
+If you'd like to enable *only* non-persistent topics in a broker, you can set 
the 
[`enablePersistentTopics`](reference-configuration.md#broker-enablePersistentTopics)
 parameter to `false` and the `enableNonPersistentTopics` parameter to `true`.
+
+## Managing with the CLI
+
+Non-persistent topics can be managed using the [`pulsar-admin 
non-persistent`](reference-pulsar-admin.md#non-persistent) command-line 
interface. With that interface you can perform actions like [create a 
partitioned non-persistent 
topic](reference-pulsar-admin.md#non-persistent-create-partitioned-topic), get 
[stats](reference-pulsar-admin.md#non-persistent-stats) for a non-persistent 
topic, [list](reference-pulsar-admin) non-persistent topics under a namespace, 
and more.
+
+## Using with Pulsar clients
+
+You shouldn't need to make any changes to your Pulsar clients to use 
non-persistent messaging beyond making sure that you use proper [topic 
names](#using) with `non-persistent` as the topic type.
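+
+For example, here is a minimal Java sketch; the `non-persistent` topic scheme is the only non-persistent-specific detail:
+
+```java
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+
+// Identical to a persistent producer apart from the topic scheme
+Producer<byte[]> producer = client.newProducer()
+        .topic("non-persistent://public/default/example-np-topic")
+        .create();
+
+producer.send("This message will be stored only in memory".getBytes());
+
+```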
+
diff --git 
a/site2/website-next/versioned_docs/version-2.7.1/cookbooks-retention-expiry.md 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-retention-expiry.md
new file mode 100644
index 0000000..60441a3
--- /dev/null
+++ 
b/site2/website-next/versioned_docs/version-2.7.1/cookbooks-retention-expiry.md
@@ -0,0 +1,421 @@
+---
+id: cookbooks-retention-expiry
+title: Message retention and expiry
+sidebar_label: "Message retention and expiry"
+original_id: cookbooks-retention-expiry
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+Pulsar brokers are responsible for handling messages that pass through Pulsar, 
including [persistent 
storage](concepts-architecture-overview.md#persistent-storage) of messages. By 
default, for each topic, brokers only retain messages that are in at least one 
backlog. A backlog is the set of unacknowledged messages for a particular 
subscription. As a topic can have multiple subscriptions, a topic can have 
multiple backlogs.
+
+As a consequence, no messages are retained (by default) on a topic that has 
not had any subscriptions created for it.
+
+(Note that messages that are no longer being stored are not necessarily 
immediately deleted, and may in fact still be accessible until the next ledger 
rollover. Because clients cannot predict when rollovers may happen, it is not 
wise to rely on a rollover not happening at an inconvenient point in time.)
+
+In Pulsar, you can modify this behavior, with namespace granularity, in two 
ways:
+
+* You can persistently store messages that are not within a backlog (because they've been acknowledged on every existing subscription, or because there are no subscriptions) by setting [retention policies](#retention-policies).
+* Messages that are not acknowledged within a specified timeframe can be 
automatically acknowledged, by specifying the [time to live](#time-to-live-ttl) 
(TTL).
+
+Pulsar's [admin interface](admin-api-overview) enables you to manage both 
retention policies and TTL with namespace granularity (and thus within a 
specific tenant and either on a specific cluster or in the 
[`global`](concepts-architecture-overview.md#global-cluster) cluster).
+
+
+> #### Retention and TTL solve two different problems
+> * Message retention: Keep the data for at least X hours (even if 
acknowledged)
+> * Time-to-live: Discard data after some time (by automatically acknowledging)
+>
+> Most applications will want to use at most one of these.
+
+
+## Retention policies
+
+By default, when a Pulsar message arrives at a broker, the message is stored 
until it has been acknowledged on all subscriptions, at which point it is 
marked for deletion. You can override this behavior and retain messages that 
have already been acknowledged on all subscriptions by setting a *retention 
policy* for all topics in a given namespace. Retention is based on both a *size 
limit* and a *time limit*.
+
+Retention policies are useful when you use the Reader interface. The Reader interface does not use acknowledgements, and messages do not exist within backlogs. You must configure retention for Reader-only use cases.
+
+When you set a retention policy on topics in a namespace, you must set 
**both** a *size limit* and a *time limit*. You can refer to the following 
table to set retention policies in `pulsar-admin` and Java.
+
+|Time limit|Size limit| Message retention      |
+|----------|----------|------------------------|
+| -1       | -1       | Infinite retention  |
+| -1       | >0       | Based on the size limit  |
+| >0       | -1       | Based on the time limit  |
+| 0        | 0        | Disable message retention (by default) |
+| 0        | >0       | Invalid  |
+| >0       | 0        | Invalid  |
+| >0       | >0       | Acknowledged messages or messages with no active 
subscription will not be retained when either time or size reaches the limit. |
+
+The retention settings apply to all messages on topics that do not have any 
subscriptions, or to messages that have been acknowledged by all subscriptions. 
The retention policy settings do not affect unacknowledged messages on topics 
with subscriptions. The unacknowledged messages are controlled by the backlog 
quota.
+
+When a retention limit on a topic is exceeded, the oldest message is marked 
for deletion until the set of retained messages falls within the specified 
limits again.
+
+### Defaults
+
+You can set message retention at instance level with the following two 
parameters: `defaultRetentionTimeInMinutes` and `defaultRetentionSizeInMB`. 
Both parameters are set to `0` by default. 
+
+For more information of the two parameters, refer to the 
[`broker.conf`](reference-configuration.md#broker) configuration file.
+
+### Set retention policy
+
+You can set a retention policy for a namespace by specifying the namespace, a 
size limit and a time limit in `pulsar-admin`, REST API and Java.
+
+<Tabs 
+  defaultValue="pulsar-admin"
+  values={[
+  {
+    "label": "pulsar-admin",
+    "value": "pulsar-admin"
+  },
+  {
+    "label": "REST API",
+    "value": "REST API"
+  },
+  {
+    "label": "Java",
+    "value": "Java"
+  }
+]}>
+<TabItem value="pulsar-admin">
+
+You can use the 
[`set-retention`](reference-pulsar-admin.md#namespaces-set-retention) 
subcommand and specify a namespace, a size limit using the `-s`/`--size` flag, 
and a time limit using the `-t`/`--time` flag. 
+
+In the following example, the size limit is set to 10 GB and the time limit is 
set to 3 hours for each topic within the `my-tenant/my-ns` namespace. 
+- When the size of messages reaches 10 GB on a topic within 3 hours, the 
acknowledged messages will not be retained. 
+- After 3 hours, even if the message size is less than 10 GB, the acknowledged 
messages will not be retained. 
+
+```shell
+
+$ pulsar-admin namespaces set-retention my-tenant/my-ns \
+  --size 10G \
+  --time 3h
+
+```
+
+In the following example, the time is not limited and the size limit is set to 
1 TB. The size limit determines the retention.
+
+```shell
+
+$ pulsar-admin namespaces set-retention my-tenant/my-ns \
+  --size 1T \
+  --time -1
+
+```
+
+In the following example, the size is not limited and the time limit is set to 
3 hours. The time limit determines the retention.
+
+```shell
+
+$ pulsar-admin namespaces set-retention my-tenant/my-ns \
+  --size -1 \
+  --time 3h
+
+```
+
+To achieve infinite retention, set both values to `-1`.
+
+```shell
+
+$ pulsar-admin namespaces set-retention my-tenant/my-ns \
+  --size -1 \
+  --time -1
+
+```
+
+To disable the retention policy, set both values to `0`.
+
+```shell
+
+$ pulsar-admin namespaces set-retention my-tenant/my-ns \
+  --size 0 \
+  --time 0
+
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+{@inject: 
endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/retention|operation/setRetention?version=@pulsar:version_number@}
+
+:::note
+
+To disable the retention policy, you need to set both the size and time limits to `0`. Setting only one of them to `0` is invalid.
+
+:::
+
+</TabItem>
+<TabItem value="Java">
+
+```java
+
+int retentionTime = 10; // 10 minutes
+int retentionSize = 500; // 500 megabytes
+RetentionPolicies policies = new RetentionPolicies(retentionTime, 
retentionSize);
+admin.namespaces().setRetention(namespace, policies);
+
+```
+
+</TabItem>
+
+</Tabs>
+
+### Get retention policy
+
+You can fetch the retention policy for a namespace by specifying the 
namespace. The output will be a JSON object with two keys: 
`retentionTimeInMinutes` and `retentionSizeInMB`.
+
+#### pulsar-admin
+
+Use the [`get-retention`](reference-pulsar-admin.md#namespaces) subcommand and 
specify the namespace.
+
+##### Example
+
+```shell
+
+$ pulsar-admin namespaces get-retention my-tenant/my-ns
+{
+  "retentionTimeInMinutes": 10,
+  "retentionSizeInMB": 500
+}
+
+```
+
+#### REST API
+
+{@inject: 
endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/retention|operation/getRetention?version=@pulsar:version_number@}
+
+#### Java
+
+```java
+
+admin.namespaces().getRetention(namespace);
+
+```
+
+## Backlog quotas
+
+*Backlogs* are sets of unacknowledged messages for a topic that have been 
stored by bookies. Pulsar stores all unacknowledged messages in backlogs until 
they are processed and acknowledged.
+
+You can control the allowable size of backlogs, at the namespace level, using 
*backlog quotas*. Setting a backlog quota involves setting:
+
+* an allowable *size threshold* for each topic in the namespace
+* a *retention policy* that determines which action the 
[broker](reference-terminology.md#broker) takes if the threshold is exceeded.
+
+The following retention policies are available:
+
+Policy | Action
+:------|:------
+`producer_request_hold` | The broker will hold and not persist produce request 
payload
+`producer_exception` | The broker will disconnect from the client by throwing 
an exception
+`consumer_backlog_eviction` | The broker will begin discarding backlog messages
+
+
+> #### Beware the distinction between retention policy types
+> As you may have noticed, there are two definitions of the term "retention 
policy" in Pulsar, one that applies to persistent storage of messages not in 
backlogs, and one that applies to messages within backlogs.
+
+
+Backlog quotas are handled at the namespace level. They can be managed using `pulsar-admin`, the REST API, or the Java admin client, as shown in the following sections.
+
+### Set size thresholds and backlog retention policies
+
+You can set a size threshold and backlog retention policy for all of the 
topics in a [namespace](reference-terminology.md#namespace) by specifying the 
namespace, a size limit, and a policy by name.
+
+#### pulsar-admin
+
+Use the [`set-backlog-quota`](reference-pulsar-admin.md#namespaces) subcommand 
and specify a namespace, a size limit using the `-l`/`--limit` flag, and a 
retention policy using the `-p`/`--policy` flag.
+
+##### Example
+
+```shell
+
+$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \
+  --limit 2G \
+  --policy producer_request_hold
+
+```
+
+#### REST API
+
+{@inject: 
endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/backlogQuota|operation/getBacklogQuotaMap?version=@pulsar:version_number@}
+
+#### Java
+
+```java
+
+long sizeLimit = 2147483648L;
+BacklogQuota.RetentionPolicy policy = 
BacklogQuota.RetentionPolicy.producer_request_hold;
+BacklogQuota quota = new BacklogQuota(sizeLimit, policy);
+admin.namespaces().setBacklogQuota(namespace, quota);
+
+```
+
+### Get backlog threshold and backlog retention policy
+
+You can see which size threshold and backlog retention policy has been applied 
to a namespace.
+
+#### pulsar-admin
+
+Use the 
[`get-backlog-quotas`](reference-pulsar-admin.md#pulsar-admin-namespaces-get-backlog-quotas)
 subcommand and specify a namespace. Here's an example:
+
+```shell
+
+$ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
+{
+  "destination_storage": {
+    "limit" : 2147483648,
+    "policy" : "producer_request_hold"
+  }
+}
+
+```
+
+#### REST API
+
+{@inject: 
endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap|operation/getBacklogQuotaMap?version=@pulsar:version_number@}
+
+#### Java
+
+```java
+
+Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas =
+  admin.namespaces().getBacklogQuotas(namespace);
+
+```
+
+### Remove backlog quotas
+
+#### pulsar-admin
+
+Use the 
[`remove-backlog-quota`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-backlog-quota)
 subcommand and specify a namespace. Here's an example:
+
+```shell
+
+$ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns
+
+```
+
+#### REST API
+
+{@inject: 
endpoint|DELETE|/admin/v2/namespaces/:tenant/:namespace/backlogQuota|operation/removeBacklogQuota?version=@pulsar:version_number@}
+
+#### Java
+
+```java
+
+admin.namespaces().removeBacklogQuota(namespace);
+
+```
+
+### Clear backlog
+
+#### pulsar-admin
+
+Use the 
[`clear-backlog`](reference-pulsar-admin.md#pulsar-admin-namespaces-clear-backlog)
 subcommand.
+
+##### Example
+
+```shell
+
+$ pulsar-admin namespaces clear-backlog my-tenant/my-ns
+
+```
+
+By default, you will be prompted to ensure that you really want to clear the 
backlog for the namespace. You can override the prompt using the `-f`/`--force` 
flag.
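+
+#### Java
+
+You can also clear the backlog with the Java admin client (assuming the same `admin` and `namespace` as in the examples above):
+
+```java
+
+admin.namespaces().clearNamespaceBacklog(namespace);
+
+```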
+
+## Time to live (TTL)
+
+By default, Pulsar stores all unacknowledged messages forever. This can lead 
to heavy disk space usage in cases where a lot of messages are going 
unacknowledged. If disk space is a concern, you can set a time to live (TTL) 
that determines how long unacknowledged messages will be retained.
+
+### Set the TTL for a namespace
+
+#### pulsar-admin
+
+Use the 
[`set-message-ttl`](reference-pulsar-admin.md#pulsar-admin-namespaces-set-message-ttl)
 subcommand and specify a namespace and a TTL (in seconds) using the 
`-ttl`/`--messageTTL` flag.
+
+##### Example
+
+```shell
+
+$ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \
+  --messageTTL 120 # TTL of 2 minutes
+
+```
+
+#### REST API
+
+{@inject: 
endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/messageTTL|operation/setNamespaceMessageTTL?version=@pulsar:version_number@}
+
+#### Java
+
+```java
+
+admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);
+
+```
+
+### Get the TTL configuration for a namespace
+
+#### pulsar-admin
+
+Use the 
[`get-message-ttl`](reference-pulsar-admin.md#pulsar-admin-namespaces-get-message-ttl)
 subcommand and specify a namespace.
+
+##### Example
+
+```shell
+
+$ pulsar-admin namespaces get-message-ttl my-tenant/my-ns
+60
+
+```
+
+#### REST API
+
+{@inject: 
endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/messageTTL|operation/getNamespaceMessageTTL?version=@pulsar:version_number@}
+
+#### Java
+
+```java
+
+admin.namespaces().getNamespaceMessageTTL(namespace);
+
+```
+
+### Remove the TTL configuration for a namespace
+
+#### pulsar-admin
+
+Use the 
[`remove-message-ttl`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-message-ttl)
 subcommand and specify a namespace.
+
+##### Example
+
+```shell
+
+$ pulsar-admin namespaces remove-message-ttl my-tenant/my-ns
+
+```
+
+#### REST API
+
+{@inject: 
endpoint|DELETE|/admin/v2/namespaces/:tenant/:namespace/messageTTL|operation/removeNamespaceMessageTTL?version=@pulsar:version_number@}
+
+#### Java
+
+```java
+
+admin.namespaces().removeNamespaceMessageTTL(namespace);
+
+```
+
+## Delete messages from namespaces
+
+If you do not have any retention period and never have much of a backlog, the upper limit for retaining acknowledged messages equals the Pulsar segment rollover period + the entry log rollover period + (the garbage collection interval * the garbage collection ratios).
+
+- **Segment rollover period**: Basically, the segment rollover period is how often a new segment is created. Once a new segment is created, the old segment will be deleted. By default, this happens either when you have written 50,000 entries (messages) or have waited 240 minutes. You can tune this in your broker (for example, via `managedLedgerMaxEntriesPerLedger` and `managedLedgerMaxLedgerRolloverTimeMinutes`).
+
+- **Entry log rollover period**: Multiple ledgers in BookKeeper are interleaved into an [entry log](https://bookkeeper.apache.org/docs/4.11.1/getting-started/concepts/#entry-logs). Before the data of a deleted ledger can be reclaimed, the entry log that contains it must be rolled over. The entry log rollover period is configurable, but is purely based on the entry log size. For details, see [here](https://bookkeeper.apache.org/docs/4.11.1/reference/config/#entry-log-settings). Once the entry log is rolled over, the entry log can be garbage collected.
+
+- **Garbage collection interval**: Because entry logs have interleaved ledgers, to free up space, the entry logs need to be rewritten. The garbage collection interval is how often BookKeeper performs garbage collection, which is related to the minor and major compaction of entry logs. For details, see [here](https://bookkeeper.apache.org/docs/4.11.1/reference/config/#entry-log-compaction-settings).
diff --git a/site2/website-next/versioned_sidebars/version-2.7.1-sidebars.json 
b/site2/website-next/versioned_sidebars/version-2.7.1-sidebars.json
index 137d0d6..9ec031b 100644
--- a/site2/website-next/versioned_sidebars/version-2.7.1-sidebars.json
+++ b/site2/website-next/versioned_sidebars/version-2.7.1-sidebars.json
@@ -491,6 +491,40 @@
           "id": "version-2.7.1/adaptors-storm"
         }
       ]
+    },
+    {
+      "type": "category",
+      "label": "Cookbooks",
+      "items": [
+        {
+          "type": "doc",
+          "id": "version-2.7.1/cookbooks-compaction"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.7.1/cookbooks-deduplication"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.7.1/cookbooks-non-persistent"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.7.1/cookbooks-retention-expiry"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.7.1/cookbooks-encryption"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.7.1/cookbooks-message-queue"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.7.1/cookbooks-bookkeepermetadata"
+        }
+      ]
     }
   ]
 }
\ No newline at end of file
