Repository: flink Updated Branches: refs/heads/master 4d27f8f2d -> d84599ea0
[FLINK-4446] Remove Flume connector (now in Bahir) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d84599ea Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d84599ea Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d84599ea Branch: refs/heads/master Commit: d84599ea0375afbfeaba0c2827cb653c8905bf89 Parents: 4d27f8f Author: Robert Metzger <[email protected]> Authored: Wed Dec 14 15:12:29 2016 +0100 Committer: Robert Metzger <[email protected]> Committed: Thu Dec 15 12:01:48 2016 +0100 ---------------------------------------------------------------------- flink-connectors/flink-connector-flume/pom.xml | 175 ------------------- .../streaming/connectors/flume/FlumeSink.java | 141 --------------- flink-connectors/pom.xml | 1 - 3 files changed, 317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d84599ea/flink-connectors/flink-connector-flume/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-flume/pom.xml b/flink-connectors/flink-connector-flume/pom.xml deleted file mode 100644 index 64860de..0000000 --- a/flink-connectors/flink-connector-flume/pom.xml +++ /dev/null @@ -1,175 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-flume_2.10</artifactId> - <name>flink-connector-flume</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <flume-ng.version>1.5.0</flume-ng.version> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-core</artifactId> - <version>${flume-ng.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </exclusion> - <exclusion> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - </exclusion> - <exclusion> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </exclusion> - <exclusion> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>com.thoughtworks.paranamer</groupId> - <artifactId>paranamer</artifactId> - </exclusion> - <exclusion> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - </exclusion> - <exclusion> - <groupId>org.tukaani</groupId> - <artifactId>xz</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.velocity</groupId> - <artifactId>velocity</artifactId> - </exclusion> - <exclusion> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> - </exclusion> - </exclusions> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. --> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>shade-flink</id> - <configuration> - <artifactSet> - <includes combine.children="append"> - <!-- We include all dependencies that transitively depend on guava --> - <include>org.apache.flume:*</include> - </includes> - </artifactSet> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/d84599ea/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java deleted file mode 100644 index 2dc043b..0000000 --- a/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.flume; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.FlumeException; -import org.apache.flume.api.RpcClient; -import org.apache.flume.api.RpcClientFactory; -import org.apache.flume.event.EventBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FlumeSink<IN> extends RichSinkFunction<IN> { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class); - - private transient FlinkRpcClientFacade client; - boolean initDone = false; - String host; - int port; - SerializationSchema<IN> schema; - - public FlumeSink(String host, int port, SerializationSchema<IN> schema) { - this.host = host; - this.port = port; - this.schema = schema; - } - - /** - * Receives tuples from the Apache Flink {@link DataStream} and forwards - * them to Apache Flume. - * - * @param value - * The tuple arriving from the datastream - */ - @Override - public void invoke(IN value) { - - byte[] data = schema.serialize(value); - client.sendDataToFlume(data); - - } - - private class FlinkRpcClientFacade { - private RpcClient client; - private String hostname; - private int port; - - /** - * Initializes the connection to Apache Flume. - * - * @param hostname - * The host - * @param port - * The port. - */ - public void init(String hostname, int port) { - // Setup the RPC connection - this.hostname = hostname; - this.port = port; - int initCounter = 0; - while (true) { - if (initCounter >= 90) { - throw new RuntimeException("Cannot establish connection with" + port + " at " - + host); - } - try { - this.client = RpcClientFactory.getDefaultInstance(hostname, port); - } catch (FlumeException e) { - // Wait one second if the connection failed before the next - // try - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - if (LOG.isErrorEnabled()) { - LOG.error("Interrupted while trying to connect {} at {}", port, host); - } - } - } - if (client != null) { - break; - } - initCounter++; - } - initDone = true; - } - - /** - * Sends byte arrays as {@link Event} series to Apache Flume. - * - * @param data - * The byte array to send to Apache FLume - */ - public void sendDataToFlume(byte[] data) { - Event event = EventBuilder.withBody(data); - - try { - client.append(event); - - } catch (EventDeliveryException e) { - // clean up and recreate the client - client.close(); - client = null; - client = RpcClientFactory.getDefaultInstance(hostname, port); - } - } - - } - - @Override - public void close() { - client.client.close(); - } - - @Override - public void open(Configuration config) { - client = new FlinkRpcClientFacade(); - client.init(host, port); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d84599ea/flink-connectors/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 695c34b..ba5ce46 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -41,7 +41,6 @@ under the License. <module>flink-hadoop-compatibility</module> <module>flink-hbase</module> <module>flink-hcatalog</module> - <module>flink-connector-flume</module> <module>flink-connector-kafka-base</module> <module>flink-connector-kafka-0.8</module> <module>flink-connector-kafka-0.9</module>
