This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b12fb  commit chapter Adaptors (#12814)
77b12fb is described below

commit 77b12fb186a1525b06d3304c0f5e108c6cffd882
Author: Yan <[email protected]>
AuthorDate: Tue Nov 16 20:14:28 2021 +0800

    commit chapter Adaptors (#12814)
    
    Co-authored-by: Anonymitaet <[email protected]>
---
 .../versioned_docs/version-2.7.2/adaptors-kafka.md | 278 +++++++++++++++++++++
 .../versioned_docs/version-2.7.2/adaptors-spark.md |  83 ++++++
 .../versioned_docs/version-2.7.2/adaptors-storm.md | 100 ++++++++
 .../versioned_sidebars/version-2.7.2-sidebars.json |  13 +
 4 files changed, 474 insertions(+)

diff --git a/site2/website-next/versioned_docs/version-2.7.2/adaptors-kafka.md b/site2/website-next/versioned_docs/version-2.7.2/adaptors-kafka.md
new file mode 100644
index 0000000..24bf975
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.7.2/adaptors-kafka.md
@@ -0,0 +1,278 @@
+---
+id: adaptors-kafka
+title: Pulsar adaptor for Apache Kafka
+sidebar_label: "Kafka client wrapper"
+original_id: adaptors-kafka
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+
+Pulsar provides an easy option for applications that are currently written using the [Apache Kafka](http://kafka.apache.org) Java client API.
+
+## Using the Pulsar Kafka compatibility wrapper
+
+In an existing application, replace the regular Kafka client dependency with the Pulsar Kafka wrapper. Remove the following dependency from `pom.xml`:
+
+```xml
+
+<dependency>
+  <groupId>org.apache.kafka</groupId>
+  <artifactId>kafka-clients</artifactId>
+  <version>0.10.2.1</version>
+</dependency>
+
+```
+
+Then include this dependency for the Pulsar Kafka wrapper:
+
+```xml
+
+<dependency>
+  <groupId>org.apache.pulsar</groupId>
+  <artifactId>pulsar-client-kafka</artifactId>
+  <version>@pulsar:version@</version>
+</dependency>
+
+```
+
+With the new dependency, the existing code works without any changes. You only need to adjust the configuration so that producers and consumers point to the Pulsar service rather than to Kafka, and use a Pulsar topic.
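For instance, the switch often comes down to a couple of property values. A minimal sketch using only `java.util.Properties` from the JDK (the broker URL is hypothetical):

```java
import java.util.Properties;

public class PulsarEndpointConfig {
    static Properties baseConfig() {
        Properties props = new Properties();
        // Before (Kafka): props.put("bootstrap.servers", "kafka-host:9092");
        // After: the same property now points at a Pulsar service URL.
        // Topic names used elsewhere become fully qualified Pulsar topics,
        // e.g. "persistent://public/default/my-topic".
        props.put("bootstrap.servers", "pulsar://localhost:6650");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(baseConfig().getProperty("bootstrap.servers"));
    }
}
```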
+
+## Using the Pulsar Kafka compatibility wrapper together with the existing Kafka client
+
+When migrating from Kafka to Pulsar, the application might use the original Kafka client and the Pulsar Kafka wrapper together during the migration. In that case, use the unshaded Pulsar Kafka client wrapper:
+
+```xml
+
+<dependency>
+  <groupId>org.apache.pulsar</groupId>
+  <artifactId>pulsar-client-kafka-original</artifactId>
+  <version>@pulsar:version@</version>
+</dependency>
+
+```
+
+When using this dependency, construct producers using `org.apache.kafka.clients.producer.PulsarKafkaProducer` instead of `org.apache.kafka.clients.producer.KafkaProducer`, and consumers using `org.apache.kafka.clients.consumer.PulsarKafkaConsumer` instead of `org.apache.kafka.clients.consumer.KafkaConsumer`.
+
+## Producer example
+
+```java
+
+// Topic needs to be a regular Pulsar topic
+String topic = "persistent://public/default/my-topic";
+
+Properties props = new Properties();
+// Point to a Pulsar service
+props.put("bootstrap.servers", "pulsar://localhost:6650");
+
+props.put("key.serializer", IntegerSerializer.class.getName());
+props.put("value.serializer", StringSerializer.class.getName());
+
+Producer<Integer, String> producer = new KafkaProducer<>(props);
+
+for (int i = 0; i < 10; i++) {
+    producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
+    log.info("Message {} sent successfully", i);
+}
+
+producer.close();
+
+```
+
+## Consumer example
+
+```java
+
+String topic = "persistent://public/default/my-topic";
+
+Properties props = new Properties();
+// Point to a Pulsar service
+props.put("bootstrap.servers", "pulsar://localhost:6650");
+props.put("group.id", "my-subscription-name");
+props.put("enable.auto.commit", "false");
+props.put("key.deserializer", IntegerDeserializer.class.getName());
+props.put("value.deserializer", StringDeserializer.class.getName());
+
+Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
+consumer.subscribe(Arrays.asList(topic));
+
+while (true) {
+    ConsumerRecords<Integer, String> records = consumer.poll(100);
+    records.forEach(record -> {
+        log.info("Received record: {}", record);
+    });
+
+    // Commit last offset
+    consumer.commitSync();
+}
+
+```
+
+## Complete Examples
+
+You can find the complete producer and consumer examples [here](https://github.com/apache/pulsar/tree/master/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples).
+
+## Compatibility matrix
+
+Currently, the Pulsar Kafka wrapper supports most of the operations offered by the Kafka API.
+
+#### Producer
+
+APIs:
+
+| Producer Method                                                               | Supported | Notes |
+|:------------------------------------------------------------------------------|:----------|:------|
+| `Future<RecordMetadata> send(ProducerRecord<K, V> record)`                    | Yes       |       |
+| `Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)` | Yes       |       |
+| `void flush()`                                                                | Yes       |       |
+| `List<PartitionInfo> partitionsFor(String topic)`                             | No        |       |
+| `Map<MetricName, ? extends Metric> metrics()`                                 | No        |       |
+| `void close()`                                                                | Yes       |       |
+| `void close(long timeout, TimeUnit unit)`                                     | Yes       |       |
+
+Properties:
+
+| Config property                         | Supported | Notes |
+|:----------------------------------------|:----------|:------|
+| `acks`                                  | Ignored   | Durability and quorum writes are configured at the namespace level |
+| `auto.offset.reset`                     | Yes       | Defaults to `latest` if the user does not provide a specific setting. |
+| `batch.size`                            | Ignored   |       |
+| `bootstrap.servers`                     | Yes       |       |
+| `buffer.memory`                         | Ignored   |       |
+| `client.id`                             | Ignored   |       |
+| `compression.type`                      | Yes       | Allows `gzip` and `lz4`. No `snappy`. |
+| `connections.max.idle.ms`               | Yes       | Only supports up to 2,147,483,647,000 (Integer.MAX_VALUE * 1000) ms of idle time |
+| `interceptor.classes`                   | Yes       |       |
+| `key.serializer`                        | Yes       |       |
+| `linger.ms`                             | Yes       | Controls the group commit time when batching messages |
+| `max.block.ms`                          | Ignored   |       |
+| `max.in.flight.requests.per.connection` | Ignored   | In Pulsar, ordering is maintained even with multiple requests in flight |
+| `max.request.size`                      | Ignored   |       |
+| `metric.reporters`                      | Ignored   |       |
+| `metrics.num.samples`                   | Ignored   |       |
+| `metrics.sample.window.ms`              | Ignored   |       |
+| `partitioner.class`                     | Yes       |       |
+| `receive.buffer.bytes`                  | Ignored   |       |
+| `reconnect.backoff.ms`                  | Ignored   |       |
+| `request.timeout.ms`                    | Ignored   |       |
+| `retries`                               | Ignored   | The Pulsar client retries with exponential backoff until the send timeout expires. |
+| `send.buffer.bytes`                     | Ignored   |       |
+| `timeout.ms`                            | Yes       |       |
+| `value.serializer`                      | Yes       |       |
+
+
+#### Consumer
+
+The following table lists consumer APIs.
+
+| Consumer Method                                                                                          | Supported | Notes |
+|:---------------------------------------------------------------------------------------------------------|:----------|:------|
+| `Set<TopicPartition> assignment()`                                                                       | No        |       |
+| `Set<String> subscription()`                                                                             | Yes       |       |
+| `void subscribe(Collection<String> topics)`                                                              | Yes       |       |
+| `void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)`                          | No        |       |
+| `void assign(Collection<TopicPartition> partitions)`                                                     | No        |       |
+| `void subscribe(Pattern pattern, ConsumerRebalanceListener callback)`                                    | No        |       |
+| `void unsubscribe()`                                                                                     | Yes       |       |
+| `ConsumerRecords<K, V> poll(long timeoutMillis)`                                                         | Yes       |       |
+| `void commitSync()`                                                                                      | Yes       |       |
+| `void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)`                                        | Yes       |       |
+| `void commitAsync()`                                                                                     | Yes       |       |
+| `void commitAsync(OffsetCommitCallback callback)`                                                        | Yes       |       |
+| `void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)`        | Yes       |       |
+| `void seek(TopicPartition partition, long offset)`                                                       | Yes       |       |
+| `void seekToBeginning(Collection<TopicPartition> partitions)`                                            | Yes       |       |
+| `void seekToEnd(Collection<TopicPartition> partitions)`                                                  | Yes       |       |
+| `long position(TopicPartition partition)`                                                                | Yes       |       |
+| `OffsetAndMetadata committed(TopicPartition partition)`                                                  | Yes       |       |
+| `Map<MetricName, ? extends Metric> metrics()`                                                            | No        |       |
+| `List<PartitionInfo> partitionsFor(String topic)`                                                        | No        |       |
+| `Map<String, List<PartitionInfo>> listTopics()`                                                          | No        |       |
+| `Set<TopicPartition> paused()`                                                                           | No        |       |
+| `void pause(Collection<TopicPartition> partitions)`                                                      | No        |       |
+| `void resume(Collection<TopicPartition> partitions)`                                                     | No        |       |
+| `Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)`  | No        |       |
+| `Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)`                      | No        |       |
+| `Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)`                            | No        |       |
+| `void close()`                                                                                           | Yes       |       |
+| `void close(long timeout, TimeUnit unit)`                                                                | Yes       |       |
+| `void wakeup()`                                                                                          | No        |       |
+
+Properties:
+
+| Config property                 | Supported | Notes |
+|:--------------------------------|:----------|:------|
+| `group.id`                      | Yes       | Maps to a Pulsar subscription name |
+| `max.poll.records`              | Yes       |       |
+| `max.poll.interval.ms`          | Ignored   | Messages are "pushed" from the broker |
+| `session.timeout.ms`            | Ignored   |       |
+| `heartbeat.interval.ms`         | Ignored   |       |
+| `bootstrap.servers`             | Yes       | Needs to point to a single Pulsar service URL |
+| `enable.auto.commit`            | Yes       |       |
+| `auto.commit.interval.ms`       | Ignored   | With auto-commit, acks are sent immediately to the broker |
+| `partition.assignment.strategy` | Ignored   |       |
+| `auto.offset.reset`             | Yes       | Only supports `earliest` and `latest`. |
+| `fetch.min.bytes`               | Ignored   |       |
+| `fetch.max.bytes`               | Ignored   |       |
+| `fetch.max.wait.ms`             | Ignored   |       |
+| `interceptor.classes`           | Yes       |       |
+| `metadata.max.age.ms`           | Ignored   |       |
+| `max.partition.fetch.bytes`     | Ignored   |       |
+| `send.buffer.bytes`             | Ignored   |       |
+| `receive.buffer.bytes`          | Ignored   |       |
+| `client.id`                     | Ignored   |       |
+
+
+## Customize Pulsar configurations
+
+You can configure the Pulsar authentication provider directly from the Kafka properties.
+
+### Pulsar client properties
+
+| Config property | Default | Notes |
+|:----------------|:--------|:------|
+| [`pulsar.authentication.class`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-org.apache.pulsar.client.api.Authentication-) | | The authentication provider to use, for example `org.apache.pulsar.client.impl.auth.AuthenticationTls`. |
+| [`pulsar.authentication.params.map`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-java.lang.String-java.util.Map-) | | Map which represents parameters for the authentication plugin. |
+| [`pulsar.authentication.params.string`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-java.lang.String-java.lang.String-) | | String which represents parameters for the authentication plugin, for example, `key1:val1,key2:val2`. |
+| [`pulsar.use.tls`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTls-boolean-) | `false` | Enable TLS transport encryption. |
+| [`pulsar.tls.trust.certs.file.path`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsTrustCertsFilePath-java.lang.String-) | | Path for the TLS trust certificate store. |
+| [`pulsar.tls.allow.insecure.connection`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsAllowInsecureConnection-boolean-) | `false` | Accept self-signed certificates from brokers. |
+| [`pulsar.operation.timeout.ms`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setOperationTimeout-int-java.util.concurrent.TimeUnit-) | `30000` | General operations timeout. |
+| [`pulsar.stats.interval.seconds`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setStatsInterval-long-java.util.concurrent.TimeUnit-) | `60` | Pulsar client lib stats printing interval. |
+| [`pulsar.num.io.threads`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setIoThreads-int-) | `1` | The number of Netty IO threads to use. |
+| [`pulsar.connections.per.broker`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setConnectionsPerBroker-int-) | `1` | The maximum number of connections to each broker. |
+| [`pulsar.use.tcp.nodelay`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTcpNoDelay-boolean-) | `true` | TCP no-delay. |
+| [`pulsar.concurrent.lookup.requests`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setConcurrentLookupRequest-int-) | `50000` | The maximum number of concurrent topic lookups. |
+| [`pulsar.max.number.rejected.request.per.connection`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setMaxNumberOfRejectedRequestPerConnection-int-) | `50` | The threshold of errors to forcefully close a connection. |
+| [`pulsar.keepalive.interval.ms`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientBuilder.html#keepAliveInterval-int-java.util.concurrent.TimeUnit-) | `30000` | Keep-alive interval for each client-broker connection. |
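As an illustration, TLS-related options from the table can be passed alongside the standard Kafka settings in the same `Properties` object (a sketch; the certificate path and service URL are hypothetical):

```java
import java.util.Properties;

public class PulsarTlsConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "pulsar+ssl://localhost:6651");
        // Pulsar-specific options ride along with the Kafka ones
        props.put("pulsar.use.tls", "true");
        props.put("pulsar.tls.trust.certs.file.path", "/path/to/ca.cert.pem");
        props.put("pulsar.authentication.class",
                "org.apache.pulsar.client.impl.auth.AuthenticationTls");
        System.out.println(props.getProperty("pulsar.use.tls"));
    }
}
```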
+
+
+### Pulsar producer properties
+
+| Config property | Default | Notes |
+|:----------------|:--------|:------|
+| [`pulsar.producer.name`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setProducerName-java.lang.String-) | | Specify the producer name. |
+| [`pulsar.producer.initial.sequence.id`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setInitialSequenceId-long-) | | Specify the baseline for the sequence ID of this producer. |
+| [`pulsar.producer.max.pending.messages`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessages-int-) | `1000` | Set the maximum size of the queue holding messages pending an acknowledgment from the broker. |
+| [`pulsar.producer.max.pending.messages.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessagesAcrossPartitions-int-) | `50000` | Set the maximum number of pending messages across all the partitions. |
+| [`pulsar.producer.batching.enabled`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingEnabled-boolean-) | `true` | Control whether automatic batching of messages is enabled for the producer. |
+| [`pulsar.producer.batching.max.messages`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingMaxMessages-int-) | `1000` | The maximum number of messages in a batch. |
+| [`pulsar.block.if.producer.queue.full`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBlockIfQueueFull-boolean-) | | Whether to block the producer when the queue is full. |
+
+
+### Pulsar consumer properties
+
+| Config property | Default | Notes |
+|:----------------|:--------|:------|
+| [`pulsar.consumer.name`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setConsumerName-java.lang.String-) | | Specify the consumer name. |
+| [`pulsar.consumer.receiver.queue.size`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setReceiverQueueSize-int-) | `1000` | Set the size of the consumer receiver queue. |
+| [`pulsar.consumer.acknowledgments.group.time.millis`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#acknowledgmentGroupTime-long-java.util.concurrent.TimeUnit-) | `100` | Set the maximum amount of time to group acknowledgments before sending them to the broker. |
+| [`pulsar.consumer.total.receiver.queue.size.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setMaxTotalReceiverQueueSizeAcrossPartitions-int-) | `50000` | Set the maximum size of the total receiver queue across partitions. |
+| [`pulsar.consumer.subscription.topics.mode`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#subscriptionTopicsMode-Mode-) | `PersistentOnly` | Set the subscription topic mode for consumers. |
diff --git a/site2/website-next/versioned_docs/version-2.7.2/adaptors-spark.md b/site2/website-next/versioned_docs/version-2.7.2/adaptors-spark.md
new file mode 100644
index 0000000..940d401e
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.7.2/adaptors-spark.md
@@ -0,0 +1,83 @@
+---
+id: adaptors-spark
+title: Pulsar adaptor for Apache Spark
+sidebar_label: "Apache Spark"
+original_id: adaptors-spark
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+The Spark Streaming receiver for Pulsar is a custom receiver that enables Apache [Spark Streaming](https://spark.apache.org/streaming/) to receive raw data from Pulsar.
+
+An application can receive data in [Resilient Distributed Dataset](https://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds) (RDD) format via the Spark Streaming receiver and can process it in a variety of ways.
+
+## Prerequisites
+
+To use the receiver, include a dependency for the `pulsar-spark` library in your Java configuration.
+
+### Maven
+
+If you're using Maven, add this to your `pom.xml`:
+
+```xml
+
+<!-- in your <properties> block -->
+<pulsar.version>@pulsar:version@</pulsar.version>
+
+<!-- in your <dependencies> block -->
+<dependency>
+  <groupId>org.apache.pulsar</groupId>
+  <artifactId>pulsar-spark</artifactId>
+  <version>${pulsar.version}</version>
+</dependency>
+
+```
+
+### Gradle
+
+If you're using Gradle, add this to your `build.gradle` file:
+
+```groovy
+
+def pulsarVersion = "@pulsar:version@"
+
+dependencies {
+    compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
+}
+
+```
+
+## Usage
+
+Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` method in `JavaStreamingContext`:
+
+```java
+
+String serviceUrl = "pulsar://localhost:6650/";
+String topic = "persistent://public/default/test_src";
+String subs = "test_sub";
+
+SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
+
+JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
+
+ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData<>();
+
+Set<String> set = new HashSet<>();
+set.add(topic);
+pulsarConf.setTopicNames(set);
+pulsarConf.setSubscriptionName(subs);
+
+SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
+    serviceUrl,
+    pulsarConf,
+    new AuthenticationDisabled());
+
+JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
+
+```
+
+For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java). This example counts the number of received messages that contain the string "Pulsar".
+
diff --git a/site2/website-next/versioned_docs/version-2.7.2/adaptors-storm.md b/site2/website-next/versioned_docs/version-2.7.2/adaptors-storm.md
new file mode 100644
index 0000000..e4b07db
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.7.2/adaptors-storm.md
@@ -0,0 +1,100 @@
+---
+id: adaptors-storm
+title: Pulsar adaptor for Apache Storm
+sidebar_label: "Apache Storm"
+original_id: adaptors-storm
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+Pulsar Storm is an adaptor for integrating with [Apache Storm](http://storm.apache.org/) topologies. It provides core Storm implementations for sending and receiving data.
+
+An application can inject data into a Storm topology via a generic Pulsar spout, as well as consume data from a Storm topology via a generic Pulsar bolt.
+
+## Using the Pulsar Storm Adaptor
+
+Include the dependency for the Pulsar Storm adaptor:
+
+```xml
+
+<dependency>
+  <groupId>org.apache.pulsar</groupId>
+  <artifactId>pulsar-storm</artifactId>
+  <version>${pulsar.version}</version>
+</dependency>
+
+```
+
+## Pulsar Spout
+
+The Pulsar Spout allows for the data published on a topic to be consumed by a Storm topology. It emits a Storm tuple based on the message received and the `MessageToValuesMapper` provided by the client.
+
+Tuples that fail to be processed by the downstream bolts are re-injected by the spout with exponential backoff, up to a configurable timeout (the default is 60 seconds) or a configurable number of retries, whichever comes first, after which the message is acknowledged by the consumer. Here's an example construction of a spout:
+
+```java
+
+MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
+
+    @Override
+    public Values toValues(Message msg) {
+        return new Values(new String(msg.getData()));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // declare the output fields
+        declarer.declare(new Fields("string"));
+    }
+};
+
+// Configure a Pulsar Spout
+PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
+spoutConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
+spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1");
+spoutConf.setSubscriptionName("my-subscriber-name1");
+spoutConf.setMessageToValuesMapper(messageToValuesMapper);
+
+// Create a Pulsar Spout
+PulsarSpout spout = new PulsarSpout(spoutConf);
+
+```
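The retry policy described above (exponential backoff bounded by a timeout or a retry limit, whichever is reached first) can be sketched in plain Java. The parameter names and values below are illustrative, not the adaptor's actual internals:

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSketch {
    // Hypothetical illustration: retry with exponential backoff until either
    // a redelivery limit or an overall timeout is hit, whichever comes first.
    static List<Long> redeliveryDelays(long baseMs, int maxRetries, long timeoutMs) {
        List<Long> delays = new ArrayList<>();
        long elapsed = 0;
        long delay = baseMs;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            if (elapsed + delay > timeoutMs) {
                break; // timeout reached first: stop retrying and ack the message
            }
            delays.add(delay);
            elapsed += delay;
            delay *= 2; // exponential backoff
        }
        return delays;
    }

    public static void main(String[] args) {
        // base 1s, up to 10 retries, 60s overall timeout (the documented default)
        System.out.println(redeliveryDelays(1000, 10, 60_000));
    }
}
```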
+
+For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java).
+
+## Pulsar Bolt
+
+The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the `TupleToMessageMapper` provided by the client.
+
+A partitioned topic can also be used to publish messages on different topics. In the implementation of the `TupleToMessageMapper`, provide a "key" in the message, which sends the messages with the same key to the same topic. Here's an example bolt:
+
+```java
+
+TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
+
+    @Override
+    public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
+        String receivedMessage = tuple.getString(0);
+        // message processing
+        String processedMsg = receivedMessage + "-processed";
+        return msgBuilder.value(processedMsg.getBytes());
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // declare the output fields
+    }
+};
+
+// Configure a Pulsar Bolt
+PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
+boltConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
+boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2");
+boltConf.setTupleToMessageMapper(tupleToMessageMapper);
+
+// Create a Pulsar Bolt
+PulsarBolt bolt = new PulsarBolt(boltConf);
+
+```
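The key-based routing mentioned above can be sketched as a hash of the key over the number of partitions. This is an illustration of the idea, not the adaptor's actual partitioner:

```java
public class KeyRoutingSketch {
    // Hypothetical sketch: messages with the same key hash to the same
    // partition of a partitioned topic, preserving per-key ordering.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int p1 = partitionFor("device-42", 4);
        int p2 = partitionFor("device-42", 4);
        System.out.println(p1 == p2); // same key -> same partition
    }
}
```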
+
diff --git a/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json b/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json
index 35f42bc..4866df2 100644
--- a/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json
+++ b/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json
@@ -396,6 +396,19 @@
     },
     {
       "type": "category",
+      "label": "Adaptors",
+      "items": [
+        {
+          "type": "doc",
+          "id": "version-2.7.2/adaptors-kafka"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.7.2/adaptors-spark"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.7.2/adaptors-storm"
        }
      ]
    },
    {
      "type": "category",
       "label": "Admin API",
       "items": [
         {
