This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2ecd03a Reintroduce Aerospike connector (#2524)
2ecd03a is described below
commit 2ecd03abbbf6128d59d27a9d151ed47cc7ead4c7
Author: Ali Ahmed <[email protected]>
AuthorDate: Thu Sep 6 07:39:37 2018 -0700
Reintroduce Aerospike connector (#2524)
---
distribution/io/src/assemble/io.xml | 8 +
pom.xml | 1 +
pulsar-io/{ => aerospike}/pom.xml | 56 +++++--
.../pulsar/io/aerospike/AerospikeAbstractSink.java | 169 +++++++++++++++++++++
.../pulsar/io/aerospike/AerospikeSinkConfig.java | 64 ++++++++
.../pulsar/io/aerospike/AerospikeStringSink.java | 35 +++++
.../resources/META-INF/services/pulsar-io.yaml | 22 +++
pulsar-io/pom.xml | 1 +
site2/docs/deploy-bare-metal.md | 1 +
site2/docs/getting-started-standalone.md | 1 +
site2/docs/io-aerospike.md | 21 +++
site2/docs/io-connectors.md | 1 +
site2/docs/io-overview.md | 3 +-
site2/docs/io-quickstart.md | 3 +-
14 files changed, 368 insertions(+), 18 deletions(-)
diff --git a/distribution/io/src/assemble/io.xml
b/distribution/io/src/assemble/io.xml
index bb75e84..08ff859 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -74,15 +74,23 @@
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>
+
<file>
<source>${basedir}/../../pulsar-io/jdbc/target/pulsar-io-jdbc-${project.version}.nar</source>
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>
+
<file>
<source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source>
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>
+
+ <file>
+
<source>${basedir}/../../pulsar-io/aerospike/target/pulsar-io-aerospike-${project.version}.nar</source>
+ <outputDirectory>connectors</outputDirectory>
+ <fileMode>644</fileMode>
+ </file>
</files>
</assembly>
diff --git a/pom.xml b/pom.xml
index 681f18e..a35ca81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@ flexible messaging model and an intuitive client
API.</description>
<sketches.version>0.8.3</sketches.version>
<hbc-core.version>2.2.0</hbc-core.version>
<cassandra-driver-core.version>3.4.0</cassandra-driver-core.version>
+ <aerospike-client.version>4.1.11</aerospike-client.version>
<kafka-client.version>0.10.2.1</kafka-client.version>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.297</aws-sdk.version>
diff --git a/pulsar-io/pom.xml b/pulsar-io/aerospike/pom.xml
similarity index 50%
copy from pulsar-io/pom.xml
copy to pulsar-io/aerospike/pom.xml
index e89cc02..34b2d75 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/aerospike/pom.xml
@@ -19,27 +19,51 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
<parent>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar</artifactId>
+ <artifactId>pulsar-io</artifactId>
<version>2.2.0-incubating-SNAPSHOT</version>
</parent>
- <artifactId>pulsar-io</artifactId>
- <name>Pulsar IO :: Parent</name>
-
- <modules>
- <module>core</module>
- <module>twitter</module>
- <module>cassandra</module>
- <module>kafka</module>
- <module>rabbitmq</module>
- <module>kinesis</module>
- <module>jdbc</module>
- <module>data-genenator</module>
- </modules>
+ <artifactId>pulsar-io-aerospike</artifactId>
+ <name>Pulsar IO :: Aerospike</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.aerospike</groupId>
+ <artifactId>aerospike-client-bc</artifactId>
+ <version>${aerospike-client.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
new file mode 100644
index 0000000..fe3787a
--- /dev/null
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.aerospike;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.AerospikeException;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Host;
+import com.aerospike.client.Key;
+import com.aerospike.client.Value;
+import com.aerospike.client.async.EventLoop;
+import com.aerospike.client.async.EventPolicy;
+import com.aerospike.client.async.NioEventLoops;
+import com.aerospike.client.listener.WriteListener;
+import com.aerospike.client.policy.ClientPolicy;
+import com.aerospike.client.policy.WritePolicy;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Simple abstract class for Aerospike sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+public abstract class AerospikeAbstractSink<K, V> implements Sink<byte[]> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AerospikeAbstractSink.class);
+
+ // ----- Runtime fields
+ private AerospikeSinkConfig aerospikeSinkConfig;
+ private AerospikeClient client;
+ private WritePolicy writePolicy;
+ private BlockingQueue<AWriteListener> queue;
+ private NioEventLoops eventLoops;
+ private EventLoop eventLoop;
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
+ aerospikeSinkConfig = AerospikeSinkConfig.load(config);
+ if (aerospikeSinkConfig.getSeedHosts() == null
+ || aerospikeSinkConfig.getKeyspace() == null
+ || aerospikeSinkConfig.getColumnName() == null) {
+ throw new IllegalArgumentException("Required property not set.");
+ }
+
+ writePolicy = new WritePolicy();
+ writePolicy.maxRetries = aerospikeSinkConfig.getRetries();
+ writePolicy.setTimeout(aerospikeSinkConfig.getTimeoutMs());
+ createClient();
+ queue = new
LinkedBlockingDeque<>(aerospikeSinkConfig.getMaxConcurrentRequests());
+ for (int i = 0; i < aerospikeSinkConfig.getMaxConcurrentRequests();
++i) {
+ queue.put(new AWriteListener(queue));
+ }
+
+ eventLoops = new NioEventLoops(new EventPolicy(), 1);
+ eventLoop = eventLoops.next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (client != null) {
+ client.close();
+ }
+
+ if (eventLoops != null) {
+ eventLoops.close();
+ }
+ LOG.info("Connection Closed");
+ }
+
+ @Override
+ public void write(Record<byte[]> record) {
+ KeyValue<K, V> keyValue = extractKeyValue(record);
+ Key key = new Key(aerospikeSinkConfig.getKeyspace(),
aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString());
+ Bin bin = new Bin(aerospikeSinkConfig.getColumnName(),
Value.getAsBlob(keyValue.getValue()));
+ AWriteListener listener = null;
+ try {
+ listener = queue.take();
+ } catch (InterruptedException ex) {
+ record.fail();
+ return;
+ }
+ listener.setContext(record);
+ client.put(eventLoop, listener, writePolicy, key, bin);
+ }
+
+ private void createClient() {
+ String[] hosts = aerospikeSinkConfig.getSeedHosts().split(",");
+ if (hosts.length <= 0) {
+ throw new RuntimeException("Invalid Seed Hosts");
+ }
+ Host[] aeroSpikeHosts = new Host[hosts.length];
+ for (int i = 0; i < hosts.length; ++i) {
+ String[] hostPort = hosts[i].split(":");
+ aeroSpikeHosts[i] = new Host(hostPort[0],
Integer.valueOf(hostPort[1]));
+ }
+ ClientPolicy policy = new ClientPolicy();
+ if (aerospikeSinkConfig.getUserName() != null &&
!aerospikeSinkConfig.getUserName().isEmpty()
+ && aerospikeSinkConfig.getPassword() != null &&
!aerospikeSinkConfig.getPassword().isEmpty()) {
+ policy.user = aerospikeSinkConfig.getUserName();
+ policy.password = aerospikeSinkConfig.getPassword();
+ }
+ client = new AerospikeClient(policy, aeroSpikeHosts);
+ }
+
+ private class AWriteListener implements WriteListener {
+ private Record<byte[]> context;
+ private BlockingQueue<AWriteListener> queue;
+
+ public AWriteListener(BlockingQueue<AWriteListener> queue) {
+ this.queue = queue;
+ }
+
+ public void setContext(Record<byte[]> record) {
+ this.context = record;
+ }
+
+ @Override
+ public void onSuccess(Key key) {
+ if (context != null) {
+ context.ack();
+ }
+ try {
+ queue.put(this);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException("Interrupted while being added to
the queue" ,ex);
+ }
+ }
+
+ @Override
+ public void onFailure(AerospikeException e) {
+ if (context != null) {
+ context.fail();
+ }
+ try {
+ queue.put(this);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException("Interrupted while being added to
the queue", ex);
+ }
+ }
+ }
+
+ public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
+}
\ No newline at end of file
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java
new file mode 100644
index 0000000..931d280
--- /dev/null
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.aerospike;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class AerospikeSinkConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private String seedHosts;
+ private String keyspace;
+ private String columnName;
+
+ // Optional
+ private String userName;
+ private String password;
+ private String keySet;
+ private int maxConcurrentRequests = 100;
+ private int timeoutMs = 100;
+ private int retries = 1;
+
+
+ public static AerospikeSinkConfig load(String yamlFile) throws IOException
{
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class);
+ }
+
+ public static AerospikeSinkConfig load(Map<String, Object> map) throws
IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(new ObjectMapper().writeValueAsString(map),
AerospikeSinkConfig.class);
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
new file mode 100644
index 0000000..bac07a0
--- /dev/null
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.aerospike;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+
+/**
+ * Aerospike sink that treats incoming messages on the input topic as Strings
+ * and write identical key/value pairs.
+ */
+public class AerospikeStringSink extends AerospikeAbstractSink<String, String>
{
+ @Override
+ public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+ String key = record.getKey().orElseGet(() -> new
String(record.getValue()));
+ return new KeyValue<>(key, new String(record.getValue()));
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..f2a7ab5
--- /dev/null
+++ b/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: aerospike
+description: Aerospike database sink
+sinkClass: org.apache.pulsar.io.aerospike.AerospikeStringSink
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index e89cc02..92f2186 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -35,6 +35,7 @@
<module>core</module>
<module>twitter</module>
<module>cassandra</module>
+ <module>aerospike</module>
<module>kafka</module>
<module>rabbitmq</module>
<module>kinesis</module>
diff --git a/site2/docs/deploy-bare-metal.md b/site2/docs/deploy-bare-metal.md
index a65a45a..a438f92 100644
--- a/site2/docs/deploy-bare-metal.md
+++ b/site2/docs/deploy-bare-metal.md
@@ -125,6 +125,7 @@ $ tar xvfz
apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
$ mv apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
$ ls connectors
+pulsar-io-aerospike-{{pulsar:version}}.nar
pulsar-io-cassandra-{{pulsar:version}}.nar
pulsar-io-kafka-{{pulsar:version}}.nar
pulsar-io-kinesis-{{pulsar:version}}.nar
diff --git a/site2/docs/getting-started-standalone.md
b/site2/docs/getting-started-standalone.md
index 944eee0..3f95cdd 100644
--- a/site2/docs/getting-started-standalone.md
+++ b/site2/docs/getting-started-standalone.md
@@ -87,6 +87,7 @@ $ tar xvfz
/path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
$ cd apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
$ ls connectors
+pulsar-io-aerospike-{{pulsar:version}}.nar
pulsar-io-cassandra-{{pulsar:version}}.nar
pulsar-io-kafka-{{pulsar:version}}.nar
pulsar-io-kinesis-{{pulsar:version}}.nar
diff --git a/site2/docs/io-aerospike.md b/site2/docs/io-aerospike.md
new file mode 100644
index 0000000..b23e2e3
--- /dev/null
+++ b/site2/docs/io-aerospike.md
@@ -0,0 +1,21 @@
+---
+id: io-aerospike
+title: Aerospike Sink Connector
+sidebar_label: Aerospike Sink Connector
+---
+
+The Aerospike Sink connector is used to write messages to an Aerospike Cluster.
+
+## Sink Configuration Options
+
+The following configuration options are specific to the Aerospike Connector:
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| `seedHosts` | `true` | `null` | Comma seperated list of one or more
Aerospike cluster hosts; each host can be specified as a valid IP address or
hostname followed by an optional port number (default is 3000). |
+| `keyspace` | `true` | `null` | Aerospike namespace to use. |
+| `keySet` | `false` | `null` | Aerospike set name to use. |
+| `columnName` | `true` | `null` | Aerospike bin name to use. |
+| `maxConcurrentRequests` | `false` | `100` | Maximum number of concurrent
Aerospike transactions that a Sink can open. |
+| `timeoutMs` | `false` | `100` | A single timeout value controls
`socketTimeout` and `totalTimeout` for Aerospike transactions. |
+| `retries` | `false` | `1` | Maximum number of retries before aborting a
write transaction to Aerospike. |
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 6f7d4b3..5a76998 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -9,6 +9,7 @@ These connectors import and export data from some of the most
commonly used data
as easy as writing a simple connector configuration and running the connector
locally or submitting the connector to a
Pulsar Functions cluster.
+- [Aerospike Sink Connector](io-aerospike.md)
- [Cassandra Sink Connector](io-cassandra.md)
- [Kafka Sink Connector](io-kafka.md#sink)
- [Kafka Source Connector](io-kafka.md#source)
diff --git a/site2/docs/io-overview.md b/site2/docs/io-overview.md
index be8792c..0c55716 100644
--- a/site2/docs/io-overview.md
+++ b/site2/docs/io-overview.md
@@ -4,7 +4,7 @@ title: Pulsar IO Overview
sidebar_label: Overview
---
-Messaging systems are most powerful when you can easily use them in
conjunction with external systems like databases and other messaging systems.
**Pulsar IO** is a feature of Pulsar that enables you to easily create, deploy,
and manage Pulsar **connectors** that interact with external systems, such as
[Apache Cassandra](https://cassandra.apache.org), and many others.
+Messaging systems are most powerful when you can easily use them in
conjunction with external systems like databases and other messaging systems.
**Pulsar IO** is a feature of Pulsar that enables you to easily create, deploy,
and manage Pulsar **connectors** that interact with external systems, such as
[Apache Cassandra](https://cassandra.apache.org),
[Aerospike](https://www.aerospike.com), and many others.
> #### Pulsar IO and Pulsar Functions
> Under the hood, Pulsar IO connectors are specialized [Pulsar
> Functions](functions-overview.md) purpose-built to interface with external
> systems. The [administrative interface](io-quickstart.md) for Pulsar IO is,
> in fact, quite similar to that of Pulsar Functions.
@@ -30,6 +30,7 @@ The following connectors are currently available for Pulsar:
|Name|Java Class|Documentation|
|---|---|---|
+|[Aerospike
sink](https://www.aerospike.com/)|[`org.apache.pulsar.io.aerospike.AerospikeSink`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java)|[Documentation](io-aerospike.md)|
|[Cassandra
sink](https://cassandra.apache.org)|[`org.apache.pulsar.io.cassandra.CassandraSink`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java)|[Documentation](io-cassandra.md)|
|[Kafka
source](https://kafka.apache.org)|[`org.apache.pulsar.io.kafka.KafkaSource`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java)|[Documentation](io-kafka.md#source)|
|[Kafka
sink](https://kafka.apache.org)|[`org.apache.pulsar.io.kafka.KafkaSink`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java)|[Documentation](io-kafka.md#sink)|
diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md
index 4b40f6b..8b8cfd3 100644
--- a/site2/docs/io-quickstart.md
+++ b/site2/docs/io-quickstart.md
@@ -69,6 +69,7 @@ $ tar xvfz
/path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
$ cp -r apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
$ ls connectors
+pulsar-io-aerospike-{{pulsar:version}}.nar
pulsar-io-cassandra-{{pulsar:version}}.nar
pulsar-io-kafka-{{pulsar:version}}.nar
pulsar-io-kinesis-{{pulsar:version}}.nar
@@ -122,7 +123,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors
Example output:
```json
-[{"name":"cassandra","description":"Writes data into
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
source and sink
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis
sink
connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ
source connector","sour [...]
+[{"name":"aerospike","description":"Aerospike database
sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
data into
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
source and sink
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis
sink connect [...]
```
If an error occurred while starting Pulsar service, you may be able to seen
exception at the terminal you are running `pulsar/standalone`,