sijie closed pull request #2001: adding pulsar io docs
URL: https://github.com/apache/incubator-pulsar/pull/2001
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 83eb7f577f..767eb87e7f 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -64,7 +64,7 @@
import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
@Getter
-@Parameters(commandDescription = "Interface for managing Pulsar Sinks (Egress
data from Pulsar)")
+@Parameters(commandDescription = "Interface for managing Pulsar IO sinks
(egress data from Pulsar)")
public class CmdSinks extends CmdBase {
private final CreateSink createSink;
@@ -102,7 +102,7 @@ void processArguments() throws Exception {
abstract void runCmd() throws Exception;
}
- @Parameters(commandDescription = "Run the Pulsar sink locally (rather than
deploying it to the Pulsar cluster)")
+ @Parameters(commandDescription = "Run a Pulsar IO sink connector locally
(rather than deploying it to the Pulsar cluster)")
class LocalSinkRunner extends CreateSink {
@Parameter(names = "--brokerServiceUrl", description = "The URL for
the Pulsar broker")
@@ -139,7 +139,7 @@ void runCmd() throws Exception {
}
}
- @Parameters(commandDescription = "Create Pulsar sink connectors")
+ @Parameters(commandDescription = "Submit a Pulsar IO sink connector to run
in a Pulsar cluster")
class CreateSink extends SinkCommand {
@Override
void runCmd() throws Exception {
@@ -152,7 +152,7 @@ void runCmd() throws Exception {
}
}
- @Parameters(commandDescription = "Update Pulsar sink connectors")
+ @Parameters(commandDescription = "Update a Pulsar IO sink connector")
class UpdateSink extends SinkCommand {
@Override
void runCmd() throws Exception {
@@ -161,7 +161,6 @@ void runCmd() throws Exception {
}
}
- @Parameters(commandDescription = "Create Pulsar sink connectors")
abstract class SinkCommand extends BaseCommand {
@Parameter(names = "--tenant", description = "The sink's tenant")
protected String tenant;
@@ -177,7 +176,7 @@ void runCmd() throws Exception {
protected String topicsPattern;
@Parameter(names = "--customSerdeInputs", description = "The map of
input topics to SerDe class names (as a JSON string)")
protected String customSerdeInputString;
- @Parameter(names = "--processingGuarantees", description = "The
processing guarantees (aka delivery semantics) applied to the Sink")
+ @Parameter(names = "--processingGuarantees", description = "The
processing guarantees (aka delivery semantics) applied to the sink")
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
@Parameter(names = "--parallelism", description = "The sink's
parallelism factor (i.e. the number of sink instances to run)")
protected Integer parallelism;
@@ -187,11 +186,11 @@ void runCmd() throws Exception {
@Parameter(names = "--sinkConfigFile", description = "The path to a
YAML config file specifying the "
+ "sink's configuration")
protected String sinkConfigFile;
- @Parameter(names = "--cpu", description = "The cpu in cores that need
to be allocated per function instance(applicable only to docker runtime)")
+ @Parameter(names = "--cpu", description = "The CPU (in cores) that
needs to be allocated per sink instance (applicable only to Docker runtime)")
protected Double cpu;
- @Parameter(names = "--ram", description = "The ram in bytes that need
to be allocated per function instance(applicable only to process/docker
runtime)")
+ @Parameter(names = "--ram", description = "The RAM (in bytes) that
need to be allocated per sink instance (applicable only to the process and
Docker runtimes)")
protected Long ram;
- @Parameter(names = "--disk", description = "The disk in bytes that
need to be allocated per function instance(applicable only to docker runtime)")
+ @Parameter(names = "--disk", description = "The disk (in bytes) that
need to be allocated per sink instance (applicable only to Docker runtime)")
protected Long disk;
@Parameter(names = "--sinkConfig", description = "Sink config
key/values")
protected String sinkConfigString;
@@ -407,7 +406,7 @@ protected FunctionDetails createSinkConfig(SinkConfig
sinkConfig) {
}
}
- @Parameters(commandDescription = "Stops a Pulsar sink or source")
+ @Parameters(commandDescription = "Stops a Pulsar IO sink connector")
class DeleteSink extends BaseCommand {
@Parameter(names = "--tenant", description = "The tenant of the sink")
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 32d9a6ca4c..d41183a400 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -42,7 +42,6 @@
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import
org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassValidator;
-import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import java.io.File;
@@ -52,18 +51,15 @@
import java.net.MalformedURLException;
import java.util.Map;
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.Utils.fileExists;
-import static org.apache.pulsar.functions.utils.Utils.getSinkType;
import static org.apache.pulsar.functions.utils.Utils.getSourceType;
import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
@Getter
-@Parameters(commandDescription = "Interface for managing Pulsar Source
(Ingress data to Pulsar)")
+@Parameters(commandDescription = "Interface for managing Pulsar IO Sources
(ingress data into Pulsar)")
public class CmdSources extends CmdBase {
private final CreateSource createSource;
@@ -101,7 +97,7 @@ void processArguments() throws Exception {
abstract void runCmd() throws Exception;
}
- @Parameters(commandDescription = "Run the Pulsar source locally (rather
than deploying it to the Pulsar cluster)")
+ @Parameters(commandDescription = "Run a Pulsar IO source connector locally
(rather than deploying it to the Pulsar cluster)")
class LocalSourceRunner extends CreateSource {
@Parameter(names = "--brokerServiceUrl", description = "The URL for
the Pulsar broker")
@@ -138,7 +134,7 @@ void runCmd() throws Exception {
}
}
- @Parameters(commandDescription = "Create Pulsar source connectors")
+ @Parameters(commandDescription = "Submit a Pulsar IO source connector to
run in a Pulsar cluster")
public class CreateSource extends SourceCommand {
@Override
void runCmd() throws Exception {
@@ -147,7 +143,7 @@ void runCmd() throws Exception {
}
}
- @Parameters(commandDescription = "Update Pulsar source connectors")
+ @Parameters(commandDescription = "Update a Pulsar IO source connector")
public class UpdateSource extends SourceCommand {
@Override
void runCmd() throws Exception {
@@ -167,23 +163,22 @@ void runCmd() throws Exception {
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
@Parameter(names = "--className", description = "The source's class
name")
protected String className;
- @Parameter(names = "--destinationTopicName", description = "Pulsar
topic to ingress data to")
+ @Parameter(names = "--destinationTopicName", description = "The Pulsar
topic to which data is sent")
protected String destinationTopicName;
- @Parameter(names = "--deserializationClassName", description = "The
classname for SerDe class for the source")
+ @Parameter(names = "--deserializationClassName", description = "The
SerDe classname for the source")
protected String deserializationClassName;
@Parameter(names = "--parallelism", description = "The source's
parallelism factor (i.e. the number of source instances to run)")
protected Integer parallelism;
- @Parameter(names = "--jar", description = "Path to the jar file for
the Source. It also supports url-path [http/https/file (file protocol assumes
that file already exists on worker host)] from which worker can download the
package.", listConverter = StringConverter.class)
+ @Parameter(names = "--jar", description = "The path to the jar file
for the Source. It also supports url-path [http/https/file (file protocol
assumes that file already exists on worker host)] from which worker can
download the package.", listConverter = StringConverter.class)
protected String jarFile;
-
@Parameter(names = "--sourceConfigFile", description = "The path to a
YAML config file specifying the "
+ "source's configuration")
protected String sourceConfigFile;
- @Parameter(names = "--cpu", description = "The cpu in cores that need
to be allocated per function instance(applicable only to docker runtime)")
+ @Parameter(names = "--cpu", description = "The CPU (in cores) that
needs to be allocated per source instance (applicable only to Docker runtime)")
protected Double cpu;
- @Parameter(names = "--ram", description = "The ram in bytes that need
to be allocated per function instance(applicable only to process/docker
runtime)")
+ @Parameter(names = "--ram", description = "The RAM (in bytes) that
need to be allocated per source instance (applicable only to the process and
Docker runtimes)")
protected Long ram;
- @Parameter(names = "--disk", description = "The disk in bytes that
need to be allocated per function instance(applicable only to docker runtime)")
+ @Parameter(names = "--disk", description = "The disk (in bytes) that
need to be allocated per source instance (applicable only to Docker runtime)")
protected Long disk;
@Parameter(names = "--sourceConfig", description = "Source config
key/values")
protected String sourceConfigString;
@@ -383,7 +378,7 @@ protected FunctionDetails createSourceConfig(SourceConfig
sourceConfig) {
}
}
- @Parameters(commandDescription = "Stops a Pulsar source")
+ @Parameters(commandDescription = "Stops a Pulsar IO source connector")
class DeleteSource extends BaseCommand {
@Parameter(names = "--tenant", description = "The tenant of a sink or
source")
diff --git a/site/_data/cli/pulsar-admin.yaml b/site/_data/cli/pulsar-admin.yaml
index 46fc0a4fac..c0d483fe4e 100644
--- a/site/_data/cli/pulsar-admin.yaml
+++ b/site/_data/cli/pulsar-admin.yaml
@@ -510,6 +510,164 @@ commands:
argument: cluster-name
- name: delete
description: Delete namespace isolation policy of a cluster. This
operation requires superuser privileges.
+- name: sink
+ description: An interface for managing Pulsar IO sinks (egress data from
Pulsar)
+ subcommands:
+ - name: create
+ description: Submit a Pulsar IO sink connector to run in a Pulsar cluster
+ options:
+ - flags: --className
+ description: The sink's Java class name
+ - flags: --cpu
+ description: The CPU (in cores) that needs to be allocated per sink
instance (applicable only to the Docker runtime)
+ - flags: --customSerdeInputs
+ description: The map of input topics to SerDe class names (as a JSON
string)
+ - flags: --disk
+ description: The disk (in bytes) that needs to be allocated per sink
instance (applicable only to the Docker runtime)
+ - flags: --inputs
+ description: The sink's input topic(s) (multiple topics can be specified
as a comma-separated list)
+ - flags: --jar
+ description: Path to the Java jar file for the sink
+ - flags: --name
+ description: The sink's name
+ - flags: --namespace
+ description: The sink's namespace
+ - flags: --parallelism
+ description: |
+ The sink's parallelism factor (i.e. the number of sink instances to
run).
+ - flags: --processingGuarantees
+ description: |
+ The processing guarantees (aka delivery semantics) applied to the
sink. Available values: `ATLEAST_ONCE`, `ATMOST_ONCE`, `EFFECTIVELY_ONCE`.
+ - flags: --ram
+ description: The RAM (in bytes) that needs to be allocated per sink
instance (applicable only to the process and Docker runtimes)
+ - flags: --sinkConfig
+ description: Sink config key/values
+ - flags: --sinkConfigFile
+ description: The path to a YAML config file specifying the sink's
configuration
+ - flags: --tenant
+ description: The sink's tenant
+ - name: delete
+ description: Stops a Pulsar IO sink
+ options:
+ - flags: --name
+ description: The name of the sink
+ - flags: --namespace
+ description: The namespace of the sink
+ - flags: --tenant
+ description: The tenant of the sink
+ - name: localrun
+ description: Run the Pulsar sink locally (rather than in the Pulsar
cluster)
+ options:
+ - flags: --brokerServiceUrl
+ description: The URL for the Pulsar broker
+ - flags: --className
+ description: The sink's Java class name
+ - flags: --cpu
+ description: The CPU (in cores) that needs to be allocated per sink
instance (applicable only to the Docker runtime)
+ - flags: --customSerdeInputs
+ description: The map of input topics to SerDe class names (as a JSON
string)
+ - flags: --disk
+ description: The disk (in bytes) that needs to be allocated per sink
instance (applicable only to the Docker runtime)
+ - flags: --inputs
+ description: The sink's input topic(s) (multiple topics can be specified
as a comma-separated list)
+ - flags: --jar
+ description: Path to the Java jar file for the sink
+ - flags: --name
+ description: The sink's name
+ - flags: --namespace
+ description: The sink's namespace
+ - flags: --parallelism
+ description: |
+ The sink's parallelism factor (i.e. the number of sink instances to
run).
+ - flags: --processingGuarantees
+ description: |
+ The processing guarantees (aka delivery semantics) applied to the
sink. Available values: `ATLEAST_ONCE`, `ATMOST_ONCE`, `EFFECTIVELY_ONCE`.
+ - flags: --ram
+ description: The RAM (in bytes) that needs to be allocated per sink
instance (applicable only to the process and Docker runtimes)
+ - flags: --sinkConfig
+ description: Sink config key/values
+ - flags: --sinkConfigFile
+ description: The path to a YAML config file specifying the sink's
configuration
+ - flags: --tenant
+ description: The sink's tenant
+- name: source
+ description: An interface for managing Pulsar IO sources (ingress data into
Pulsar)
+ subcommands:
+ - name: create
+ description: Submit a Pulsar IO source connector to run in a Pulsar cluster
+ options:
+ - flags: --className
+ description: The source's Java class name
+ - flags: --cpu
+ description: The CPU (in cores) that needs to be allocated per source
instance (applicable only to the Docker runtime)
+ - flags: --deserializationClassName
+ description: The SerDe classname for the source
+ - flags: --destinationTopicName
+ description: The Pulsar topic to which data is sent
+ - flags: --disk
+ description: The disk (in bytes) that needs to be allocated per source
instance (applicable only to the Docker runtime)
+ - flags: --jar
+ description: Path to the Java jar file for the source
+ - flags: --name
+ description: The source's name
+ - flags: --namespace
+ description: The source's namespace
+ - flags: --parallelism
+ description: |
+ The source's parallelism factor (i.e. the number of source instances
to run).
+ - flags: --processingGuarantees
+ description: |
+ The processing guarantees (aka delivery semantics) applied to the
source. Available values: `ATLEAST_ONCE`, `ATMOST_ONCE`, `EFFECTIVELY_ONCE`.
+ - flags: --ram
+ description: The RAM (in bytes) that needs to be allocated per source
instance (applicable only to the process and Docker runtimes)
+ - flags: --sourceConfig
+ description: Source config key/values
+ - flags: --sourceConfigFile
+ description: The path to a YAML config file specifying the source's
configuration
+ - flags: --tenant
+ description: The source's tenant
+ - name: delete
+ description: Stops a Pulsar IO source
+ options:
+ - flags: --name
+ description: The name of the source
+ - flags: --namespace
+ description: The namespace of the source
+ - flags: --tenant
+ description: The tenant of the source
+ - name: localrun
+ description: Run the Pulsar source locally (rather than in the Pulsar
cluster)
+ options:
+ - flags: --className
+ description: The source's Java class name
+ - flags: --cpu
+ description: The CPU (in cores) that needs to be allocated per source
instance (applicable only to the Docker runtime)
+ - flags: --deserializationClassName
+ description: The SerDe classname for the source
+ - flags: --destinationTopicName
+ description: The Pulsar topic to which data is sent
+ - flags: --disk
+ description: The disk (in bytes) that needs to be allocated per source
instance (applicable only to the Docker runtime)
+ - flags: --jar
+ description: Path to the Java jar file for the source
+ - flags: --name
+ description: The source's name
+ - flags: --namespace
+ description: The source's namespace
+ - flags: --parallelism
+ description: |
+ The source's parallelism factor (i.e. the number of source instances
to run).
+ - flags: --processingGuarantees
+ description: |
+ The processing guarantees (aka delivery semantics) applied to the
source. Available values: `ATLEAST_ONCE`, `ATMOST_ONCE`, `EFFECTIVELY_ONCE`.
+ - flags: --ram
+ description: The RAM (in bytes) that needs to be allocated per source
instance (applicable only to the process and Docker runtimes)
+ - flags: --sourceConfig
+ description: Source config key/values
+ - flags: --sourceConfigFile
+ description: The path to a YAML config file specifying the source's
configuration
+ - flags: --tenant
+ description: The source's tenant
- name: topics
description: Operations for managing Pulsar topics (both persistent and non
persistent)
subcommands:
diff --git a/site/_data/connectors.yaml b/site/_data/connectors.yaml
new file mode 100644
index 0000000000..37d332cbbf
--- /dev/null
+++ b/site/_data/connectors.yaml
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+- name: Aerospike sink
+ url: https://www.aerospike.com/
+ class:
+ group: aerospike
+ name: AerospikeSink
+- name: Cassandra sink
+ url: https://cassandra.apache.org
+ class:
+ group: cassandra
+ name: CassandraSink
+- name: Kafka source
+ url: https://kafka.apache.org
+ class:
+ group: kafka
+ name: KafkaSource
+- name: Kafka sink
+ url: https://kafka.apache.org
+ class:
+ group: kafka
+ name: KafkaSink
+- name: RabbitMQ source
+ url: https://www.rabbitmq.com
+ class:
+ group: rabbitmq
+ name: RabbitMQSource
+- name: Twitter Firehose source
+ url: https://developer.twitter.com/en/docs
+ class:
+ group: twitter
+ name: TwitterFireHose
\ No newline at end of file
diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml
index 59752d098c..78724df4f6 100644
--- a/site/_data/sidebar.yaml
+++ b/site/_data/sidebar.yaml
@@ -134,6 +134,8 @@ groups:
- title: Cookbooks
dir: cookbooks
docs:
+ - title: Pulsar IO
+ endpoint: pulsar-io
- title: Tiered Storage
endpoint: tiered-storage
- title: Topic compaction
diff --git a/site/_includes/connectors.html b/site/_includes/connectors.html
new file mode 100644
index 0000000000..99c303a17d
--- /dev/null
+++ b/site/_includes/connectors.html
@@ -0,0 +1,44 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<table>
+ <thead>
+ <tr>
+ <th>Name</th>
+ <th>Java class</th>
+ </tr>
+ </thead>
+ <tbody>
+ {% for connector in site.data.connectors %}
+ <tr>
+ <td>
+ <a href="{{ connector.url }}">
+ {{ connector.name }}
+ </a>
+ </td>
+ <td>
+ <a
href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/{{
connector.class.group }}/src/main/java/org/apache/pulsar/io/{{
connector.class.group }}/{{ connector.class.name }}.java">
+ <code>org.apache.pulsar.io.{{ connector.class.group }}.{{
connector.class.name }}</code>
+ </a>
+ </td>
+ </tr>
+ {% endfor %}
+ </tbody>
+</table>
\ No newline at end of file
diff --git a/site/_includes/figure.html b/site/_includes/figure.html
index cfda529ec7..b69fda5701 100644
--- a/site/_includes/figure.html
+++ b/site/_includes/figure.html
@@ -18,4 +18,11 @@
under the License.
-->
-<img src="{{ include.src }}"{{ if include.width }} width="{{ include.width
}}%"{{ endif }}{{ if include.alt }} alt="{{ include.alt }}"{{ endif }}>
\ No newline at end of file
+<figure>
+ <img src="{{ include.src }}"{% if include.width %} width="{{ include.width
}}%"{% endif %}{% if include.alt %} alt="{{ include.alt }}"{% endif %}>
+ {% if include.caption %}
+ <figcaption>
+ {{ include.caption }}
+ </figcaption>
+ {% endif %}
+</figure>
diff --git a/site/_sass/_docs.scss b/site/_sass/_docs.scss
index 8ce3e57c54..111f9dbe5d 100644
--- a/site/_sass/_docs.scss
+++ b/site/_sass/_docs.scss
@@ -162,6 +162,24 @@
margin-right: 2px;
}
+ figure {
+ margin: 2rem 0;
+
+ img {
+ display: block;
+ max-width: 100%;
+ margin: 0 auto;
+ }
+
+ figcaption {
+ text-align: center;
+ margin-top: 1rem;
+ font-size: 1.5rem;
+ font-weight: 500;
+ }
+ }
+
+
p img {
display: block;
margin: 20px auto;
diff --git a/site/docs/latest/cookbooks/pulsar-io.md
b/site/docs/latest/cookbooks/pulsar-io.md
new file mode 100644
index 0000000000..a5bf0ee5f1
--- /dev/null
+++ b/site/docs/latest/cookbooks/pulsar-io.md
@@ -0,0 +1,82 @@
+---
+title: The Pulsar IO cookbook
+---
+
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+[Pulsar IO](../../getting-started/ConceptsAndArchitecture#pulsar-io) is a
feature of Pulsar that enables you to easily create and manage **connectors**
that interface with external systems, such as databases and other messaging
systems.
+
+## Setup
+
+In order to run Pulsar IO connectors, you'll need to have a binary
distribution of Pulsar installed locally.
+
+## Managing connectors
+
+Pulsar connectors can be managed using the
[`source`](../../reference/CliTools#pulsar-admin-source) and
[`sink`](../../reference/CliTools#pulsar-admin-sink) commands of the
[`pulsar-admin`](../../reference/CliTools#pulsar-admin) CLI tool.
+
+### Running sources
+
+You can use the [`create`](../../reference/CliTools#pulsar-admin-source-create)
command to run a source connector.
+
+You can submit a source to be run in an existing Pulsar cluster using a command
of this form:
+
+```bash
+$ ./bin/pulsar-admin source create --className <classname> --jar <jar-location>
--tenant test --namespace <namespace> --name <source-name>
--destinationTopicName <topic-name>
+```
+
+Here’s an example command:
+
+```bash
+bin/pulsar-admin source create --className
org.apache.pulsar.io.twitter.TwitterFireHose --jar ~/application.jar --tenant
test --namespace ns1 --name twitter-source --destinationTopicName twitter_data
+```
+
+Instead of submitting a source to run on an existing Pulsar cluster, you
alternatively can run a source as a process on your local machine:
+
+```bash
+bin/pulsar-admin source localrun --className
org.apache.pulsar.io.twitter.TwitterFireHose --jar ~/application.jar --tenant
test --namespace ns1 --name twitter-source --destinationTopicName twitter_data
+```
+
+### Running sinks
+
+You can submit a sink to be run in an existing Pulsar cluster using a command
of this form:
+
+```bash
+./bin/pulsar-admin sink create --className <classname> --jar <jar-location>
--tenant test --namespace <namespace> --name <sink-name> --inputs <input-topics>
+```
+
+Here’s an example command:
+
+```bash
+./bin/pulsar-admin sink create --className
org.apache.pulsar.io.cassandra.CassandraSink --jar ~/application.jar --tenant
test --namespace ns1 --name cassandra-sink --inputs test_topic
+```
+
+Instead of submitting a sink to run on an existing Pulsar cluster, you
alternatively can run a sink as a process on your local machine:
+
+```bash
+./bin/pulsar-admin sink localrun --className
org.apache.pulsar.io.cassandra.CassandraSink --jar ~/application.jar --tenant
test --namespace ns1 --name cassandra-sink --inputs test_topic
+```
+
+## Available connectors
+
+At the moment, the following connectors are available for Pulsar:
+
+{% include connectors.html %}
diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
index 96c0be2174..99414174d3 100644
--- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
+++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -587,6 +587,34 @@ Pulsar currently supports S3 as a long term store.
Offloading to S3 triggered vi
{% include admonition.html type="info" content="For a guide for setting up
tiered storage, see the [Tiered storage
cookbook](../../cookbooks/tiered-storage)." %}
+## Pulsar IO
+
+Messaging systems are most powerful when you can easily use them in
conjunction with external systems like databases and other messaging systems.
**Pulsar IO** is a feature of Pulsar that enables you to easily create, deploy,
and manage Pulsar **connectors** that interact with external systems, such as
[Apache Cassandra](https://cassandra.apache.org),
[Aerospike](https://www.aerospike.com), and many others.
+
+{% include admonition.html type="info" title="Pulsar IO and Pulsar Functions"
+ content="Under the hood, Pulsar IO connectors are specialized [Pulsar
Functions](#pulsar-functions) purpose-built to interface with external systems.
The [administrative interface](../../cookbooks/pulsar-io) for Pulsar IO is, in
fact, quite similar to that of Pulsar Functions." %}
+
+### Sources and sinks
+
+Pulsar IO connectors come in two types:
+
+* **Sources** feed data *into* Pulsar from other systems. Common sources
include other messaging systems and "firehose"-style data pipeline APIs.
+* **Sinks** are fed data *from* Pulsar. Common sinks include other messaging
systems and SQL and NoSQL databases.
+
+This diagram illustrates the relationship between sources, sinks, and Pulsar:
+
+{% include figure.html src="/img/pulsar-io.png" alt="Pulsar IO diagram"
caption="Pulsar IO connectors (sources and sinks)" width="80" %}
+
+### Working with connectors
+
+Pulsar IO connectors can be managed via the
[`pulsar-admin`](../../reference/CliTools#pulsar-admin) CLI tool, in particular
the [`source`](../../reference/CliTools#pulsar-admin-source) and
[`sink`](../../reference/CliTools#pulsar-admin-sink) commands.
+
+{% include admonition.html type="info" content="For a guide to managing
connectors in your Pulsar installation, see the [Pulsar IO
cookbook](../../cookbooks/pulsar-io#managing-connectors)." %}
+
+The following connectors are currently available for Pulsar:
+
+{% include connectors.html %}
+
## Schema registry
Type safety is extremely important in any application built around a message
bus like Pulsar. {% popover Producers %} and {% popover consumers %} need some
kind of mechanism for coordinating types at the {% popover topic %} level lest
a wide variety of potential problems arise (for example serialization and
deserialization issues). Applications typically adopt one of two basic
approaches to type safety in messaging:
diff --git a/site/img/pulsar-io.png b/site/img/pulsar-io.png
new file mode 100644
index 0000000000..3e74d4bab7
Binary files /dev/null and b/site/img/pulsar-io.png differ
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services