Repository: bahir-flink Updated Branches: refs/heads/master b2955a749 -> 7dce2db4a
[BAHIR-57] Add Flume sink Closes #2 Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/7dce2db4 Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/7dce2db4 Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/7dce2db4 Branch: refs/heads/master Commit: 7dce2db4a5a2b8387ba3ad3121955f9e90bc01ef Parents: b2955a7 Author: Robert Metzger <[email protected]> Authored: Mon Aug 22 16:48:59 2016 +0200 Committer: Luciano Resende <[email protected]> Committed: Mon Aug 22 13:17:30 2016 -0700 ---------------------------------------------------------------------- flink-connector-flume/pom.xml | 174 +++++++++++++++++++ .../streaming/connectors/flume/FlumeSink.java | 141 +++++++++++++++ flink-connector-redis/pom.xml | 1 - pom.xml | 1 + 4 files changed, 316 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/7dce2db4/flink-connector-flume/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml new file mode 100644 index 0000000..f8b98f1 --- /dev/null +++ b/flink-connector-flume/pom.xml @@ -0,0 +1,174 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-flink_parent_2.11</artifactId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-flume_2.11</artifactId> + <name>flink-connector-flume</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <flume-ng.version>1.5.0</flume-ng.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>${flume-ng.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + <exclusion> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>com.thoughtworks.paranamer</groupId> + <artifactId>paranamer</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + <exclusion> + <groupId>org.tukaani</groupId> + <artifactId>xz</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.velocity</groupId> + <artifactId>velocity</artifactId> + </exclusion> + <exclusion> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. --> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <configuration> + <artifactSet> + <includes combine.children="append"> + <!-- We include all dependencies that transitively depend on guava --> + <include>org.apache.flume:*</include> + </includes> + </artifactSet> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/7dce2db4/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java new file mode 100644 index 0000000..00bfc39 --- /dev/null +++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.flume; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.api.RpcClient; +import org.apache.flume.api.RpcClientFactory; +import org.apache.flume.event.EventBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlumeSink<IN> extends RichSinkFunction<IN> { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class); + + private transient FlinkRpcClientFacade client; + boolean initDone = false; + String host; + int port; + SerializationSchema<IN> schema; + + public FlumeSink(String host, int port, SerializationSchema<IN> schema) { + this.host = host; + this.port = port; + this.schema = schema; + } + + /** + * Receives tuples from the Apache Flink {@link DataStream} and forwards + * them to Apache Flume. + * + * @param value + * The tuple arriving from the datastream + */ + @Override + public void invoke(IN value) { + + byte[] data = schema.serialize(value); + client.sendDataToFlume(data); + + } + + private class FlinkRpcClientFacade { + private RpcClient client; + private String hostname; + private int port; + + /** + * Initializes the connection to Apache Flume. + * + * @param hostname + * The host + * @param port + * The port. + */ + public void init(String hostname, int port) { + // Setup the RPC connection + this.hostname = hostname; + this.port = port; + int initCounter = 0; + while (true) { + if (initCounter >= 90) { + throw new RuntimeException("Cannot establish connection with" + port + " at " + + host); + } + try { + this.client = RpcClientFactory.getDefaultInstance(hostname, port); + } catch (FlumeException e) { + // Wait one second if the connection failed before the next + // try + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + if (LOG.isErrorEnabled()) { + LOG.error("Interrupted while trying to connect {} at {}", port, host); + } + } + } + if (client != null) { + break; + } + initCounter++; + } + initDone = true; + } + + /** + * Sends byte arrays as {@link Event} series to Apache Flume. + * + * @param data + * The byte array to send to Apache FLume + */ + public void sendDataToFlume(byte[] data) { + Event event = EventBuilder.withBody(data); + + try { + client.append(event); + + } catch (EventDeliveryException e) { + // clean up and recreate the client + client.close(); + client = null; + client = RpcClientFactory.getDefaultInstance(hostname, port); + } + } + + } + + @Override + public void close() { + client.client.close(); + } + + @Override + public void open(Configuration config) { + client = new FlinkRpcClientFacade(); + client.init(host, port); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/7dce2db4/flink-connector-redis/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml index c34711e..df78295 100644 --- a/flink-connector-redis/pom.xml +++ b/flink-connector-redis/pom.xml @@ -44,7 +44,6 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> - <scope>provided</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/7dce2db4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b26982f..01fb4fc 100644 --- a/pom.xml +++ b/pom.xml @@ -72,6 +72,7 @@ <modules> <module>flink-connector-redis</module> + <module>flink-connector-flume</module> </modules> <properties>
