Repository: bahir-flink Updated Branches: refs/heads/master a105a7c34 -> dd8dcbe7b
[BAHIR-68] Add README.md documentation for Flink Extensions

Add README for Redis, Flume and ActiveMQ Flink Extensions

Closes #5

Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/dd8dcbe7
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/dd8dcbe7
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/dd8dcbe7

Branch: refs/heads/master
Commit: dd8dcbe7bd5ef74c665632aeba8ddac506fc58a2
Parents: a105a7c
Author: Robert Metzger <[email protected]>
Authored: Tue Oct 11 11:46:57 2016 +0200
Committer: Luciano Resende <[email protected]>
Committed: Fri Oct 14 23:01:27 2016 -0700

----------------------------------------------------------------------
 README.md                          |  19 +++-
 flink-connector-activemq/README.md |  19 ++++
 flink-connector-flume/README.md    |  22 +++++
 flink-connector-redis/README.md    | 148 ++++++++++++++++++++++++++++++++
 4 files changed, 206 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/dd8dcbe7/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index a3829b9..f1e9d77 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,25 @@
-# Apache Bahir
+# Apache Bahir (Flink)
 
-Apache Bahir provides extensions to distributed analytics platforms such as Apache Spark and Apache Flink.
+Apache Bahir provides extensions to distributed analytics platforms such as Apache Spark™ and Apache Flink®.
 
 <http://bahir.apache.org/>
 
+This repository is for Apache Flink extensions.
+
+## Contributing a Flink Connector
+
+The Bahir community is very open to new connector contributions for Apache Flink.
+
+We ask contributors to first open a [JIRA issue](https://issues.apache.org/jira/browse/BAHIR) describing the planned changes. Please make sure to put "Flink Streaming Connector" in the "Component/s" field.
+
+Once the community has agreed that the planned changes are suitable, you can open a pull request against the "bahir-flink" repository.
+Please follow the same directory structure as the existing code.
+
+The community will review your changes, giving suggestions on how to improve the code until it can be merged into the main repository.
+
 
 ## Building Bahir
 
 Bahir is built using [Apache Maven](http://maven.apache.org/).

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/dd8dcbe7/flink-connector-activemq/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/README.md b/flink-connector-activemq/README.md
new file mode 100644
index 0000000..77ce7c5
--- /dev/null
+++ b/flink-connector-activemq/README.md
@@ -0,0 +1,19 @@
+# Flink ActiveMQ connector
+
+This connector provides a source and a sink to [Apache ActiveMQ](http://activemq.apache.org/)™.
+To use this connector, add the following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-activemq_2.11</artifactId>
+      <version>1.0</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with ActiveMQ 5.14.0.
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+
+The source class is called `AMQSource`; the sink is `AMQSink`.

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/dd8dcbe7/flink-connector-flume/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-flume/README.md b/flink-connector-flume/README.md
new file mode 100644
index 0000000..d2d43fc
--- /dev/null
+++ b/flink-connector-flume/README.md
@@ -0,0 +1,22 @@
+# Flink Flume connector
+
+This connector provides a Sink that can send data to [Apache Flume](https://flume.apache.org/)™. 
To use this connector, add the
+following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-flume_2.11</artifactId>
+      <version>1.0</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with Flume 1.5.0.
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+
+To create a `FlumeSink`, use the following constructor:
+
+    FlumeSink(String host, int port, SerializationSchema<IN> schema)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/dd8dcbe7/flink-connector-redis/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-redis/README.md b/flink-connector-redis/README.md
new file mode 100644
index 0000000..67852dd
--- /dev/null
+++ b/flink-connector-redis/README.md
@@ -0,0 +1,148 @@
+# Flink Redis connector
+
+This connector provides a Sink that can write to [Redis](http://redis.io/) and can also publish data
+to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-redis_2.11</artifactId>
+      <version>1.0</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with Redis 2.8.5.
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+
+## Installing Redis
+
+Follow the instructions from the [Redis download page](http://redis.io/download).
+
+## Redis Sink
+
+`RedisSink` provides an interface for sending data to Redis.
+The sink can use three different methods for communicating with different types of Redis environments:
+
+1. Single Redis Server
+2. Redis Cluster
+3. 
Redis Sentinel
+
+This code shows how to create a sink that communicates with a single Redis server:
+
+**Java:**
+
+    public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+    }
+
+    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
+
+    DataStream<Tuple2<String, String>> stream = ...;
+    stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
+
+**Scala:**
+
+    class RedisExampleMapper extends RedisMapper[(String, String)] {
+      override def getCommandDescription: RedisCommandDescription = {
+        new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
+      }
+
+      override def getKeyFromData(data: (String, String)): String = data._1
+
+      override def getValueFromData(data: (String, String)): String = data._2
+    }
+
+    val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
+    stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+
+This example code does the same, but connects to a Redis Cluster:
+
+**Java:**
+
+    FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
+        .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
+
+    DataStream<Tuple2<String, String>> stream = ...;
+    stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
+
+**Scala:**
+
+    val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build()
+    stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+
+This example shows how to use the sink when the Redis environment runs with Sentinels:
+
+**Java:**
+
+    FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
+        
.setMasterName("master").setSentinels(...).build();
+
+    DataStream<Tuple2<String, String>> stream = ...;
+    stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
+
+**Scala:**
+
+    val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
+    stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+
+This section lists the available data types and the Redis command used for each:
+
+<table class="table table-bordered" style="width: 75%">
+  <thead>
+    <tr>
+      <th class="text-center" style="width: 20%">Data Type</th>
+      <th class="text-center" style="width: 25%">Redis Command [Sink]</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>HASH</td><td><a href="http://redis.io/commands/hset">HSET</a></td>
+    </tr>
+    <tr>
+      <td>LIST</td><td>
+        <a href="http://redis.io/commands/rpush">RPUSH</a>,
+        <a href="http://redis.io/commands/lpush">LPUSH</a>
+      </td>
+    </tr>
+    <tr>
+      <td>SET</td><td><a href="http://redis.io/commands/sadd">SADD</a></td>
+    </tr>
+    <tr>
+      <td>PUBSUB</td><td><a href="http://redis.io/commands/publish">PUBLISH</a></td>
+    </tr>
+    <tr>
+      <td>STRING</td><td><a href="http://redis.io/commands/set">SET</a></td>
+    </tr>
+    <tr>
+      <td>HYPER_LOG_LOG</td><td><a href="http://redis.io/commands/pfadd">PFADD</a></td>
+    </tr>
+    <tr>
+      <td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td>
+    </tr>
+  </tbody>
+</table>
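
[Editor's illustration] The HASH row of the table corresponds to the `RedisExampleMapper` examples in the Redis README above: each stream element is reduced to a (key, value) pair and written with `HSET HASH_NAME key value`. The following is a minimal, dependency-free Java sketch of that per-record behavior; `RedisMapperSketch` is a hypothetical name, and `SimpleEntry`/`HashMap` stand in for `Tuple2` and the Redis hash.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch (no Flink or Bahir dependencies) of what the HASH mapping
// does per record: extract a key and a value, then store them under one hash.
public class RedisMapperSketch {

    // Mirrors RedisMapper#getKeyFromData for a (String, String) pair.
    static String getKeyFromData(SimpleEntry<String, String> data) {
        return data.getKey();
    }

    // Mirrors RedisMapper#getValueFromData for a (String, String) pair.
    static String getValueFromData(SimpleEntry<String, String> data) {
        return data.getValue();
    }

    public static void main(String[] args) {
        Map<String, String> hash = new HashMap<>(); // stands in for "HASH_NAME"
        SimpleEntry<String, String> record = new SimpleEntry<>("f0", "f1");
        // What the sink effectively does for each element of the stream:
        hash.put(getKeyFromData(record), getValueFromData(record));
        System.out.println(hash); // prints {f0=f1}
    }
}
```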
