Repository: flink
Updated Branches:
  refs/heads/release-1.2 a7644b171 -> 11ce80bc5


[FLINK-5512] [doc] Improve RabbitMQ documentation

This closes #3136.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11ce80bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11ce80bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11ce80bc

Branch: refs/heads/release-1.2
Commit: 11ce80bc557308cf891152e3fe66e124471a6bf6
Parents: a7644b1
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Tue Jan 17 11:38:50 2017 +0100
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Jan 19 10:38:04 2017 +0100

----------------------------------------------------------------------
 docs/dev/connectors/rabbitmq.md | 148 +++++++++++++++++++++--------------
 1 file changed, 89 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11ce80bc/docs/dev/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index 1b621c0..7f117c6 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -40,88 +40,118 @@ Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.co
 
 #### RabbitMQ Source
 
-A class which provides an interface for receiving data from RabbitMQ.
-
-The followings have to be provided for the `RMQSource(…)` constructor in order:
-
-- RMQConnectionConfig.
-- queueName: The RabbitMQ queue name.
-- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
-- deserializationSchema: Deserialization schema to turn messages into Java objects.
-
-This source can be operated in three different modes:
-
-1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
-    unique correlation IDs.
-2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
-    (correlation id is not set).
-3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
-
-Correlation ids are a RabbitMQ application feature. You have to set it in the message properties
-when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true and do not supply
-unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore
-messages with non-unique correlation ids. If you set `usesCorrelationId` to false, then you don't
-have to supply correlation ids.
-
-Example:
+This connector provides a `RMQSource` class to consume messages from a RabbitMQ
+queue. This source provides three different levels of guarantees, depending
+on how it is configured with Flink:
+
+1. **Exactly-once**: In order to achieve exactly-once guarantees with the
+RabbitMQ source, the following is required:
+ - *Enable checkpointing*: With checkpointing enabled, messages are only
+ acknowledged (hence, removed from the RabbitMQ queue) when checkpoints
+ are completed.
+ - *Use correlation ids*: Correlation ids are a RabbitMQ application feature.
+ You have to set them in the message properties when injecting messages into RabbitMQ.
+ The correlation id is used by the source to deduplicate any messages that
+ have been reprocessed when restoring from a checkpoint.
+ - *Non-parallel source*: The source must be non-parallel (parallelism set
+ to 1) in order to achieve exactly-once. This limitation is mainly due to
+ RabbitMQ's approach to dispatching messages from a single queue to multiple
+ consumers.
+
+
+2. **At-least-once**: When checkpointing is enabled, but correlation ids
+are not used or the source is parallel, the source only provides at-least-once
+guarantees.
+
+3. **No guarantee**: If checkpointing isn't enabled, the source does not
+have any strong delivery guarantees. Under this setting, instead of
+collaborating with Flink's checkpointing, messages will be automatically
+acknowledged once the source receives and processes them.
+
+Below is a code example for setting up an exactly-once RabbitMQ source.
+Inline comments explain which parts of the configuration can be ignored
+for more relaxed guarantees.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-DataStream<String> streamWithoutCorrelationIds = env
-       .addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()))
-       .print
-
-DataStream<String> streamWithCorrelationIds = env
-       .addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()))
-       .print
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// checkpointing is required for exactly-once or at-least-once guarantees
+env.enableCheckpointing(...);
+
+final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build();
+    
+final DataStream<String> stream = env
+    .addSource(new RMQSource<String>(
+        connectionConfig,            // config for the RabbitMQ connection
+        "queueName",                 // name of the RabbitMQ queue to consume
+        true,                        // use correlation ids; can be false if only at-least-once is required
+        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
+    .setParallelism(1);              // non-parallel source is only required for exactly-once
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+// checkpointing is required for exactly-once or at-least-once guarantees
+env.enableCheckpointing(...)
+
 val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-streamWithoutCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
-    .print
-
-streamWithCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
-    .print
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build
+    
+val stream = env
+    .addSource(new RMQSource[String](
+        connectionConfig,            // config for the RabbitMQ connection
+        "queueName",                 // name of the RabbitMQ queue to consume
+        true,                        // use correlation ids; can be false if only at-least-once is required
+        new SimpleStringSchema))     // deserialization schema to turn messages into Java objects
+    .setParallelism(1)               // non-parallel source is only required for exactly-once
 {% endhighlight %}
 </div>
 </div>
 
 #### RabbitMQ Sink
-A class providing an interface for sending data to RabbitMQ.
-
-The followings have to be provided for the `RMQSink(…)` constructor in order:
-
-1. RMQConnectionConfig
-2. The queue name
-3. Serialization schema
-
-Example:
+This connector provides a `RMQSink` class for sending messages to a RabbitMQ
+queue. Below is a code example for setting up a RabbitMQ sink.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-stream.addSink(new RMQSink<String>(connectionConfig, "hello", new SimpleStringSchema()));
+final DataStream<String> stream = ...
+
+final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build();
+    
+stream.addSink(new RMQSink<String>(
+    connectionConfig,            // config for the RabbitMQ connection
+    "queueName",                 // name of the RabbitMQ queue to send messages to
+    new SimpleStringSchema()));  // serialization schema to turn Java objects to messages
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
+val stream: DataStream[String] = ...
+
 val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-stream.addSink(new RMQSink[String](connectionConfig, "hello", new SimpleStringSchema))
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build
+    
+stream.addSink(new RMQSink[String](
+    connectionConfig,         // config for the RabbitMQ connection
+    "queueName",              // name of the RabbitMQ queue to send messages to
+    new SimpleStringSchema))  // serialization schema to turn Java objects to messages
 {% endhighlight %}
 </div>
 </div>
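
Note on the correlation-id requirement in the patch above: the documentation says the source uses correlation ids to deduplicate messages that are reprocessed after restoring from a checkpoint, and the removed text adds that a null id raises an exception while a non-unique id is ignored. The sketch below illustrates that idea only, using nothing but the Java standard library. It is not the `RMQSource` implementation; the class `CorrelationIdDedup`, its nested `Message` type, and the `accept` and `deliver` methods are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of id-based deduplication; not Flink code. */
public class CorrelationIdDedup {

    /** A message reduced to what the sketch needs: an id and a payload. */
    public static final class Message {
        final String correlationId;
        final String payload;

        public Message(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // Ids of messages that have already been emitted downstream.
    private final Set<String> seenIds = new HashSet<>();

    /**
     * Returns true if the message should be emitted, false if its id was
     * seen before. A null id is rejected, mirroring the behaviour the
     * removed documentation text describes.
     */
    public boolean accept(Message message) {
        if (message.correlationId == null) {
            throw new IllegalArgumentException("correlation id must not be null");
        }
        // Set.add returns false when the id is already present.
        return seenIds.add(message.correlationId);
    }

    /** Emits the payload of every message whose id has not been seen yet. */
    public static List<String> deliver(List<Message> messages) {
        CorrelationIdDedup dedup = new CorrelationIdDedup();
        List<String> emitted = new ArrayList<>();
        for (Message message : messages) {
            if (dedup.accept(message)) {
                emitted.add(message.payload);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // "id-1" arrives twice, as it might after a restore and redelivery.
        List<Message> redelivered = Arrays.asList(
            new Message("id-1", "a"),
            new Message("id-2", "b"),
            new Message("id-1", "a"),
            new Message("id-3", "c"));
        System.out.println(deliver(redelivered)); // prints [a, b, c]
    }
}
```

The sketch keeps every id for the lifetime of the object, which is enough to show why producers must supply unique ids: a second message that reuses an id is dropped even if its payload differs.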
