http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/kinesis.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md new file mode 100644 index 0000000..ce011b3 --- /dev/null +++ b/docs/dev/connectors/kinesis.md @@ -0,0 +1,319 @@ +--- +title: "Amazon AWS Kinesis Streams Connector" +nav-title: Kinesis +nav-parent_id: connectors +nav-pos: 3 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/). + +To use the connector, add the following Maven dependency to your project: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kinesis{{ site.scala_version_suffix }}</artifactId> + <version>{{site.version }}</version> +</dependency> +{% endhighlight %} + +**The `flink-connector-kinesis{{ site.scala_version_suffix }}` has a dependency on code licensed under the [Amazon Software License](https://aws.amazon.com/asl/) (ASL). +Linking to the flink-connector-kinesis will include ASL licensed code into your application.** + +The `flink-connector-kinesis{{ site.scala_version_suffix }}` artifact is not deployed to Maven central as part of +Flink releases because of the licensing issue. Therefore, you need to build the connector yourself from the source. + +Download the Flink source or check it out from the git repository. Then, use the following Maven command to build the module: +{% highlight bash %} +mvn clean install -Pinclude-kinesis -DskipTests +# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so we need to run mvn for flink-dist again. +cd flink-dist +mvn clean install -Pinclude-kinesis -DskipTests +{% endhighlight %} + + +The streaming connectors are not part of the binary distribution. See how to link with them for cluster +execution [here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + +### Using the Amazon Kinesis Streams Service +Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) +to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams. + +### Kinesis Consumer + +The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis +streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is +responsible for fetching data records from multiple Kinesis shards. 
The number of shards fetched by each subtask will +change as shards are closed and created by Kinesis. + +Before consuming data from Kinesis streams, make sure that all streams are created with the status "ACTIVE" in the AWS dashboard. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +Properties consumerConfig = new Properties(); +consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1"); +consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); + +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>( +    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig)); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val consumerConfig = new Properties(); +consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1"); +consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); + +val env = StreamExecutionEnvironment.getExecutionEnvironment + +val kinesis = env.addSource(new FlinkKinesisConsumer[String]( +    "kinesis_stream_name", new SimpleStringSchema, consumerConfig)) +{% endhighlight %} +</div> +</div> + +The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties` +instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example +demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which +the AWS access key ID and secret access key are directly supplied in the configuration (other options are to set +`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, or `AUTO`). Also, data is being consumed +from the newest position in the Kinesis stream (the other option is to set `ConsumerConfigConstants.STREAM_INITIAL_POSITION` +to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible). + +Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`. + +#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics + +With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and +periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the +state of the latest complete checkpoint and re-consume the records from Kinesis shards, starting from the progress that +was stored in the checkpoint. + +The interval of drawing checkpoints therefore defines how far back the program may have to go at most in case of a failure.
+ +To use fault tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled at the execution environment: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} +</div> +</div> + +Also note that Flink can only restart the topology if enough processing slots are available. +Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. +Flink on YARN supports automatic restart of lost YARN containers. + +#### Event Time for Consumed Records + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +{% endhighlight %} +</div> +</div> + +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they +have been successfully received and stored by the stream. Note that this timestamp is typically referred to as a Kinesis server-side +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be +ascending). + +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), +or use one of the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, +the assigner can be applied to the consumer's output in the following way: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>( +    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig)); +kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner()); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +var kinesis = env.addSource(new FlinkKinesisConsumer[String]( +    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig)) +kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner) +{% endhighlight %} +</div> +</div> + +#### Threading Model + +The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption. + +For shard discovery, each parallel consumer subtask will have a single thread that constantly queries Kinesis for shard +information even if the subtask initially did not have shards to read from when the consumer was started. In other words, if +the consumer is run with a parallelism of 10, there will be a total of 10 threads constantly querying Kinesis regardless +of the total number of shards in the subscribed streams. The frequency of these queries can be tuned, as sketched below.
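If this constant shard discovery querying is a concern (for example, because other applications share the same Kinesis service limits), the discovery frequency can be lowered through the consumer configuration. The following is a minimal sketch, assuming the same consumer configuration style as the examples above; the `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` key is described in the API notes further down, and the 30-second value is purely illustrative:

{% highlight java %}
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

// Query Kinesis for new shards every 30 seconds instead of the default 10 seconds.
// A longer interval means fewer DescribeStream calls, but also a longer delay before
// newly created shards are picked up after a resharding.
consumerConfig.put(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "30000");
{% endhighlight %}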
+ +For data consumption, a single thread will be created to consume each discovered shard. A thread will terminate when the +shard it is responsible for consuming is closed as a result of stream resharding. In other words, there will always be +one thread per open shard. + +#### Internally Used Kinesis APIs + +The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs +for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) +on the APIs, the consumer will be competing with other non-Flink consuming applications that the user may be running. +Below is a list of APIs called by the consumer with a description of how the consumer uses each API, as well as information +on how to deal with any errors or warnings that the Flink Kinesis Consumer may have due to these service limits. + +- *[DescribeStream](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)*: this is constantly called +by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default, +the consumer performs the shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result +from Kinesis. If this interferes with other non-Flink consuming applications, users can slow down the rate at which the consumer +calls this API by setting a value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied +configuration properties, which sets the discovery interval to a different value. Note that this setting directly impacts +the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval. + +- *[GetShardIterator](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)*: this is called +only once when a per-shard consuming thread is started, and will retry if Kinesis complains that the transaction limit for the +API has been exceeded, up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream), +the consumer itself should not exceed the limit. Usually, if this happens, users can either try to slow down the rate at which any other +non-Flink consuming applications call this API, or modify the retry behaviour of this API call in the consumer by +setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*` in the supplied configuration properties. + +- *[GetRecords](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)*: this is constantly called +by per-shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there +are any other non-Flink consuming applications running), the per-shard rate limit may be exceeded. By default, on each call +of this API, the consumer will retry if Kinesis complains that the data size / transaction limit for the API has been exceeded, +up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput +of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and +`ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties.
Setting the former +adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 100), while +the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the +consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`. + +### Kinesis Producer + +The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer does not participate in +Flink's checkpointing and does not provide exactly-once processing guarantees. +Also, the Kinesis producer does not guarantee that records are written in order to the shards (see [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details). + +In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics. + +To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard. + +For the monitoring to work, the user accessing the stream needs access to the CloudWatch service. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +Properties producerConfig = new Properties(); +producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); +producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig); +kinesis.setFailOnError(true); +kinesis.setDefaultStream("kinesis_stream_name"); +kinesis.setDefaultPartition("0"); + +DataStream<String> simpleStringStream = ...; +simpleStringStream.addSink(kinesis); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val producerConfig = new Properties(); +producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); +producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig); +kinesis.setFailOnError(true); +kinesis.setDefaultStream("kinesis_stream_name"); +kinesis.setDefaultPartition("0"); + +val simpleStringStream = ...; +simpleStringStream.addSink(kinesis); +{% endhighlight %} +</div> +</div> + +The above is a simple example of using the producer. The producer's mandatory configuration values are supplied with a `java.util.Properties` +instance, as described above for the consumer. The example demonstrates producing to a single Kinesis stream in the AWS region "us-east-1". + +Instead of a `SerializationSchema`, the producer also supports a `KinesisSerializationSchema`, which allows sending data to multiple streams. This is +done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream. +Otherwise, the returned stream name is used. A short sketch of such a schema is given below. + +Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
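The following is a minimal sketch of a custom `KinesisSerializationSchema` that routes elements to different streams. It assumes the interface exposes a `ByteBuffer`-returning `serialize(T)` method alongside `getTargetStream(T)`; the stream name, routing rule and constructor usage are purely illustrative, and imports (e.g. `java.nio.ByteBuffer`) are omitted for brevity, as in the other snippets on this page:

{% highlight java %}
// Sketch: route "alert" records to a dedicated stream, everything else to the default stream.
KinesisSerializationSchema<String> routingSchema = new KinesisSerializationSchema<String>() {
    @Override
    public ByteBuffer serialize(String element) {
        // Serialize the element into the byte representation sent to Kinesis.
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String getTargetStream(String element) {
        // Returning null writes the element to the producer's default stream.
        return element.startsWith("alert") ? "alerts_stream_name" : null;
    }
};

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(routingSchema, producerConfig);
{% endhighlight %}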
+ + +### Using Non-AWS Kinesis Endpoints for Testing + +It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as +[Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink +application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property. + +To override the AWS endpoint, taking the producer for example, set the `ProducerConfigConstants.AWS_ENDPOINT` property in the +Flink configuration, in addition to the `ProducerConfigConstants.AWS_REGION` required by Flink. Although the region is +required, it will not be used to determine the AWS endpoint URL. + +The following example shows how one might supply the `ProducerConfigConstants.AWS_ENDPOINT` configuration property: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +Properties producerConfig = new Properties(); +producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); +producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567"); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val producerConfig = new Properties(); +producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); +producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567"); +{% endhighlight %} +</div> +</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/nifi.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/nifi.md b/docs/dev/connectors/nifi.md new file mode 100644 index 0000000..924a80b --- /dev/null +++ b/docs/dev/connectors/nifi.md @@ -0,0 +1,138 @@ +--- +title: "Apache NiFi Connector" +nav-title: NiFi +nav-parent_id: connectors +nav-pos: 8 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +This connector provides a Source and Sink that can read from and write to +[Apache NiFi](https://nifi.apache.org/). To use this connector, add the +following dependency to your project: + +{% highlight xml %} +<dependency> +  <groupId>org.apache.flink</groupId> +  <artifactId>flink-connector-nifi{{ site.scala_version_suffix }}</artifactId> +  <version>{{site.version }}</version> +</dependency> +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution) +for information about how to package the program with the libraries for +cluster execution. + +#### Installing Apache NiFi + +Instructions for setting up an Apache NiFi cluster can be found +[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi). + +#### Apache NiFi Source + +The connector provides a Source for reading data from Apache NiFi into Apache Flink. + +The class `NiFiSource(…)` provides two constructors for reading data from NiFi. + +- `NiFiSource(SiteToSiteClientConfig config)` - Constructs a `NiFiSource(…)` given the client's SiteToSiteClientConfig and a +  default wait time of 1000 ms. + +- `NiFiSource(SiteToSiteClientConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's +  SiteToSiteClientConfig and the specified wait time (in milliseconds).
+ +Example: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() +        .url("http://localhost:8080/nifi") +        .portName("Data for Flink") +        .requestBatchCount(5) +        .buildConfig(); + +SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment() + +val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() +       .url("http://localhost:8080/nifi") +       .portName("Data for Flink") +       .requestBatchCount(5) +       .buildConfig() + +val nifiSource = new NiFiSource(clientConfig) +{% endhighlight %} +</div> +</div> + +Here, data is read from the Apache NiFi Output Port called "Data for Flink", which is part of the Apache NiFi +Site-to-Site protocol configuration. + +#### Apache NiFi Sink + +The connector provides a Sink for writing data from Apache Flink to Apache NiFi. + +The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`. + +- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` constructs a `NiFiSink(…)` given the client's `SiteToSiteClientConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi. + +Example: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() +        .url("http://localhost:8080/nifi") +        .portName("Data from Flink") +        .requestBatchCount(5) +        .buildConfig(); + +DataStream<NiFiDataPacket> dataStream = ...; + +SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<NiFiDataPacket>() {...}); + +dataStream.addSink(nifiSink); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment() + +val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() +       .url("http://localhost:8080/nifi") +       .portName("Data from Flink") +       .requestBatchCount(5) +       .buildConfig() + +val dataStream: DataStream[NiFiDataPacket] = ... + +val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder[NiFiDataPacket]() {...}) + +dataStream.addSink(nifiSink) +{% endhighlight %} +</div> +</div> + +More information about the [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol can be found [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site). http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/rabbitmq.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md new file mode 100644 index 0000000..02def40 --- /dev/null +++ b/docs/dev/connectors/rabbitmq.md @@ -0,0 +1,129 @@ +--- +title: "RabbitMQ Connector" +nav-title: RabbitMQ +nav-parent_id: connectors +nav-pos: 7 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership.
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following dependency to your project: + +{% highlight xml %} +<dependency> +  <groupId>org.apache.flink</groupId> +  <artifactId>flink-connector-rabbitmq{{ site.scala_version_suffix }}</artifactId> +  <version>{{site.version }}</version> +</dependency> +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + +#### Installing RabbitMQ +Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation, the server starts automatically and applications connecting to RabbitMQ can be launched. + +#### RabbitMQ Source + +The `RMQSource` class provides an interface for receiving data from RabbitMQ. + +The following have to be provided to the `RMQSource(…)` constructor, in order: + +- RMQConnectionConfig. +- queueName: The RabbitMQ queue name. +- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`). +- deserializationSchema: Deserialization schema to turn messages into Java objects. + +This source can be operated in three different modes: + +1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with +   unique correlation IDs. +2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism +   (correlation id is not set). +3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode. + +Correlation ids are a RabbitMQ application feature. You have to set them in the message properties +when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true and do not supply +unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore +messages with non-unique correlation ids. If you set `usesCorrelationId` to false, then you don't +have to supply correlation ids. + +Example: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() +.setHost("localhost").setPort(5000).setUserName(..) +.setPassword(..).setVirtualHost("/").build(); + +DataStream<String> streamWithoutCorrelationIds = env +    .addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema())); +streamWithoutCorrelationIds.print(); + +DataStream<String> streamWithCorrelationIds = env +    .addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema())); +streamWithCorrelationIds.print(); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val connectionConfig = new RMQConnectionConfig.Builder() +.setHost("localhost").setPort(5000).setUserName(..)
+.setPassword(..).setVirtualHost("/").build() + +val streamWithoutCorrelationIds = env +    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema)) +streamWithoutCorrelationIds.print() + +val streamWithCorrelationIds = env +    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema)) +streamWithCorrelationIds.print() +{% endhighlight %} +</div> +</div> + +#### RabbitMQ Sink +The `RMQSink` class provides an interface for sending data to RabbitMQ. + +The following have to be provided to the `RMQSink(…)` constructor, in order: + +1. RMQConnectionConfig +2. The queue name +3. Serialization schema + +Example: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() +.setHost("localhost").setPort(5000).setUserName(..) +.setPassword(..).setVirtualHost("/").build(); +stream.addSink(new RMQSink<String>(connectionConfig, "hello", new SimpleStringSchema())); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val connectionConfig = new RMQConnectionConfig.Builder() +.setHost("localhost").setPort(5000).setUserName(..) +.setPassword(..).setVirtualHost("/").build() +stream.addSink(new RMQSink[String](connectionConfig, "hello", new SimpleStringSchema)) +{% endhighlight %} +</div> +</div> + +More about RabbitMQ can be found [here](http://www.rabbitmq.com/). http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/redis.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/redis.md b/docs/dev/connectors/redis.md new file mode 100644 index 0000000..a987b90 --- /dev/null +++ b/docs/dev/connectors/redis.md @@ -0,0 +1,174 @@ +--- +title: "Redis Connector" +nav-title: Redis +nav-parent_id: connectors +nav-pos: 8 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and can also publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} +<dependency> +  <groupId>org.apache.flink</groupId> +  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId> +  <version>{{site.version }}</version> +</dependency> +{% endhighlight %} +Version Compatibility: This module is compatible with Redis 2.8.5. + +Note that the streaming connectors are currently not part of the binary distribution. You need to link them for cluster execution [explicitly]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + +#### Installing Redis +Follow the instructions from the [Redis download page](http://redis.io/download).
+ +#### Redis Sink +The `RedisSink` class provides an interface for sending data to Redis. +The sink can use three different methods for communicating with different types of Redis environments: + +1. Single Redis Server +2. Redis Cluster +3. Redis Sentinel + +This code shows how to create a sink that communicates with a single Redis server: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> { + +    @Override +    public RedisCommandDescription getCommandDescription() { +        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME"); +    } + +    @Override +    public String getKeyFromData(Tuple2<String, String> data) { +        return data.f0; +    } + +    @Override +    public String getValueFromData(Tuple2<String, String> data) { +        return data.f1; +    } +} +FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(); + +DataStream<Tuple2<String, String>> stream = ...; +stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper())); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +class RedisExampleMapper extends RedisMapper[(String, String)]{ +  override def getCommandDescription: RedisCommandDescription = { +    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME") +  } + +  override def getKeyFromData(data: (String, String)): String = data._1 + +  override def getValueFromData(data: (String, String)): String = data._2 +} +val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} +</div> +</div> + +This example code does the same, but for Redis Cluster: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + +FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder() +    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build(); + +DataStream<Tuple2<String, String>> stream = ...; +stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper())); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} +</div> +</div> + +This example shows how to configure the sink when the Redis environment uses Sentinels: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + +FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder() +    .setMasterName("master").setSentinels(...).build(); + +DataStream<Tuple2<String, String>> stream = ...; +stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper())); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} +</div> +</div> + +This section gives a description of all the available data types and which Redis command is used for each.
+ +<table class="table table-bordered" style="width: 75%"> +  <thead> +    <tr> +      <th class="text-center" style="width: 20%">Data Type</th> +      <th class="text-center" style="width: 25%">Redis Command [Sink]</th> +      <th class="text-center" style="width: 25%">Redis Command [Source]</th> +    </tr> +  </thead> +  <tbody> +    <tr> +        <td>HASH</td><td><a href="http://redis.io/commands/hset">HSET</a></td><td>--NA--</td> +    </tr> +    <tr> +        <td>LIST</td><td> +            <a href="http://redis.io/commands/rpush">RPUSH</a>, +            <a href="http://redis.io/commands/lpush">LPUSH</a> +        </td><td>--NA--</td> +    </tr> +    <tr> +        <td>SET</td><td><a href="http://redis.io/commands/sadd">SADD</a></td><td>--NA--</td> +    </tr> +    <tr> +        <td>PUBSUB</td><td><a href="http://redis.io/commands/publish">PUBLISH</a></td><td>--NA--</td> +    </tr> +    <tr> +        <td>STRING</td><td><a href="http://redis.io/commands/set">SET</a></td><td>--NA--</td> +    </tr> +    <tr> +        <td>HYPER_LOG_LOG</td><td><a href="http://redis.io/commands/pfadd">PFADD</a></td><td>--NA--</td> +    </tr> +    <tr> +        <td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td><td>--NA--</td> +    </tr> +  </tbody> +</table> +More about Redis can be found [here](http://redis.io/). http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/connectors/twitter.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md new file mode 100644 index 0000000..e92e51d --- /dev/null +++ b/docs/dev/connectors/twitter.md @@ -0,0 +1,85 @@ +--- +title: "Twitter Connector" +nav-title: Twitter +nav-parent_id: connectors +nav-pos: 9 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +The Twitter Streaming API provides access to the stream of tweets made available by Twitter. +Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream. +To use this connector, add the following dependency to your project: + +{% highlight xml %} +<dependency> +  <groupId>org.apache.flink</groupId> +  <artifactId>flink-connector-twitter{{ site.scala_version_suffix }}</artifactId> +  <version>{{site.version }}</version> +</dependency> +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary distribution. +See how to link with them for cluster execution [here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + +#### Authentication +In order to connect to the Twitter stream, the user has to register their program and acquire the necessary information for the authentication. The process is described below. + +#### Acquiring the authentication information +First of all, a Twitter account is needed.
Sign up for free at [twitter.com/signup](https://twitter.com/signup) +or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by +clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions. +After selecting the application, the API key and API secret (called `twitter-source.consumerKey` and `twitter-source.consumerSecret` in `TwitterSource` respectively) are located on the "API Keys" tab. +The necessary OAuth Access Token data (`twitter-source.token` and `twitter-source.tokenSecret` in `TwitterSource`) can be generated and acquired on the "Keys and Access Tokens" tab. +Remember to keep these pieces of information secret and do not push them to public repositories. + + + +#### Usage +In contrast to other connectors, the `TwitterSource` depends on no additional services. For example, the following code should run gracefully: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +Properties props = new Properties(); +props.setProperty(TwitterSource.CONSUMER_KEY, ""); +props.setProperty(TwitterSource.CONSUMER_SECRET, ""); +props.setProperty(TwitterSource.TOKEN, ""); +props.setProperty(TwitterSource.TOKEN_SECRET, ""); +DataStream<String> streamSource = env.addSource(new TwitterSource(props)); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val props = new Properties() +props.setProperty(TwitterSource.CONSUMER_KEY, "") +props.setProperty(TwitterSource.CONSUMER_SECRET, "") +props.setProperty(TwitterSource.TOKEN, "") +props.setProperty(TwitterSource.TOKEN_SECRET, "") +val streamSource = env.addSource(new TwitterSource(props)) +{% endhighlight %} +</div> +</div> + +The `TwitterSource` emits strings containing a JSON object, representing a Tweet. + +The `TwitterExample` class in the `flink-examples-streaming` package shows a full example of how to use the `TwitterSource`. + +By default, the `TwitterSource` uses the `StatusesSampleEndpoint`. This endpoint returns a random sample of Tweets. +There is a `TwitterSource.EndpointInitializer` interface allowing users to provide a custom endpoint, as sketched below.
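The following is a minimal sketch of such a custom endpoint initializer. It assumes the `EndpointInitializer` interface exposes a single `createEndpoint()` method and that the source accepts the initializer via a setter (`setCustomEndpointInitializer` here); `StatusesFilterEndpoint`, `StreamingEndpoint` and the tracked terms come from the Twitter `hbc` client library and are used purely for illustration, with imports omitted as in the snippets above:

{% highlight java %}
// Sketch: only receive tweets that contain the given terms.
// The initializer must be serializable because it is shipped to the workers.
public static class FilterEndpointInitializer implements TwitterSource.EndpointInitializer, Serializable {
    @Override
    public StreamingEndpoint createEndpoint() {
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(Arrays.asList("flink", "streaming"));
        return endpoint;
    }
}

TwitterSource twitterSource = new TwitterSource(props);
twitterSource.setCustomEndpointInitializer(new FilterEndpointInitializer());
DataStream<String> filteredStream = env.addSource(twitterSource);
{% endhighlight %}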