This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 08ef448 [doc] Generate connector yaml files for connectors (#3065)
08ef448 is described below
commit 08ef448c1c7e70db345092c2b56662c7b6f888fe
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Nov 27 10:08:32 2018 -0800
[doc] Generate connector yaml files for connectors (#3065)
*Motivation*
Include the example yaml files in the io distribution package
---
distribution/io/pom.xml | 1 +
.../org/apache/pulsar/io/canal/CanalSource.java | 9 +++-
.../apache/pulsar/io/canal/CanalSourceConfig.java | 41 +++++++++++++++-
.../pulsar/io/cassandra/CassandraSinkConfig.java | 21 +++++++++
.../pulsar/io/cassandra/CassandraStringSink.java | 7 +++
pulsar-io/docs/pom.xml | 45 ++++++++++++++++++
.../io/elasticsearch/ElasticSearchConfig.java | 31 +++++++++++++
.../pulsar/io/elasticsearch/ElasticSearchSink.java | 8 ++++
.../apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java | 8 ++++
.../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 31 +++++++++++++
.../org/apache/pulsar/io/kinesis/KinesisSink.java | 8 ++++
.../pulsar/io/kinesis/KinesisSinkConfig.java | 54 +++++++++++++++++++++-
.../apache/pulsar/io/rabbitmq/RabbitMQConfig.java | 16 +++++++
.../apache/pulsar/io/rabbitmq/RabbitMQSource.java | 7 +++
.../apache/pulsar/io/twitter/TwitterFireHose.java | 8 ++++
.../pulsar/io/twitter/TwitterFireHoseConfig.java | 44 +++++++++++++++++-
16 files changed, 334 insertions(+), 5 deletions(-)
diff --git a/distribution/io/pom.xml b/distribution/io/pom.xml
index 67e3509..125fe70 100644
--- a/distribution/io/pom.xml
+++ b/distribution/io/pom.xml
@@ -79,6 +79,7 @@
</goals>
<configuration>
<executable>${project.basedir}/../../src/pulsar-io-gen</executable>
+ <outputFile>${project.basedir}/target/pulsar-io-gen.output</outputFile>
<arguments>
<argument>conf</argument>
<argument>-o</argument>
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
index 9021b23..c348964 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
@@ -30,6 +30,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.MDC;
import java.net.InetSocketAddress;
@@ -39,8 +41,13 @@ import java.util.Objects;
import java.util.Optional;
/**
- * A Simple class for mysql binlog sync to pulsar
+ * A Simple class for mysql binlog sync to pulsar.
*/
+@Connector(
+ name = "canal",
+ type = IOType.SOURCE,
+ help = "The CanalSource is used for syncing mysql binlog to Pulsar.",
+ configClass = CanalSourceConfig.class)
@Slf4j
public class CanalSource extends PushSource<byte[]> {
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
index 87cbc9f..1af70c1 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
@@ -27,8 +27,12 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+/**
+ * Canal source config.
+ */
@Data
@Setter
@Getter
@@ -37,14 +41,47 @@ import java.util.Map;
@Accessors(chain = true)
public class CanalSourceConfig implements Serializable{
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Username to connect to mysql database")
private String username;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Password to connect to mysql database")
private String password;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Source destination that Canal source connector connects to")
private String destination;
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "The mysql database hostname")
private String singleHostname;
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "The mysql database port")
private int singlePort;
- private Boolean cluster;
+ @FieldDoc(
+ required = true,
+ defaultValue = "false",
+ help = "If setting to true, it will be talking to `zkServers` to figure out the actual database hosts."
+ + " If setting to false, it will connect to the database specified by `singleHostname` and `singlePort`.")
+ private Boolean cluster = false;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The zookeeper servers that canal source connector talks to figure out the actual database hosts")
private String zkServers;
- private int batchSize;
+ @FieldDoc(
+ required = false,
+ defaultValue = "1000",
+ help = "The batch size to fetch from canal.")
+ private int batchSize = 1000;
public static CanalSourceConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
index c3f6587..2406952 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSinkConfig.java
@@ -28,6 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@Setter
@@ -39,10 +40,30 @@ public class CassandraSinkConfig implements Serializable {
private static final long serialVersionUID = 1L;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "A comma-separated list of cassandra hosts to connect to")
private String roots;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The key space used for writing pulsar messages to")
private String keyspace;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The key name of the cassandra column family")
private String keyname;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The cassandra column family name")
private String columnFamily;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The column name of the cassandra column family")
private String columnName;
public static CassandraSinkConfig load(String yamlFile) throws IOException
{
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index 4e7feb5..694b79b 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -21,11 +21,18 @@ package org.apache.pulsar.io.cassandra;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
/**
* Cassandra sink that treats incoming messages on the input topic as Strings
* and write identical key/value pairs.
*/
+@Connector(
+ name = "cassandra",
+ type = IOType.SINK,
+ help = "The CassandraStringSink is used for moving messages from Pulsar to Cassandra.",
+ configClass = CassandraSinkConfig.class)
public class CassandraStringSink extends CassandraAbstractSink<String, String>
{
@Override
public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index 04fffee..04d44c8 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -49,9 +49,54 @@
<!-- include connectors -->
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-canal</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-cassandra</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-data-generator</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-elastic-search</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-hdfs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-kafka</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-kinesis</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-rabbitmq</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-twitter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 8b54353..f929f97 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -31,6 +31,7 @@ import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
/**
* Configuration class for the ElasticSearch Sink Connector.
@@ -45,16 +46,46 @@ public class ElasticSearchConfig implements Serializable {
private static final long serialVersionUID = 1L;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The url of elastic search cluster that the connector connects to"
+ )
private String elasticSearchUrl;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The index name that the connector writes messages to"
+ )
private String indexName;
+ @FieldDoc(
+ required = false,
+ defaultValue = "1",
+ help = "The number of shards of the index"
+ )
private int indexNumberOfShards = 1;
+ @FieldDoc(
+ required = false,
+ defaultValue = "1",
+ help = "The number of replicas of the index"
+ )
private int indexNumberOfReplicas = 1;
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "The username used by the connector to connect to the elastic search cluster. If username is set, a password should also be provided."
+ )
private String username;
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "The password used by the connector to connect to the elastic search cluster. If password is set, a username should also be provided"
+ )
private String password;
public static ElasticSearchConfig load(String yamlFile) throws IOException
{
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 86546f3..dca1504 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -33,6 +33,8 @@ import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -51,6 +53,12 @@ import org.elasticsearch.common.xcontent.XContentType;
* Users need to implement extractKeyValue function to use this sink.
* This class assumes that the input will be JSON documents
*/
+@Connector(
+ name = "elastic_search",
+ type = IOType.SINK,
+ help = "A sink connector that sends pulsar messages to elastic search",
+ configClass = ElasticSearchConfig.class
+)
public class ElasticSearchSink implements Sink<byte[]> {
protected static final String DOCUMENT = "doc";
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index c18dc81..640972a 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -23,11 +23,19 @@ import java.sql.PreparedStatement;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
/**
* A Simple Jdbc sink, which interprets input Record in generic record.
*/
+@Connector(
+ name = "jdbc",
+ type = IOType.SINK,
+ help = "A simple JDBC sink that writes pulser messages to a database table",
+ configClass = JdbcSinkConfig.class
+)
@Slf4j
public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 6cc95d6..b0508ba 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -28,6 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@Setter
@@ -39,13 +40,43 @@ public class JdbcSinkConfig implements Serializable {
private static final long serialVersionUID = 1L;
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "Username used to connect to the database specified by `jdbcUrl`"
+ )
private String userName;
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "Password used to connect to the database specified by `jdbcUrl`"
+ )
private String password;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The JDBC url of the database this connector connects to"
+ )
private String jdbcUrl;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The name of the table this connector writes messages to"
+ )
private String tableName;
// Optional
+ @FieldDoc(
+ required = false,
+ defaultValue = "500",
+ help = "The jdbc operation timeout in milliseconds"
+ )
private int timeoutMs = 500;
+ @FieldDoc(
+ required = false,
+ defaultValue = "200",
+ help = "The batch size of updates made to the database"
+ )
private int batchSize = 200;
public static JdbcSinkConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 1056f57..5d4c8a0 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -51,6 +51,8 @@ import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +82,12 @@ import org.slf4j.LoggerFactory;
*
*
*/
+@Connector(
+ name = "kinesis",
+ type = IOType.SINK,
+ help = "A sink connector that copies messages from Pulsar to Kinesis",
+ configClass = KinesisSinkConfig.class
+)
public class KinesisSink implements Sink<byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(KinesisSink.class);
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index ba476ab..3345026 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -28,6 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@Setter
@@ -39,13 +40,64 @@ public class KinesisSinkConfig implements Serializable {
private static final long serialVersionUID = 1L;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Kinesis end-point url. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html"
+ )
private String awsEndpoint;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Appropriate aws region. E.g. us-west-1, us-west-2"
+ )
private String awsRegion;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Kinesis stream name"
+ )
private String awsKinesisStreamName;
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "Fully-Qualified class name of implementation of AwsCredentialProviderPlugin."
+ + " It is a factory class which creates an AWSCredentialsProvider that will be used by Kinesis Sink."
+ + " If it is empty then KinesisSink will create a default AWSCredentialsProvider which accepts json-map"
+ + " of credentials in `awsCredentialPluginParam`")
private String awsCredentialPluginName;
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "json-parameters to initialize `AwsCredentialsProviderPlugin`")
private String awsCredentialPluginParam;
+ @FieldDoc(
+ required = true,
+ defaultValue = "ONLY_RAW_PAYLOAD",
+ help = "Message format in which kinesis sink converts pulsar messages and publishes to kinesis streams.\n"
+ + " #\n"
+ + " # The available messages formats are: \n"
+ + " #\n"
+ + " # - ONLY_RAW_PAYLOAD \n"
+ + " #\n"
+ + " # Kinesis sink directly publishes pulsar message payload as a message into the configured kinesis stream. \n"
+ + " #\n"
+ + " # - FULL_MESSAGE_IN_JSON \n"
+ + " #\n"
+ + " # Kinesis sink creates a json payload with pulsar message payload, properties and encryptionCtx, \n"
+ + " # and publishes json payload into the configured kinesis stream.\n"
+ + " #\n"
+ + " # - FULL_MESSAGE_IN_FB \n"
+ + " #\n"
+ + " # Kinesis sink creates a flatbuffer serialized paylaod with pulsar message payload, \n"
+ + " # properties and encryptionCtx, and publishes flatbuffer payload into the configured kinesis stream."
+ )
private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD
- private boolean retainOrdering;
+ @FieldDoc(
+ required = false,
+ defaultValue = "false",
+ help = "A flag to tell Pulsar IO to retain ordering when moving messages from Pulsar to Kinesis")
+ private boolean retainOrdering = false;
public static KinesisSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
index 33bc9c1..f4fd61f 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
@@ -28,7 +28,11 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+/**
+ * RabbitMQ source connector config.
+ */
@Data
@Setter
@Getter
@@ -39,8 +43,20 @@ public class RabbitMQConfig implements Serializable {
private static final long serialVersionUID = 1L;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The connection name used for connecting to RabbitMQ")
private String connectionName;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The AMQ uri used for connecting to RabbitMQ")
private String amqUri;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The RabbitMQ queue name")
private String queueName;
public static RabbitMQConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index 2277cbb..6bb9613 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -35,12 +35,19 @@ import lombok.Data;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A simple connector to consume messages from a RabbitMQ queue
*/
+@Connector(
+ name = "rabbitmq",
+ type = IOType.SOURCE,
+ help = "A simple connector to move messages from a RabbitMQ queue to a Pulsar topic",
+ configClass = RabbitMQConfig.class)
public class RabbitMQSource extends PushSource<byte[]> {
private static Logger logger =
LoggerFactory.getLogger(RabbitMQSource.class);
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index fc61945..e1d0545 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -43,12 +43,20 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple Push based Twitter FireHose Source
*/
+@Connector(
+ name = "twitter",
+ type = IOType.SOURCE,
+ help = "A simple connector moving tweets from Twitter FireHose to Pulsar",
+ configClass = TwitterFireHoseConfig.class
+)
@Slf4j
public class TwitterFireHose extends PushSource<TweetData> {
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
index 88acb33..c9bf4df 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
@@ -29,6 +29,7 @@ import java.util.Map;
import com.twitter.hbc.core.Constants;
import lombok.*;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@Setter
@@ -40,18 +41,59 @@ public class TwitterFireHoseConfig implements Serializable {
private static final long serialVersionUID = 1L;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Your twitter app consumer key. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
+ )
private String consumerKey;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Your twitter app consumer secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
+ )
private String consumerSecret;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Your twitter app token. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
+ )
private String token;
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "Your twitter app token secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
+ )
private String tokenSecret;
// Most firehose events have null createdAt time. If this parameter is set to true
// then we estimate the createdTime of each firehose event to be current time.
+ @FieldDoc(
+ required = false,
+ defaultValue = "false",
+ help = "Most firehose events have null createdAt time."
+ + " If this parameter is set to true, the connector estimates the createdTime of each firehose event to be current time."
+ )
private Boolean guestimateTweetTime = false;
// ------ Optional property keys
- private String clientName = "openconnector-twitter-source";
+ @FieldDoc(
+ required = false,
+ defaultValue = "pulsario-twitter-source",
+ help = "The Twitter Firehose Client name"
+ )
+ private String clientName = "pulsario-twitter-source";
+ @FieldDoc(
+ required = false,
+ defaultValue = Constants.STREAM_HOST,
+ help = "The Twitter Firehose stream hosts that the connector connects to"
+ )
private String clientHosts = Constants.STREAM_HOST;
+ @FieldDoc(
+ required = false,
+ defaultValue = "50000",
+ help = "The Twitter Firehose client buffer size"
+ )
private int clientBufferSize = 50000;
public static TwitterFireHoseConfig load(String yamlFile) throws
IOException {