Repository: samza Updated Branches: refs/heads/master 7887b6d86 -> 2e5ceec19
SAMZA-1437; Added Eventhub producer and consumer docs Still need to add tutorials, and configs to configurations table vjagadish1989 for review Author: Daniel Chen <[email protected]> Reviewers: Jagadish <[email protected]> Closes #382 from dxichen/eventhub-docs Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2e5ceec1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2e5ceec1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2e5ceec1 Branch: refs/heads/master Commit: 2e5ceec19f52b300b98efb06b292cf9a3db60388 Parents: 7887b6d Author: Daniel Chen <[email protected]> Authored: Tue Dec 12 21:47:38 2017 -0800 Committer: Jagadish <[email protected]> Committed: Tue Dec 12 21:47:38 2017 -0800 ---------------------------------------------------------------------- .../azure/eventhub_send_methods.png | Bin 0 -> 52427 bytes .../documentation/versioned/azure/eventhubs.md | 197 +++++++++++++++++++ docs/learn/documentation/versioned/index.html | 6 + .../versioned/jobs/configuration-table.html | 87 ++++++++ 4 files changed, 290 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2e5ceec1/docs/learn/documentation/azure/eventhub_send_methods.png ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/azure/eventhub_send_methods.png b/docs/learn/documentation/azure/eventhub_send_methods.png new file mode 100644 index 0000000..477553a Binary files /dev/null and b/docs/learn/documentation/azure/eventhub_send_methods.png differ http://git-wip-us.apache.org/repos/asf/samza/blob/2e5ceec1/docs/learn/documentation/versioned/azure/eventhubs.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/azure/eventhubs.md b/docs/learn/documentation/versioned/azure/eventhubs.md new file mode 100644 index 0000000..2defddb 
--- /dev/null +++ b/docs/learn/documentation/versioned/azure/eventhubs.md @@ -0,0 +1,197 @@ +--- +layout: page +title: Connecting to Eventhubs +--- +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +You can configure your Samza jobs to process data from [Azure Eventhubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft's data streaming service. An `event hub` is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data). + +### Consuming from EventHubs: + +Samza's [EventHubSystemConsumer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java) wraps the EventData into an [EventHubIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java). The key of the message is set to the partition key of the EventData. The message is obtained from the EventData body. 
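The key/message mapping performed by the consumer can be sketched as follows. Note that the two classes below are simplified stand-ins written for illustration only; they are not the real Samza or EventHubs types.

```java
// Simplified stand-ins illustrating how the consumer maps an EventData
// into an incoming envelope: partition key -> envelope key, body -> message.
// These are NOT the real Samza/EventHubs classes.
class EventDataStub {
    final String partitionKey;
    final byte[] body;

    EventDataStub(String partitionKey, byte[] body) {
        this.partitionKey = partitionKey;
        this.body = body;
    }
}

class EnvelopeStub {
    final Object key;     // set to the partition key of the EventData
    final Object message; // set to the EventData body

    EnvelopeStub(EventDataStub data) {
        this.key = data.partitionKey;
        this.message = data.body;
    }
}
```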
+ +To configure Samza to consume from EventHub streams: + +``` +# define an event hub system factory with your identifier. eg: eh-system +systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory + +# define your streams +systems.eh-system.stream.list=input0, output0 + +# define required properties for your streams +systems.eh-system.streams.input0.eventhubs.namespace=YOUR-STREAM-NAMESPACE +systems.eh-system.streams.input0.eventhubs.entitypath=YOUR-ENTITY-NAME +systems.eh-system.streams.input0.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME +systems.eh-system.streams.input0.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN + +systems.eh-system.streams.output0.eventhubs.namespace=YOUR-STREAM-NAMESPACE +systems.eh-system.streams.output0.eventhubs.entitypath=YOUR-ENTITY-NAME +systems.eh-system.streams.output0.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME +systems.eh-system.streams.output0.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN +``` + +Each stream must provide the fields required to access its Eventhubs entity, namely `YOUR-STREAM-NAMESPACE`, `YOUR-ENTITY-NAME`, `YOUR-SAS-KEY-NAME` and `YOUR-SAS-KEY-TOKEN`. + +### Producing to EventHubs: + +Similarly, you can also configure your Samza job to write to EventHubs. +``` +OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("eh-system", "output0"), key, message); +collector.send(envelope); +``` + +Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the `message` in the envelope. Additionally, the `key` and the `produce timestamp` are set as properties in the EventData before sending it to EventHubs.
+ +### Advanced configuration: + +##### Producer partitioning: + +The `partition.method` property determines how outgoing messages are partitioned. Valid values for this config are `EVENT_HUB_HASHING`, `PARTITION_KEY_AS_PARTITION` or `ROUND_ROBIN`. + +`EVENT_HUB_HASHING`: By default, Samza computes the partition for an outgoing message based on the hash of its partition-key. This ensures that events with the same key are sent to the same partition. If this option is chosen, the partition key should be a string. If the partition key is not set, the key in the message is used for partitioning. + +`PARTITION_KEY_AS_PARTITION`: In this method, each message is sent to the partition specified by its partition key. This requires the partition key to be an integer. If the key is greater than the number of partitions, a modulo operation will be performed on the key. Similar to EVENT_HUB_HASHING, the key in the message is used if the partition key is not specified. + +`ROUND_ROBIN`: In this method, outgoing messages are distributed in a round-robin fashion across all partitions. The key and the partition key in the message are ignored. + +``` +systems.eh-system.partition.method = EVENT_HUB_HASHING +``` + +##### Consumer groups: + +Eventhub supports a notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) which enables multiple applications to have their own view of the event stream. Each event hub stream has a pre-defined consumer group named `$Default`. You can define your own consumer group for your job by configuring the `eventhubs.consumer.group` property: + +``` +systems.eh-system.streams.eh-input0.eventhubs.consumer.group = my-group +``` + +##### Serde: + +By default, the messages from EventHubs are sent and received as byte arrays. You can configure a serializer and deserializer for your message by setting a value for `msg.serde` for your stream.
+ +``` +streams.input0.samza.msg.serde = json +streams.output0.samza.msg.serde = json +``` + +##### Consumer buffer size: + +When the consumer reads messages from event hubs, it appends them to a shared producer-consumer buffer corresponding to their partition. This config determines the per-partition queue size. Setting a higher value for this config typically achieves a higher throughput at the expense of increased on-heap memory. + +``` +systems.eh-system.eventhubs.receive.queue.size = 10 +``` + +For the list of all configs, check out the configuration table page [here](../jobs/configuration-table.html). + +### Azure Eventhubs Hello-Samza Example + +The [hello-samza](https://github.com/apache/samza-hello-samza) project contains an example of a high-level job that consumes from and produces to Eventhub using the Zookeeper deployment model. + +#### Get the Code + +Let's get started by cloning the hello-samza project: + +``` +git clone https://git.apache.org/samza-hello-samza.git hello-samza +cd hello-samza +git checkout latest +``` + +The project comes with numerous examples; for this tutorial, we will pick the Azure application. + +#### Setting up the Deployment Environment + +For our Azure application, we require [ZooKeeper](http://zookeeper.apache.org/). The hello-samza project comes with a script called "grid" to help with the environment setup: + +``` +./bin/grid standalone +``` + +This command will download, install, and start ZooKeeper and Kafka. It will also check out the latest version of Samza and build it. All package files will be put in a sub-directory called "deploy" inside hello-samza's root folder. + +If you get a complaint that JAVA_HOME is not set, then you'll need to set it to the path where Java is installed on your system. + +#### Configuring the Azure application + +Here are the configs you must set before building the project. Configure these in the `src/main/config/azure-application-local-runner.properties` file.
+ +``` +# Add your EventHubs input stream credentials here +systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE +systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME +systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME +systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN + +# Add your EventHubs output stream credentials here +systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE +systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME +systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME +systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN +``` + +Optionally, you may also use the Azure Checkpoint Manager. If you do not, comment out both of these lines. + +``` +# Azure Table Checkpoint Manager +task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory +azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING +``` + +#### Building the Hello Samza Project + +With the environment setup complete, let us move on to building the hello-samza project. Execute the following commands: + +``` +mvn clean package +mkdir -p deploy/samza +tar -xvf ./target/hello-samza-0.14.0-SNAPSHOT-dist.tar.gz -C deploy/samza +``` + +We are now all set to deploy the application locally. + +#### Running the Azure application + +In order to run the application, we will use the *run-azure-application* script. + +``` +./deploy/samza/bin/run-azure-application.sh +``` + +The above command executes the helper script, which invokes the *AzureZKLocalApplication* main class, which in turn starts the *AzureApplication*. This application filters out the messages consumed without keys, prints out the remaining messages, and sends them to the configured output stream.
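The filtering behavior described above can be sketched independently of the Samza APIs. In this plain-Java sketch, `filterKeyed` is a hypothetical helper (not part of the example application), and each message is modeled as a two-element `{key, value}` array.

```java
import java.util.ArrayList;
import java.util.List;

class FilterSketch {
    // Keep only messages that have a key, mirroring the example
    // application's behavior of dropping keyless messages before
    // forwarding the rest to the output stream.
    static List<String[]> filterKeyed(List<String[]> messages) {
        List<String[]> keyed = new ArrayList<>();
        for (String[] kv : messages) {
            if (kv[0] != null) {
                keyed.add(kv);
            }
        }
        return keyed;
    }
}
```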
+ +The messages consumed should be printed in the following format: +``` +Sending: +Received Key: <KEY> +Received Message: <VALUE> +``` + +#### Shutdown + +This application can be shut down by terminating the *run-azure-application* script. +We can use the *grid* script to tear down the local environment ([Kafka](http://kafka.apache.org/) and [Zookeeper](http://zookeeper.apache.org/)). + +``` +bin/grid stop all +``` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/2e5ceec1/docs/learn/documentation/versioned/index.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html index e23b29f..fa8e1e3 100644 --- a/docs/learn/documentation/versioned/index.html +++ b/docs/learn/documentation/versioned/index.html @@ -94,6 +94,12 @@ title: Documentation <li><a href="hadoop/producer.html">Writing to HDFS</a></li> </ul> +<h4>Azure</h4> + +<ul class="documentation-list"> + <li><a href="azure/eventhubs.html">Eventhubs Consumer/Producer</a></li> +</ul> + <h4>Operations</h4> <ul class="documentation-list"> http://git-wip-us.apache.org/repos/asf/samza/blob/2e5ceec1/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index ef81887..6666bb1 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -2248,6 +2248,93 @@ <td class="default"></td> <td class="description">Staging directory for storing partition description. By default (if not set by users) the value is inherited from "yarn.job.staging.directory" internally.
The default value is typically good enough unless you want to explicitly use a separate location.</td> </tr> + + <tr> + <th colspan="3" class="section" id="eventhub"> + Using <a href="https://azure.microsoft.com/en-us/services/event-hubs/">EventHubs</a> for input and output streams<br> + <span class="subtitle"> + (This section applies if you have set + <a href="#systems-samza-factory" class="property">systems.*.samza.factory</a> + <code>= org.apache.samza.system.eventhub.EventHubSystemFactory</code>) + </span> + </th> + </tr> + + <tr> + <td class="property" id="eventhub-stream-list">systems.<span class="system">system-name</span>.<br>stream.list</td> + <td class="default"></td> + <td class="description">List of Samza <span class="stream">stream-ids</span> used for the Eventhub system.</td> + </tr> + + <tr> + <td class="property" id="eventhub-stream-namespace">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.namespace</td> + <td class="default"></td> + <td class="description">Namespace of the associated <span class="stream">stream-id</span>. Required to access the Eventhubs entity per stream.</td> + </tr> + + <tr> + <td class="property" id="eventhub-stream-entity">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.entitypath</td> + <td class="default"></td> + <td class="description">Entity of the associated <span class="stream">stream-id</span>. Required to access the Eventhubs entity per stream.</td> + </tr> + + <tr> + <td class="property" id="eventhub-stream-sas-keyname">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.keyname</td> + <td class="default"></td> + <td class="description">SAS Keyname of the associated <span class="stream">stream-id</span>. 
Required to access the Eventhubs entity per stream.</td> + </tr> + + <tr> + <td class="property" id="eventhub-stream-sas-token">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.token</td> + <td class="default"></td> + <td class="description">SAS Token of the associated <span class="stream">stream-id</span>. Required to access the Eventhubs entity per stream.</td> + </tr> + + <tr> + <td class="property" id="eventhub-runtime-timeout">systems.<span class="system">system-name</span>.<br>eventhubs.runtime.info.timeout</td> + <td class="default">60000</td> + <td class="description">Timeout, in milliseconds, for fetching the runtime metadata from an Eventhub entity on startup.</td> + </tr> + + <tr> + <td class="property" id="eventhub-send-partition-method">systems.<span class="system">system-name</span>.<br>eventhubs.partition.method</td> + <td class="default"><code>EVENT_HUB_HASHING</code></td> + <td class="description"> + Producer only config. Configures how messages are partitioned for the downstream Eventhub in one of the following ways: + <dl> + <dt><code>ROUND_ROBIN</code></dt> + <dd>The message key and partition key are ignored and messages are distributed in a round-robin fashion amongst all the partitions in the downstream EventHub.</dd> + <dt><code>EVENT_HUB_HASHING</code></dt> + <dd>Employs the hashing mechanism in EventHubs to determine, based on the key of the message, which partition the message should go to. This method ensures that all the events with the same key are sent to the same partition in the event hub. If this option is chosen, the partition key used for the hash should be a string. If the partition key is not set, the message key is used instead.</dd> + <dt><code>PARTITION_KEY_AS_PARTITION</code></dt> + <dd>Sends the message to the Eventhub partition specified by the integer partition key. If the integer key is greater than the number of partitions in the destination Eventhub, a modulo operation will be performed to determine the resulting partition, i.e. if there are 6 partitions and the key is 9, the message will end up in partition 3. As with EVENT_HUB_HASHING, if the partition key is not set, the message key is used instead.</dd> + </dl> + </td> + </tr> + + <tr> + <td class="property" id="eventhub-send-key">systems.<span class="system">system-name</span>.<br>eventhubs.send.key</td> + <td class="default">true</td> + <td class="description"> + Producer only config. If true, each message key is sent to the eventhub in the properties of the AMQP message. If the Samza Eventhub consumer is used downstream, this field is used as the message key if the partition key is not present. + </td> + </tr> + + <tr> + <td class="property" id="eventhub-consumer-group">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.consumer.group</td> + <td class="default"><code>$Default</code></td> + <td class="description"> + Consumer only config. Sets the consumer group of the upstream Eventhub that the consumer is part of. Defaults to the <code>$Default</code> group that is initially present in all Eventhub entities (unless removed). + </td> + </tr> + + <tr> + <td class="property" id="eventhub-consumer-buffer-capacity">systems.<span class="system">system-name</span>.<br>eventhubs.receive.queue.size</td> + <td class="default">100</td> + <td class="description"> + Consumer only config. Per-partition capacity of the eventhubs consumer buffer - the blocking queue used for storing messages. A larger buffer capacity typically leads to better throughput but consumes more memory. + </td> + </tr> </tbody> </table> </body>
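The modulo behavior described for PARTITION_KEY_AS_PARTITION in the table above can be illustrated with a small standalone sketch. This is plain Java for illustration; `choosePartition` is a hypothetical helper, not the actual producer code.

```java
class PartitionSketch {
    // For PARTITION_KEY_AS_PARTITION, an integer partition key larger
    // than the partition count wraps around via modulo.
    static int choosePartition(int partitionKey, int numPartitions) {
        return partitionKey % numPartitions;
    }

    public static void main(String[] args) {
        // With 6 partitions, key 9 lands in partition 3 (9 % 6 == 3).
        System.out.println(choosePartition(9, 6));
    }
}
```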
