Repository: flink
Updated Branches:
  refs/heads/master 4d27f8f2d -> d84599ea0


[FLINK-4446] Remove Flume connector (now in Bahir)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d84599ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d84599ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d84599ea

Branch: refs/heads/master
Commit: d84599ea0375afbfeaba0c2827cb653c8905bf89
Parents: 4d27f8f
Author: Robert Metzger <[email protected]>
Authored: Wed Dec 14 15:12:29 2016 +0100
Committer: Robert Metzger <[email protected]>
Committed: Thu Dec 15 12:01:48 2016 +0100

----------------------------------------------------------------------
 flink-connectors/flink-connector-flume/pom.xml  | 175 -------------------
 .../streaming/connectors/flume/FlumeSink.java   | 141 ---------------
 flink-connectors/pom.xml                        |   1 -
 3 files changed, 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d84599ea/flink-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-flume/pom.xml b/flink-connectors/flink-connector-flume/pom.xml
deleted file mode 100644
index 64860de..0000000
--- a/flink-connectors/flink-connector-flume/pom.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-connector-flume_2.10</artifactId>
-       <name>flink-connector-flume</name>
-
-       <packaging>jar</packaging>
-
-       <!-- Allow users to pass custom connector versions -->
-       <properties>
-               <flume-ng.version>1.5.0</flume-ng.version>
-       </properties>
-
-       <dependencies>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flume</groupId>
-                       <artifactId>flume-ng-core</artifactId>
-                       <version>${flume-ng.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.slf4j</groupId>
-                                       <artifactId>slf4j-log4j12</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>log4j</groupId>
-                                       <artifactId>log4j</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-io</groupId>
-                                       <artifactId>commons-io</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-codec</groupId>
-                                       <artifactId>commons-codec</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-cli</groupId>
-                                       <artifactId>commons-cli</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-lang</groupId>
-                                       <artifactId>commons-lang</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.avro</groupId>
-                                       <artifactId>avro</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.codehaus.jackson</groupId>
-                                       <artifactId>jackson-core-asl</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.codehaus.jackson</groupId>
-                                       <artifactId>jackson-mapper-asl</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.thoughtworks.paranamer</groupId>
-                                       <artifactId>paranamer</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.xerial.snappy</groupId>
-                                       <artifactId>snappy-java</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.tukaani</groupId>
-                                       <artifactId>xz</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.velocity</groupId>
-                                       <artifactId>velocity</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-collections</groupId>
-                                       <artifactId>commons-collections</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>servlet-api</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>jetty-util</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>jetty</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.google.code.gson</groupId>
-                                       <artifactId>gson</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.thrift</groupId>
-                                       <artifactId>libthrift</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <plugin>
-                               <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <id>shade-flink</id>
-                                               <configuration>
-                                                       <artifactSet>
-                                                               <includes combine.children="append">
-                                                                       <!-- We include all dependencies that transitively depend on guava -->
-                                                                       <include>org.apache.flume:*</include>
-                                                               </includes>
-                                                       </artifactSet>
-                                                       <transformers>
-                                                               <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-                                                       </transformers>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d84599ea/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
deleted file mode 100644
index 2dc043b..0000000
--- a/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.flume;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FlumeSink<IN> extends RichSinkFunction<IN> {
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
-
-       private transient FlinkRpcClientFacade client;
-       boolean initDone = false;
-       String host;
-       int port;
-       SerializationSchema<IN> schema;
-
-       public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
-               this.host = host;
-               this.port = port;
-               this.schema = schema;
-       }
-
-       /**
-        * Receives tuples from the Apache Flink {@link DataStream} and forwards
-        * them to Apache Flume.
-        * 
-        * @param value
-        *            The tuple arriving from the datastream
-        */
-       @Override
-       public void invoke(IN value) {
-
-               byte[] data = schema.serialize(value);
-               client.sendDataToFlume(data);
-
-       }
-
-       private class FlinkRpcClientFacade {
-               private RpcClient client;
-               private String hostname;
-               private int port;
-
-               /**
-                * Initializes the connection to Apache Flume.
-                * 
-                * @param hostname
-                *            The host
-                * @param port
-                *            The port.
-                */
-               public void init(String hostname, int port) {
-                       // Setup the RPC connection
-                       this.hostname = hostname;
-                       this.port = port;
-                       int initCounter = 0;
-                       while (true) {
-                               if (initCounter >= 90) {
-                                       throw new RuntimeException("Cannot establish connection with" + port + " at "
-                                                       + host);
-                               }
-                               try {
-                                       this.client = RpcClientFactory.getDefaultInstance(hostname, port);
-                               } catch (FlumeException e) {
-                                       // Wait one second if the connection failed before the next
-                                       // try
-                                       try {
-                                               Thread.sleep(1000);
-                                       } catch (InterruptedException e1) {
-                                               if (LOG.isErrorEnabled()) {
-                                                       LOG.error("Interrupted while trying to connect {} at {}", port, host);
-                                               }
-                                       }
-                               }
-                               if (client != null) {
-                                       break;
-                               }
-                               initCounter++;
-                       }
-                       initDone = true;
-               }
-
-               /**
-                * Sends byte arrays as {@link Event} series to Apache Flume.
-                * 
-                * @param data
-                *            The byte array to send to Apache FLume
-                */
-               public void sendDataToFlume(byte[] data) {
-                       Event event = EventBuilder.withBody(data);
-
-                       try {
-                               client.append(event);
-
-                       } catch (EventDeliveryException e) {
-                               // clean up and recreate the client
-                               client.close();
-                               client = null;
-                               client = RpcClientFactory.getDefaultInstance(hostname, port);
-                       }
-               }
-
-       }
-
-       @Override
-       public void close() {
-               client.client.close();
-       }
-
-       @Override
-       public void open(Configuration config) {
-               client = new FlinkRpcClientFacade();
-               client.init(host, port);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d84599ea/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 695c34b..ba5ce46 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -41,7 +41,6 @@ under the License.
                <module>flink-hadoop-compatibility</module>
                <module>flink-hbase</module>
                <module>flink-hcatalog</module>
-               <module>flink-connector-flume</module>
                <module>flink-connector-kafka-base</module>
                <module>flink-connector-kafka-0.8</module>
                <module>flink-connector-kafka-0.9</module>

Reply via email to