Repository: bahir-flink
Updated Branches:
  refs/heads/master b2955a749 -> 7dce2db4a


[BAHIR-57] Add Flume sink

Closes #2


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/7dce2db4
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/7dce2db4
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/7dce2db4

Branch: refs/heads/master
Commit: 7dce2db4a5a2b8387ba3ad3121955f9e90bc01ef
Parents: b2955a7
Author: Robert Metzger <[email protected]>
Authored: Mon Aug 22 16:48:59 2016 +0200
Committer: Luciano Resende <[email protected]>
Committed: Mon Aug 22 13:17:30 2016 -0700

----------------------------------------------------------------------
 flink-connector-flume/pom.xml                   | 174 +++++++++++++++++++
 .../streaming/connectors/flume/FlumeSink.java   | 141 +++++++++++++++
 flink-connector-redis/pom.xml                   |   1 -
 pom.xml                                         |   1 +
 4 files changed, 316 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/7dce2db4/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
new file mode 100644
index 0000000..f8b98f1
--- /dev/null
+++ b/flink-connector-flume/pom.xml
@@ -0,0 +1,174 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink_parent_2.11</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+       <artifactId>flink-connector-flume_2.11</artifactId>
+       <name>flink-connector-flume</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <flume-ng.version>1.5.0</flume-ng.version>
+       </properties>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.11</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flume</groupId>
+                       <artifactId>flume-ng-core</artifactId>
+                       <version>${flume-ng.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-log4j12</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>log4j</groupId>
+                                       <artifactId>log4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>commons-io</groupId>
+                                       <artifactId>commons-io</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>commons-codec</groupId>
+                                       <artifactId>commons-codec</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>commons-cli</groupId>
+                                       <artifactId>commons-cli</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>commons-lang</groupId>
+                                       <artifactId>commons-lang</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.avro</groupId>
+                                       <artifactId>avro</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.codehaus.jackson</groupId>
+                                       <artifactId>jackson-core-asl</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.codehaus.jackson</groupId>
+                                       <artifactId>jackson-mapper-asl</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.thoughtworks.paranamer</groupId>
+                                       <artifactId>paranamer</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.xerial.snappy</groupId>
+                                       <artifactId>snappy-java</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.tukaani</groupId>
+                                       <artifactId>xz</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.velocity</groupId>
+                                       <artifactId>velocity</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>commons-collections</groupId>
+                                       <artifactId>commons-collections</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>servlet-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty-util</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.google.code.gson</groupId>
+                                       <artifactId>gson</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.thrift</groupId>
+                                       <artifactId>libthrift</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <configuration>
+                                                       <artifactSet>
+                                                               <includes combine.children="append">
+                                                                       <!-- We include all dependencies that transitively depend on guava -->
+                                                                       <include>org.apache.flume:*</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <transformers>
+                                                               <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/7dce2db4/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..00bfc39
--- /dev/null
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlumeSink<IN> extends RichSinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+
+    private transient FlinkRpcClientFacade client;
+    boolean initDone = false;
+    String host;
+    int port;
+    SerializationSchema<IN> schema;
+
+    public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
+        this.host = host;
+        this.port = port;
+        this.schema = schema;
+    }
+
+    /**
+     * Receives tuples from the Apache Flink {@link DataStream} and forwards
+     * them to Apache Flume.
+     *
+     * @param value
+     *            The tuple arriving from the datastream
+     */
+    @Override
+    public void invoke(IN value) {
+
+        byte[] data = schema.serialize(value);
+        client.sendDataToFlume(data);
+
+    }
+
+    private class FlinkRpcClientFacade {
+        private RpcClient client;
+        private String hostname;
+        private int port;
+
+        /**
+         * Initializes the connection to Apache Flume.
+         *
+         * @param hostname
+         *            The host
+         * @param port
+         *            The port.
+         */
+        public void init(String hostname, int port) {
+            // Setup the RPC connection
+            this.hostname = hostname;
+            this.port = port;
+            int initCounter = 0;
+            while (true) {
+                if (initCounter >= 90) {
+                    throw new RuntimeException("Cannot establish connection 
with" + port + " at "
+                            + host);
+                }
+                try {
+                    this.client = 
RpcClientFactory.getDefaultInstance(hostname, port);
+                } catch (FlumeException e) {
+                    // Wait one second if the connection failed before the next
+                    // try
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        if (LOG.isErrorEnabled()) {
+                            LOG.error("Interrupted while trying to connect {} 
at {}", port, host);
+                        }
+                    }
+                }
+                if (client != null) {
+                    break;
+                }
+                initCounter++;
+            }
+            initDone = true;
+        }
+
+        /**
+         * Sends byte arrays as {@link Event} series to Apache Flume.
+         *
+         * @param data
+         *            The byte array to send to Apache FLume
+         */
+        public void sendDataToFlume(byte[] data) {
+            Event event = EventBuilder.withBody(data);
+
+            try {
+                client.append(event);
+
+            } catch (EventDeliveryException e) {
+                // clean up and recreate the client
+                client.close();
+                client = null;
+                client = RpcClientFactory.getDefaultInstance(hostname, port);
+            }
+        }
+
+    }
+
+    @Override
+    public void close() {
+        client.client.close();
+    }
+
+    @Override
+    public void open(Configuration config) {
+        client = new FlinkRpcClientFacade();
+        client.init(host, port);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/7dce2db4/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index c34711e..df78295 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -44,7 +44,6 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java_2.11</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/7dce2db4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b26982f..01fb4fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@
 
   <modules>
     <module>flink-connector-redis</module>
+    <module>flink-connector-flume</module>
   </modules>
 
   <properties>

Reply via email to