This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new f3d37d0  [Doc] Update *Develop Connectors Guide* (#5181)
f3d37d0 is described below

commit f3d37d0bf9f01011e0e1ccc306e61c8202da2073
Author: Anonymitaet <50226895+anonymit...@users.noreply.github.com>
AuthorDate: Tue Sep 17 22:02:41 2019 +0800

    [Doc] Update *Develop Connectors Guide* (#5181)
---
 site2/docs/io-develop.md | 278 ++++++++++++++++++++++++++---------------------
 1 file changed, 156 insertions(+), 122 deletions(-)

diff --git a/site2/docs/io-develop.md b/site2/docs/io-develop.md
index ff15e26..27aa677 100644
--- a/site2/docs/io-develop.md
+++ b/site2/docs/io-develop.md
@@ -1,156 +1,188 @@
 ---
 id: io-develop
-title: Develop Connectors
-sidebar_label: Developing Connectors
+title: How to develop Pulsar connectors
+sidebar_label: Develop
 ---

-This guide describes how developers can write new connectors for Pulsar IO to move data
-between Pulsar and other systems. It describes how to create a Pulsar IO connector.
+This guide describes how to develop Pulsar connectors to move data
+between Pulsar and other systems.

-Pulsar IO connectors are specialized [Pulsar Functions](functions-overview.md). So writing
-a Pulsar IO connector is as simple as writing a Pulsar function. Pulsar IO connectors come
-in two flavors: {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java},
-which import data from another system, and {@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java},
-which export data to another system. For example, [KinesisSink](io-kinesis.md) would export
-the messages of a Pulsar topic to a Kinesis stream, and [RabbitmqSource](io-rabbitmq.md) would import
-the messages of a RabbitMQ queue to a Pulsar topic.
+Pulsar connectors are special [Pulsar Functions](functions-overview.md), so creating
+a Pulsar connector is similar to creating a Pulsar function.

-### Developing
+Pulsar connectors come in two types:

-#### Develop a source connector
+| Type | Description | Example
+|---|---|---
+{@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}|Import data from another system to Pulsar.|[RabbitMQ source connector](io-rabbitmq.md) imports the messages of a RabbitMQ queue to a Pulsar topic.
+{@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java}|Export data from Pulsar to another system.|[Kinesis sink connector](io-kinesis.md) exports the messages of a Pulsar topic to a Kinesis stream.

-What you need to develop a source connector is to implement {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}
-interface.
+## Develop

-First, you need to implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33} method. This method will be called once when the source connector
-is initialized. In this method, you can retrieve all the connector specific settings through
-the passed `config` parameter, and initialize all the necessary resourcess. For example, a Kafka
-connector can create the Kafka client in this `open` method.
+You can develop Pulsar source connectors and sink connectors.

-Beside the passed-in `config` object, the Pulsar runtime also provides a `SourceContext` for the
-connector to access runtime resources for tasks like collecting metrics. The implementation can
-save the `SourceContext` for futher usage.
+### Source

-```java
+Developing a source connector is to implement the {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}
+interface, which means you need to implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33} method and the {@inject: github:`record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} method.
+
+1. Implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33} method.
+
+    ```java
     /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @param sourceContext
-     * @throws Exception IO type exceptions when opening a connector
-     */
+    * Open connector with configuration
+    *
+    * @param config initialization config
+    * @param sourceContext
+    * @throws Exception IO type exceptions when opening a connector
+    */
     void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;
-```
+    ```

-The main task for a Source implementor is to implement {@inject: github:`read`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L41}
-method.
+    This method is called when the source connector is initialized.

-```java
-    /**
-     * Reads the next message from source.
-     * If source does not have any new messages, this call should block.
-     * @return next message from source. The return result should never be null
-     * @throws Exception
-     */
-    Record<T> read() throws Exception;
-```
+    In this method, you can retrieve all connector specific settings through the passed-in `config` parameter and initialize all necessary resources.
+
+    For example, a Kafka connector can create a Kafka client in this `open` method.

-The implementation should be blocking on this method if nothing to return. It should never return
-`null`. The returned {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should encapsulates the information that is needed by
-Pulsar IO runtime.
+    Besides, Pulsar runtime also provides a `SourceContext` for the
+    connector to access runtime resources for tasks like collecting metrics. The implementation can save the `SourceContext` for future use.

-These information includes:
+2. Implement the {@inject: github:`read`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L41} method.

-- *Topic Name*: _Optional_. If the record is originated from a Pulsar topic, it should be the Pulsar topic name.
-- *Key*: _Optional_. If the record has a key associated with it.
-- *Value*: _Required_. The actual data of this record.
-- *Partition Id*: _Optional_. If the record is originated from a partitioned source,
-  return its partition id. The partition id will be used as part of the unique identifier
-  by Pulsar IO runtime to do message deduplication and achieve exactly-once processing guarantee.
-- *Record Sequence*: _Optional_. If the record is originated from a sequential source,
-  return its record sequence. The record sequence will be used as part of the unique identifier
-  by Pulsar IO runtime to do message deduplication and achieve exactly-once processing guarantee.
-- *Properties*: _Optional_. If the record carries user-defined properties, return those properties.
+    ```java
+    /**
+    * Reads the next message from source.
+    * If source does not have any new messages, this call should block.
+    * @return next message from source. The return result should never be null
+    * @throws Exception
+    */
+    Record<T> read() throws Exception;
+    ```

-Additionally, the implemention of the record should provide two methods: `ack` and `fail`. These
-two methods will be used by Pulsar IO connector to acknowledge the records that it has done
-processing and fail the records that it has failed to process.
+    If nothing to return, the implementation should be blocking rather than returning `null`.

-{@inject: github:`KafkaSource`:/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java} is a good example to follow.
+    The returned {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should encapsulate the following information, which is needed by Pulsar IO runtime.

-#### Develop a sink connector
+    * {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should provide the following variables:

-Developing a sink connector is as easy as developing a source connector. You just need to
-implement {@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java} interface.
+        |Variable|Required|Description
+        |---|---|---
+        `TopicName`|No|Pulsar topic name from which the record is originated from.
+        `Key`|No| Messages can optionally be tagged with keys.<br/><br/>For more information, see [Routing modes](concepts-messaging.md#routing-modes).|
+        `Value`|Yes|Actual data of the record.
+        `EventTime`|No|Event time of the record from the source.
+        `PartitionId`|No| If the record is originated from a partitioned source, it returns its `PartitionId`. <br/><br/>`PartitionId` is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve exactly-once processing guarantee.
+        `RecordSequence`|No|If the record is originated from a sequential source, it returns its `RecordSequence`.<br/><br/>`RecordSequence` is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve exactly-once processing guarantee.
+        `Properties` |No| If the record carries user-defined properties, it returns those properties.
+        `DestinationTopic`|No|Topic to which message should be written.
+        `Message`|No|A class which carries data sent by users.<br/><br/>For more information, see [Message.java](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java).|

-Similarly, you first need to implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L36} method to initialize all the necessary resources
-before implementing the {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+    * {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should provide the following methods:

-```java
-    /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @param sinkContext
-     * @throws Exception IO type exceptions when opening a connector
-     */
-    void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
-```
+        Method|Description
+        |---|---
+        `ack` |Acknowledge that the record is fully processed.
+        `fail`|Indicate that the record fails to be processed.

-The main task for a Sink implementor is to implement {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+> #### Tip
+>
+> For more information about **how to create a source connector**, see {@inject: github:`KafkaSource`:/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java}.
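
Editorial sketch (not part of the commit): the `open`/`read` contract described in the source section can be illustrated with a small in-memory source. To compile without the Pulsar dependency, the block declares trimmed, hypothetical stand-ins for `Record`, `SourceContext`, and `Source`; the real interfaces have more members. `QueueSource`, `enqueue`, and the `capacity` setting are illustrative names, not Pulsar APIs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, trimmed stand-ins for the Pulsar interfaces named in the guide.
interface Record<T> {
    T getValue();                                        // required payload
    default Optional<String> getKey() { return Optional.empty(); }
    default void ack() {}                                // record fully processed
    default void fail() {}                               // record failed to process
}

interface SourceContext {}

interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
}

// Hypothetical connector: something external calls enqueue(), read() hands records out.
class QueueSource implements Source<String> {
    private BlockingQueue<String> queue;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) {
        // "capacity" is an assumed setting name used only for this sketch.
        int capacity = (Integer) config.getOrDefault("capacity", 1000);
        queue = new LinkedBlockingQueue<>(capacity);
    }

    void enqueue(String line) throws InterruptedException {
        queue.put(line);
    }

    @Override
    public Record<String> read() throws InterruptedException {
        // take() blocks while the queue is empty, so read() never returns null.
        String value = queue.take();
        return () -> value;
    }

    @Override
    public void close() {}
}
```

The blocking queue is one way to satisfy "block rather than return `null`"; a connector that polls a remote system would loop and sleep inside `read` instead.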

-```java
-    /**
-     * Write a message to Sink
-     * @param inputRecordContext Context of input record from the source
-     * @param record record to write to sink
-     * @throws Exception
-     */
-    void write(Record<T> record) throws Exception;
-```
+### Sink
+
+Developing a sink connector **is similar to** developing a source connector, that is, you need to implement the {@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java} interface, which means implementing the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L36} method and the {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+
+1. Implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L36} method.
+
+    ```java
+    /**
+    * Open connector with configuration
+    *
+    * @param config initialization config
+    * @param sinkContext
+    * @throws Exception IO type exceptions when opening a connector
+    */
+    void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
+    ```
+
+2. Implement the {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+
+    ```java
+    /**
+    * Write a message to Sink
+    * @param inputRecordContext Context of input record from the source
+    * @param record record to write to sink
+    * @throws Exception
+    */
+    void write(Record<T> record) throws Exception;
+    ```
+
+    During the implementation, you can decide how to write the `Value` and
+    the `Key` to the actual source, and leverage all the provided information such as
+    `PartitionId` and `RecordSequence` to achieve different processing guarantees.

-In the implemention of `write` method, the implementor can decide how to write the value and
-the optional key to the actual source, and leverage all the provided information such as
-`Partition Id`, `Record Sequence` for achieving different processing guarantees.
The implementor
-is also responsible for acknowledging records if it has successfully written them or failing
-records if has failed to write them.
+    You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send).

-### Testing
+## Test

 Testing connectors can be challenging because Pulsar IO connectors interact with two systems
-that may be difficult to mock - Pulsar and the system the connector is connecting to. It is
-recommended to write very specificially test the functionalities of the connector classes
-while mocking the external services.
-
-Once you have written sufficient unit tests for your connector, we also recommend adding
-separate integration tests to verify end-to-end functionality. In Pulsar, we are using
-[testcontainers](https://www.testcontainers.org/) for all Pulsar integration tests. Pulsar IO
-{@inject: github:`IntegrationTests`:/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io} are good examples to follow on integration testing your connectors.
-
-### Packaging
-
-Once you've developed and tested your connector, you must package it so that it can be submitted
-to a [Pulsar Functions](functions-overview.md) cluster. There are two approaches described
-here work with Pulsar Functions' runtime.
-
-If you plan to package and distribute your connector for others to use, you are obligated to
-properly license and copyright your own code and to adhere to the licensing and copyrights of
-all libraries your code uses and that you include in your distribution. If you are using the
-approach described in ["Creating a NAR package"](#creating-a-nar-package), the NAR plugin will
-automatically create a `DEPENDENCIES` file in the generated NAR package, including the proper
+that may be difficult to mock—Pulsar and the system to which the connector is connecting.
+
+It is
+recommended writing special tests to test the connector functionalities as below
+while mocking the external service.
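
Editorial sketch (not part of the commit): the advice to test connector logic while mocking the external service can be illustrated by putting the external system behind a small interface and faking it in the test. Everything here is an assumption made for illustration: `ExternalClient`, `ClientSink`, and `TrackingRecord` are hypothetical names, and `Record`, `SinkContext`, and `Sink` are trimmed stand-ins for the Pulsar interfaces so that the block compiles on its own.

```java
import java.util.Map;

// Hypothetical, trimmed stand-ins for the Pulsar interfaces named in the guide.
interface Record<T> {
    T getValue();
    void ack();
    void fail();
}

interface SinkContext {}

interface Sink<T> extends AutoCloseable {
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
    void write(Record<T> record) throws Exception;
}

// Hypothetical client for the external system; the only piece a unit test must fake.
interface ExternalClient {
    void send(String payload) throws Exception;
}

// Sink that acks a record after a successful send and fails it otherwise.
class ClientSink implements Sink<String> {
    private final ExternalClient client;

    ClientSink(ExternalClient client) {
        this.client = client;
    }

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) {}

    @Override
    public void write(Record<String> record) {
        try {
            client.send(record.getValue());
            record.ack();
        } catch (Exception e) {
            record.fail();
        }
    }

    @Override
    public void close() {}
}

// Test double that remembers which callback the sink invoked.
class TrackingRecord implements Record<String> {
    private final String value;
    boolean acked;
    boolean failed;

    TrackingRecord(String value) {
        this.value = value;
    }

    @Override public String getValue() { return value; }
    @Override public void ack() { acked = true; }
    @Override public void fail() { failed = true; }
}
```

A unit test can then pass a lambda as the fake client, for example `new ClientSink(payload -> { throw new IllegalStateException("down"); })`, write a `TrackingRecord`, and check that `failed` is set while `acked` is not. Injecting the client through the constructor is a design choice of this sketch; a real connector would more likely build its client inside `open` from `config`.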
+
+### Unit test
+
+You can create unit tests for your connector.
+
+### Integration test
+
+Once you have written sufficient unit tests, you can add
+separate integration tests to verify end-to-end functionality.
+
+Pulsar uses
+[testcontainers](https://www.testcontainers.org/) **for all integration tests**.
+
+> #### Tip
+>
+>For more information about **how to create integration tests for Pulsar connectors**, see {@inject: github:`IntegrationTests`:/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io}.
+
+## Package
+
+Once you've developed and tested your connector, you need to package it so that it can be submitted
+to a [Pulsar Functions](functions-overview.md) cluster.
+
+There are two methods to
+work with Pulsar Functions' runtime, that is, [NAR](#nar) and [uber JAR](#uber-jar).
+
+> #### Note
+>
+> If you plan to package and distribute your connector for others to use, you are obligated to
+license and copyright your own code properly. Remember to add the license and copyright to
+all libraries your code uses and to your distribution.
+>
+> If you use the [NAR](#nar) method, the NAR plugin
+automatically creates a `DEPENDENCIES` file in the generated NAR package, including the proper
 licensing and copyrights of all libraries of your connector.

-#### Creating a NAR package
+### NAR

-The easiest approach to packaging a Pulsar IO connector is to create a NAR package using
-[nifi-nar-maven-plugin](https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin).
+**NAR** stands for NiFi Archive, which is a custom packaging mechanism used by Apache NiFi, to provide
+a bit of Java ClassLoader isolation.

-NAR stands for NiFi Archive. It is a custom packaging mechanism used by Apache NiFi, to provide
-a bit of Java ClassLoader isolation. For more details, you can read this
-[blog post](https://medium.com/hashmapinc/nifi-nar-files-explained-14113f7796fd) to understand
-how NAR works.
Pulsar uses the same mechanism for packaging all the [builtin connectors](io-connectors).
+> #### Tip
+>
+> For more information about **how NAR works**, see
+> [here](https://medium.com/hashmapinc/nifi-nar-files-explained-14113f7796fd).
+
+Pulsar uses the same mechanism for packaging **all** [built-in connectors](io-connectors).
+
+The easiest approach to package a Pulsar connector is to create a NAR package using
+[nifi-nar-maven-plugin](https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin).

-All what you need is to include this [nifi-nar-maven-plugin](https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin) in your maven project for your connector. For example:
+All you need to do is to include this [nifi-nar-maven-plugin](https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin) in your maven project for your connector as below.

 ```xml
 <plugins>
@@ -162,14 +194,16 @@ All what you need is to include this [nifi-nar-maven-plugin](https://mvnreposito
 </plugins>
 ```

-The {@inject: github:`TwitterFirehose`:/pulsar-io/twitter} connector is a good example to follow.
+> #### Tip
+>
+> For more information about an **how to use NAR for Pulsar connectors**, see {@inject: github:`TwitterFirehose`:/pulsar-io/twitter/pom.xml#L79}.

-#### Creating an Uber JAR
+### Uber JAR

-An alternative approach is to create an _uber JAR_ that contains all of the connector's JAR files
+An alternative approach is to create an **uber JAR** that contains all of the connector's JAR files
 and other resource files. No directory internal structure is necessary.

-You can use [maven-shade-plugin](https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html) to create a Uber JAR. For example:
+You can use [maven-shade-plugin](https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html) to create a uber JAR as below:

 ```xml
 <plugin>