Repository: bahir-flink Updated Branches: refs/heads/master 8b1011803 -> 898e913fe
[BAHIR-114] update flume to 1.8 and add some tests Closes #33 Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/72cbe807 Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/72cbe807 Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/72cbe807 Branch: refs/heads/master Commit: 72cbe807294cad84a1ef2919e4205c7c37ec861e Parents: 8b10118 Author: Joao Boto <[email protected]> Authored: Wed Aug 22 12:10:32 2018 +0200 Committer: Luciano Resende <[email protected]> Committed: Wed Nov 7 11:56:39 2018 -0800 ---------------------------------------------------------------------- .travis.yml | 59 +++++----- flink-connector-flume/README.md | 2 +- flink-connector-flume/dockers/conf/sink.conf | 34 ++++++ flink-connector-flume/dockers/conf/source.conf | 33 ++++++ .../dockers/docker-compose.yml | 57 +++++++++ flink-connector-flume/pom.xml | 95 +++------------ .../connectors/flume/FlumeRpcClient.java | 118 +++++++++++++++++++ .../streaming/connectors/flume/FlumeSink.java | 100 ++-------------- .../connectors/flume/FlumeRpcClientTest.java | 68 +++++++++++ .../connectors/flume/FlumeSinkTest.java | 39 ++++++ .../src/test/resources/log4j.properties | 27 +++++ pom.xml | 1 - 12 files changed, 434 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 691667c..3049224 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,49 +18,48 @@ sudo: required dist: trusty -cache: - directories: - - $HOME/.m2 +language: java # do not cache our own artifacts before_cache: - rm -rf $HOME/.m2/repository/org/apache/flink/ -language: java +cache: + directories: + - $HOME/.m2 services: - docker -matrix: - include: - - jdk: oraclejdk8 - env: - - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" - - MAVEN_PROFILE="default" - - CACHE_NAME=JDK8_F130_A - - jdk: openjdk8 - env: - - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" - - CACHE_NAME=JDK8_F130_C - - MAVEN_PROFILE="default" - - CACHE_NAME=JDK8_F130_B - - jdk: openjdk8 - env: - - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" - - MAVEN_PROFILE="test-kudu" - - CACHE_NAME=JDK8_F130_KUDU +jdk: + - oraclejdk8 + - openjdk8 + +env: + - | + FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="false" + PROJECTS="flink-connector-activemq,flink-connector-akka,flink-connector-influxdb,flink-connector-netty,flink-connector-redis,flink-library-siddhi" + - | + FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="true" + PROJECTS="flink-connector-flume" + - | + FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="true" + PROJECTS="flink-connector-kudu" + before_install: - ./dev/change-scala-version.sh $SCALA_VERSION install: true -script: - - | - if [[ $MAVEN_PROFILE == "default" ]]; then - mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION - fi - - | - if [[ $MAVEN_PROFILE == "test-kudu" ]]; then - flink-connector-kudu/dockers/run_kudu_tests.sh +before_script: + - if [[ $DOCKER == "true" ]]; then + docker-compose -f "$PROJECTS/dockers/docker-compose.yml" up -d; fi + +script: mvn clean verify -pl $PROJECTS -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION + +after_script: + - if [[ $DOCKER == "true" ]]; then + docker-compose -f "$PROJECTS/dockers/docker-compose.yml" down; + fi \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/README.md ---------------------------------------------------------------------- diff --git a/flink-connector-flume/README.md b/flink-connector-flume/README.md index c7d7f67..ebcd3f1 100644 --- a/flink-connector-flume/README.md +++ b/flink-connector-flume/README.md @@ -9,7 +9,7 @@ following dependency to your project: <version>1.1-SNAPSHOT</version> </dependency> -*Version Compatibility*: This module is compatible with Flume 1.5.0. +*Version Compatibility*: This module is compatible with Flume 1.8.0. Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html). http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/dockers/conf/sink.conf ---------------------------------------------------------------------- diff --git a/flink-connector-flume/dockers/conf/sink.conf b/flink-connector-flume/dockers/conf/sink.conf new file mode 100644 index 0000000..81c246f --- /dev/null +++ b/flink-connector-flume/dockers/conf/sink.conf @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +docker.sinks = fileSink +docker.sources = avroSource +docker.channels = inMemoryChannel + +docker.sources.avroSource.type = avro +docker.sources.avroSource.channels = c1 +docker.sources.avroSource.bind = 0.0.0.0 +docker.sources.avroSource.port = 4545 +docker.sources.avroSource.channels = inMemoryChannel + +docker.channels.inMemoryChannel.type = memory +docker.channels.inMemoryChannel.capacity = 1000 +docker.channels.inMemoryChannel.transactionCapacity = 100 + +docker.sinks.fileSink.type = file_roll +docker.sinks.fileSink.channel = inMemoryChannel +docker.sinks.fileSink.sink.directory = /var/tmp/output +docker.sinks.fileSink.rollInterval = 0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/dockers/conf/source.conf ---------------------------------------------------------------------- diff --git a/flink-connector-flume/dockers/conf/source.conf b/flink-connector-flume/dockers/conf/source.conf new file mode 100644 index 0000000..f883f41 --- /dev/null +++ b/flink-connector-flume/dockers/conf/source.conf @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +docker.sinks = avroSink +docker.sources = netcatSource +docker.channels = inMemoryChannel + +docker.sources.netcatSource.type = avro +docker.sources.netcatSource.bind = 0.0.0.0 +docker.sources.netcatSource.port = 44444 +docker.sources.netcatSource.channels = inMemoryChannel + +docker.channels.inMemoryChannel.type = memory +docker.channels.inMemoryChannel.capacity = 1000 +docker.channels.inMemoryChannel.transactionCapacity = 100 + +docker.sinks.avroSink.type = avro +docker.sinks.avroSink.channel = inMemoryChannel +docker.sinks.avroSink.hostname = sink +docker.sinks.avroSink.port = 4545 http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/dockers/docker-compose.yml ---------------------------------------------------------------------- diff --git a/flink-connector-flume/dockers/docker-compose.yml b/flink-connector-flume/dockers/docker-compose.yml new file mode 100644 index 0000000..042bd5e --- /dev/null +++ b/flink-connector-flume/dockers/docker-compose.yml @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +version: '2' + +services: + + source: + image: eskabetxe/flume + container_name: flume-source + hostname: 172.25.0.3 + ports: + - "44444:44444" + volumes: + - ./conf/source.conf:/opt/flume-config/flume.conf + environment: + - FLUME_AGENT_NAME=docker + links: + - "sink:sink" + networks: + mynet: + ipv4_address: 172.25.0.3 + + sink: + image: eskabetxe/flume + container_name: flume-sink + hostname: 172.25.0.4 + volumes: + - ./conf/sink.conf:/opt/flume-config/flume.conf + - ./output:/var/tmp/output + environment: + - FLUME_AGENT_NAME=docker + networks: + mynet: + ipv4_address: 172.25.0.4 + +networks: + mynet: + driver: bridge + ipam: + config: + - subnet: 172.25.0.0/24 + IPRange: 172.25.0.2/24, + gateway: 172.25.0.1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml index 1f4cf6d..c202c6d 100644 --- a/flink-connector-flume/pom.xml +++ b/flink-connector-flume/pom.xml @@ -35,7 +35,7 @@ under the License. <!-- Allow users to pass custom connector versions --> <properties> - <flume-ng.version>1.5.0</flume-ng.version> + <flume-ng.version>1.8.0</flume-ng.version> </properties> <dependencies> @@ -50,86 +50,23 @@ under the License. <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>${flume-ng.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </exclusion> - <exclusion> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - </exclusion> - <exclusion> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </exclusion> - <exclusion> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>com.thoughtworks.paranamer</groupId> - <artifactId>paranamer</artifactId> - </exclusion> - <exclusion> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - </exclusion> - <exclusion> - <groupId>org.tukaani</groupId> - <artifactId>xz</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.velocity</groupId> - <artifactId>velocity</artifactId> - </exclusion> - <exclusion> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> - </exclusion> - </exclusions> </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>5.2.0</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java ---------------------------------------------------------------------- diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java new file mode 100644 index 0000000..e918f56 --- /dev/null +++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.flume; + +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.api.RpcClient; +import org.apache.flume.api.RpcClientFactory; +import org.apache.flume.event.EventBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; + +class FlumeRpcClient implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(FlumeRpcClient.class); + + protected RpcClient client; + private String hostname; + private int port; + + + FlumeRpcClient(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + /** + * Initializes the connection to Apache Flume. + */ + public boolean init() { + // Setup the RPC connection + int initCounter = 0; + while (true) { + verifyCounter(initCounter, "Cannot establish connection"); + + try { + this.client = RpcClientFactory.getDefaultInstance(hostname, port); + } catch (FlumeException e) { + // Wait one second if the connection failed before the next + // try + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + if (LOG.isErrorEnabled()) { + LOG.error("Interrupted while trying to connect {} on {}", hostname, port); + } + } + } + if (client != null) { + break; + } + initCounter++; + } + return client.isActive(); + } + + + public boolean sendData(String data) { + Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); + return sendData(event); + } + public boolean sendData(byte[] data) { + Event event = EventBuilder.withBody(data); + return sendData(event); + } + + private boolean sendData(Event event) { + return sendData(event, 0); + } + private boolean sendData(Event event, int retryCount) { + verifyCounter(retryCount, "Cannot send message"); + try { + client.append(event); + return true; + } catch (EventDeliveryException e) { + // clean up and recreate the client + reconnect(); + return sendData(event, ++retryCount); + } + } + + + private void verifyCounter(int counter, String messaje) { + if (counter >= 10) { + throw new RuntimeException(messaje + " on " + hostname + " on " + port); + } + } + + private void reconnect() { + close(); + client = null; + init(); + } + + @Override + public void close() { + if (this.client == null) return; + + this.client.close(); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java index 41b1b25..7a80fd2 100644 --- a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java +++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java @@ -14,32 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.streaming.connectors.flume; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.FlumeException; -import org.apache.flume.api.RpcClient; -import org.apache.flume.api.RpcClientFactory; -import org.apache.flume.event.EventBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class FlumeSink<IN> extends RichSinkFunction<IN> { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class); + private transient FlumeRpcClient client; - private transient FlinkRpcClientFacade client; - boolean initDone = false; - String host; - int port; - SerializationSchema<IN> schema; + private String host; + private int port; + private SerializationSchema<IN> schema; public FlumeSink(String host, int port, SerializationSchema<IN> schema) { this.host = host; @@ -57,84 +45,20 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> { @Override public void invoke(IN value, Context context) throws Exception { byte[] data = schema.serialize(value); - client.sendDataToFlume(data); - + client.sendData(data); } - private class FlinkRpcClientFacade { - private RpcClient client; - private String hostname; - private int port; - - /** - * Initializes the connection to Apache Flume. - * - * @param hostname - * The host - * @param port - * The port. - */ - public void init(String hostname, int port) { - // Setup the RPC connection - this.hostname = hostname; - this.port = port; - int initCounter = 0; - while (true) { - if (initCounter >= 90) { - throw new RuntimeException("Cannot establish connection with" + port + " at " - + host); - } - try { - this.client = RpcClientFactory.getDefaultInstance(hostname, port); - } catch (FlumeException e) { - // Wait one second if the connection failed before the next - // try - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - if (LOG.isErrorEnabled()) { - LOG.error("Interrupted while trying to connect {} at {}", port, host); - } - } - } - if (client != null) { - break; - } - initCounter++; - } - initDone = true; - } - - /** - * Sends byte arrays as {@link Event} series to Apache Flume. - * - * @param data - * The byte array to send to Apache FLume - */ - public void sendDataToFlume(byte[] data) { - Event event = EventBuilder.withBody(data); - - try { - client.append(event); - - } catch (EventDeliveryException e) { - // clean up and recreate the client - client.close(); - client = null; - client = RpcClientFactory.getDefaultInstance(hostname, port); - } - } - + @Override + public void open(Configuration config) { + client = new FlumeRpcClient(host, port); + client.init(); } @Override public void close() { - client.client.close(); + if (client == null) return; + client.close(); } - @Override - public void open(Configuration config) { - client = new FlinkRpcClientFacade(); - client.init(host, port); - } + } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java ---------------------------------------------------------------------- diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java new file mode 100644 index 0000000..7bab666 --- /dev/null +++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.flume; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class FlumeRpcClientTest { + + public FlumeRpcClient createGoodClient() { + return new FlumeRpcClient("172.25.0.3", 44444); + } + + + @Test + public void testInitClientMustFail() { + FlumeRpcClient client = new FlumeRpcClient("172.25.0.3", 44445); + Assertions.assertThrows(RuntimeException.class, () -> client.init(), "client start"); + } + + @Test + public void testSendStringData() { + FlumeRpcClient client = createGoodClient(); + boolean init = client.init(); + Assertions.assertTrue(init, "client not start"); + + boolean send = client.sendData("xpto"); + Assertions.assertTrue(send, "data not send"); + + } + + @Test + public void testSendBytesData() { + FlumeRpcClient client = createGoodClient(); + boolean init = client.init(); + Assertions.assertTrue(init, "client not start"); + + boolean send = client.sendData("xpto".getBytes()); + Assertions.assertTrue(send, "data not send"); + + } + + @Test + public void testSendDataWhenConnectionClosed() { + FlumeRpcClient client = createGoodClient(); + boolean init = client.init(); + Assertions.assertTrue(init, "client not start"); + client.close(); + + boolean send = client.sendData("xpto"); + Assertions.assertTrue(send, "data not send"); + + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java new file mode 100644 index 0000000..9d87642 --- /dev/null +++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.flume; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +public class FlumeSinkTest { + + + @Test + public void testSink() throws Exception { + StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + environment.fromElements("string1", "string2") + .addSink(new FlumeSink<>("172.25.0.3", 44444, new SimpleStringSchema())); + + tryExecute(environment, "FlumeTest"); + } + + +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-connector-flume/src/test/resources/log4j.properties b/flink-connector-flume/src/test/resources/log4j.properties new file mode 100644 index 0000000..15efe08 --- /dev/null +++ b/flink-connector-flume/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=WARN, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 80766be..c40e194 100644 --- a/pom.xml +++ b/pom.xml @@ -708,7 +708,6 @@ </plugins> </build> </profile> - <profile> <id>test-java-home</id> <activation>
