This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3098fc5  [BAHIR-197] Add RpcClient and FlumeSink to connector
3098fc5 is described below

commit 3098fc52162f6a90dad5ba323f1d1e804a9b05aa
Author: ambition119 <[email protected]>
AuthorDate: Fri Feb 1 11:18:24 2019 +0800

    [BAHIR-197] Add RpcClient and FlumeSink to connector
    
    Closes #42
---
 .../connectors/flume/FlumeEventBuilder.java}       |  26 ++--
 .../streaming/connectors/flume/FlumeRpcClient.java | 118 ------------------
 .../streaming/connectors/flume/FlumeSink.java      | 133 +++++++++++++++++----
 .../streaming/connectors/flume/FlumeUtils.java     |  74 ++++++++++++
 .../connectors/flume/FlumeRpcClientTest.java       |  68 -----------
 .../streaming/connectors/flume/FlumeSinkTest.java  |  18 ++-
 .../{FlumeSinkTest.java => FlumeUtilsTest.java}    |  24 ++--
 .../flume/examples/FlumeSinkExample.java           |  59 +++++++++
 .../flume/examples/FlumeThriftService.java         |  73 +++++++++++
 9 files changed, 354 insertions(+), 239 deletions(-)

diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeEventBuilder.java
similarity index 57%
copy from flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
copy to flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeEventBuilder.java
index f1255ff..1063237 100644
--- a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeEventBuilder.java
@@ -14,25 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.flume;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.junit.jupiter.api.Test;
+package org.apache.flink.streaming.connectors.flume;
 
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
 
-@DockerTest
-public class FlumeSinkTest {
+import org.apache.flume.Event;
 
-    @Test
-    public void testSink() throws Exception {
-        StreamExecutionEnvironment environment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+import java.io.Serializable;
 
-        environment.fromElements("string1", "string2")
-                .addSink(new FlumeSink<>("172.25.0.3", 44444, new 
SimpleStringSchema()));
+/**
+ * A function that can create a Event from an incoming instance of the given type.
+ *
+ * @param <IN>
+ */
+public interface FlumeEventBuilder<IN> extends Function, Serializable {
 
-        tryExecute(environment, "FlumeTest");
-    }
+    Event createFlumeEvent(IN value, RuntimeContext ctx);
 
 }
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
deleted file mode 100644
index e918f56..0000000
--- a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.flume;
-
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.Charset;
-
-class FlumeRpcClient implements AutoCloseable {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(FlumeRpcClient.class);
-
-    protected RpcClient client;
-    private String hostname;
-    private int port;
-
-
-    FlumeRpcClient(String hostname, int port) {
-        this.hostname = hostname;
-        this.port = port;
-    }
-
-    /**
-     * Initializes the connection to Apache Flume.
-     */
-    public boolean init() {
-        // Setup the RPC connection
-        int initCounter = 0;
-        while (true) {
-            verifyCounter(initCounter, "Cannot establish connection");
-
-            try {
-                this.client = RpcClientFactory.getDefaultInstance(hostname, 
port);
-            } catch (FlumeException e) {
-                // Wait one second if the connection failed before the next
-                // try
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e1) {
-                    if (LOG.isErrorEnabled()) {
-                        LOG.error("Interrupted while trying to connect {} on {}", hostname, port);
-                    }
-                }
-            }
-            if (client != null) {
-                break;
-            }
-            initCounter++;
-        }
-        return client.isActive();
-    }
-
-
-    public boolean sendData(String data) {
-        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
-        return sendData(event);
-    }
-    public boolean sendData(byte[] data) {
-        Event event = EventBuilder.withBody(data);
-        return sendData(event);
-    }
-
-    private boolean sendData(Event event) {
-        return sendData(event, 0);
-    }
-    private boolean sendData(Event event, int retryCount) {
-        verifyCounter(retryCount, "Cannot send message");
-        try {
-            client.append(event);
-            return true;
-        } catch (EventDeliveryException e) {
-            // clean up and recreate the client
-            reconnect();
-            return sendData(event, ++retryCount);
-        }
-    }
-
-
-    private void verifyCounter(int counter, String messaje) {
-        if (counter >= 10) {
-            throw new RuntimeException(messaje + " on " + hostname + " on " + 
port);
-        }
-    }
-
-    private void reconnect() {
-        close();
-        client = null;
-        init();
-    }
-
-    @Override
-    public void close() {
-        if (this.client == null) return;
-
-        this.client.close();
-    }
-}
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 7a80fd2..992d19f 100644
--- a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -14,51 +14,136 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.flume;
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A Sink for publishing data into Flume.
+ * @param <IN>
+ */
 public class FlumeSink<IN> extends RichSinkFunction<IN> {
+    private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
 
-    private transient FlumeRpcClient client;
+    private static final int DEFAULT_MAXRETRYATTEMPTS = 3;
+    private static final long DEFAULT_WAITTIMEMS = 1000L;
 
-    private String host;
+    private String clientType;
+    private String hostname;
     private int port;
-    private SerializationSchema<IN> schema;
+    private int batchSize;
+    private int maxRetryAttempts;
+    private long waitTimeMs;
+    private List<IN> incomingList;
+    private FlumeEventBuilder eventBuilder;
+    private RpcClient client;
 
-    public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
-        this.host = host;
+    public FlumeSink(String clientType, String hostname, int port, 
FlumeEventBuilder<IN> eventBuilder) {
+        this(clientType, hostname, port, eventBuilder, 
RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE, DEFAULT_MAXRETRYATTEMPTS, 
DEFAULT_WAITTIMEMS);
+    }
+
+    public FlumeSink(String clientType, String hostname, int port, 
FlumeEventBuilder<IN> eventBuilder, int batchSize) {
+        this(clientType, hostname, port, eventBuilder, batchSize, 
DEFAULT_MAXRETRYATTEMPTS, DEFAULT_WAITTIMEMS);
+    }
+
+    public FlumeSink(String clientType, String hostname, int port, 
FlumeEventBuilder<IN> eventBuilder, int batchSize, int maxRetryAttempts, long 
waitTimeMs) {
+        this.clientType = clientType;
+        this.hostname = hostname;
         this.port = port;
-        this.schema = schema;
+        this.eventBuilder = eventBuilder;
+        this.batchSize = batchSize;
+        this.maxRetryAttempts = maxRetryAttempts;
+        this.waitTimeMs = waitTimeMs;
     }
 
-    /**
-     * Receives tuples from the Apache Flink {@link DataStream} and forwards
-     * them to Apache Flume.
-     *
-     * @param value
-     *            The tuple arriving from the datastream
-     */
     @Override
-    public void invoke(IN value, Context context) throws Exception {
-        byte[] data = schema.serialize(value);
-        client.sendData(data);
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        incomingList = new ArrayList();
+        client = FlumeUtils.getRpcClient(clientType, hostname, port, 
batchSize);
     }
 
     @Override
-    public void open(Configuration config) {
-        client = new FlumeRpcClient(host, port);
-        client.init();
+    public void invoke(IN value) throws Exception {
+        int number;
+        synchronized (this) {
+            if (null != value) {
+                incomingList.add(value);
+            }
+            number = incomingList.size();
+        }
+
+        if (number == batchSize) {
+            flush();
+        }
     }
 
     @Override
-    public void close() {
-        if (client == null) return;
-        client.close();
+    public void close() throws Exception {
+        super.close();
+        FlumeUtils.destroy(client);
     }
 
+    private void flush() throws IllegalStateException {
+        List<Event> events = new ArrayList<>();
+        List<IN>  toFlushList;
+        synchronized (this) {
+            if (incomingList.isEmpty()) {
+                return;
+            }
+            toFlushList = incomingList;
+            incomingList = new ArrayList();
+        }
+
+        for (IN value: toFlushList) {
+            Event event = this.eventBuilder.createFlumeEvent(value, 
getRuntimeContext());
+            events.add(event);
+        }
+
+        int retries = 0;
+        boolean flag = true;
+        while (flag) {
+            if (null != client || retries > maxRetryAttempts) {
+                flag = false;
+            }
+
+            if (retries <= maxRetryAttempts && null == client) {
+                LOG.info("Wait for {} ms before retry", waitTimeMs);
+                try {
+                    Thread.sleep(waitTimeMs);
+                } catch (InterruptedException ignored) {
+                    LOG.error("Interrupted while trying to connect {} on {}", 
hostname, port);
+                }
+                reconnect();
+                LOG.info("Retry attempt number {}", retries);
+                retries++;
+            }
+        }
+
+        try {
+            client.appendBatch(events);
+        } catch (EventDeliveryException e) {
+            LOG.info("Encountered exception while sending data to flume : {}", 
e.getMessage(), e);
+        }
+
+    }
+
+    private void reconnect() {
+        FlumeUtils.destroy(client);
+        client = null;
+        client = FlumeUtils.getRpcClient(clientType, hostname, port, 
batchSize);
+    }
 
 }
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeUtils.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeUtils.java
new file mode 100644
index 0000000..f9d8009
--- /dev/null
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.apache.flume.api.RpcClientFactory;
+
+import java.util.Properties;
+
+/**
+ * Flume RpcClient Util.
+ */
+public class FlumeUtils {
+    private static final String CLIENT_TYPE_KEY = "client.type";
+    private static final String CLIENT_TYPE_DEFAULT_FAILOVER = 
"default_failover";
+    private static final String CLIENT_TYPE_DEFAULT_LOADBALANCING = 
"default_loadbalance";
+
+    public static RpcClient getRpcClient(String clientType, String hostname, 
Integer port, Integer batchSize) {
+        Properties props;
+        RpcClient client;
+        switch(clientType.toUpperCase()) {
+            case "THRIFT":
+                client = RpcClientFactory.getThriftInstance(hostname, port, 
batchSize);
+                break;
+            case "DEFAULT":
+                client = RpcClientFactory.getDefaultInstance(hostname, port, 
batchSize);
+                break;
+            case "DEFAULT_FAILOVER":
+                props = getDefaultProperties(hostname, port, batchSize);
+                props.put(CLIENT_TYPE_KEY, CLIENT_TYPE_DEFAULT_FAILOVER);
+                client = RpcClientFactory.getInstance(props);
+                break;
+            case "DEFAULT_LOADBALANCE":
+                props = getDefaultProperties(hostname, port, batchSize);
+                props.put(CLIENT_TYPE_KEY, CLIENT_TYPE_DEFAULT_LOADBALANCING);
+                client = RpcClientFactory.getInstance(props);
+                break;
+            default:
+                throw new IllegalStateException("Unsupported client type - cannot happen");
+        }
+        return client;
+    }
+
+    public static void destroy(RpcClient client) {
+        if (null != client) {
+            client.close();
+        }
+    }
+
+    private static Properties getDefaultProperties(String hostname, Integer 
port, Integer batchSize) {
+        Properties props = new Properties();
+        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX 
+ "h1",
+            hostname + ":" + port.intValue());
+        props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, 
batchSize.toString());
+        return props;
+    }
+}
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
deleted file mode 100644
index 69e5955..0000000
--- a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.flume;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-@DockerTest
-public class FlumeRpcClientTest {
-
-    public FlumeRpcClient createGoodClient() {
-        return new FlumeRpcClient("172.25.0.3", 44444);
-    }
-
-    @Test
-    public void testInitClientMustFail() {
-        FlumeRpcClient client = new FlumeRpcClient("172.25.0.3", 44445);
-        Assertions.assertThrows(RuntimeException.class, () -> client.init(), 
"client start");
-    }
-
-    @Test
-    public void testSendStringData() {
-        FlumeRpcClient client = createGoodClient();
-        boolean init = client.init();
-        Assertions.assertTrue(init, "client not start");
-
-        boolean send = client.sendData("xpto");
-        Assertions.assertTrue(send, "data not send");
-
-    }
-
-    @Test
-    public void testSendBytesData() {
-        FlumeRpcClient client = createGoodClient();
-        boolean init = client.init();
-        Assertions.assertTrue(init, "client not start");
-
-        boolean send = client.sendData("xpto".getBytes());
-        Assertions.assertTrue(send, "data not send");
-
-    }
-
-    @Test
-    public void testSendDataWhenConnectionClosed() {
-        FlumeRpcClient client = createGoodClient();
-        boolean init = client.init();
-        Assertions.assertTrue(init, "client not start");
-        client.close();
-
-        boolean send = client.sendData("xpto");
-        Assertions.assertTrue(send, "data not send");
-
-    }
-}
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
index f1255ff..92aad41 100644
--- a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.flink.streaming.connectors.flume;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
 import org.junit.jupiter.api.Test;
 
+import java.nio.charset.Charset;
+
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 
 @DockerTest
@@ -29,8 +33,16 @@ public class FlumeSinkTest {
     public void testSink() throws Exception {
         StreamExecutionEnvironment environment = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-        environment.fromElements("string1", "string2")
-                .addSink(new FlumeSink<>("172.25.0.3", 44444, new 
SimpleStringSchema()));
+        FlumeEventBuilder<String> flumeEventBuilder = new 
FlumeEventBuilder<String>() {
+            @Override
+            public Event createFlumeEvent(String value, RuntimeContext ctx) {
+                return EventBuilder.withBody(value, Charset.forName("UTF-8"));
+            }
+        };
+
+        FlumeSink<String> flumeSink = new FlumeSink<>("default", "172.25.0.3", 
44444, flumeEventBuilder, 1, 1, 1);
+
+        environment.fromElements("string1", "string2").addSink(flumeSink);
 
         tryExecute(environment, "FlumeTest");
     }
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeUtilsTest.java
similarity index 61%
copy from flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
copy to flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeUtilsTest.java
index f1255ff..f810d05 100644
--- a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeUtilsTest.java
@@ -16,23 +16,23 @@
  */
 package org.apache.flink.streaming.connectors.flume;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flume.api.RpcClient;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-
 @DockerTest
-public class FlumeSinkTest {
+public class FlumeUtilsTest {
+    private RpcClient client;
 
     @Test
-    public void testSink() throws Exception {
-        StreamExecutionEnvironment environment = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-        environment.fromElements("string1", "string2")
-                .addSink(new FlumeSink<>("172.25.0.3", 44444, new 
SimpleStringSchema()));
-
-        tryExecute(environment, "FlumeTest");
+    public void testGetRpcClient() {
+        client = FlumeUtils.getRpcClient("default","172.25.0.3", 44444, 1);
+        Assertions.assertNotNull(client);
     }
 
+    @Test
+    public void testDestroy() {
+        FlumeUtils.destroy(client);
+        Assertions.assertNull(client);
+    }
 }
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeSinkExample.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeSinkExample.java
new file mode 100644
index 0000000..d03c1bb
--- /dev/null
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeSinkExample.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume.examples;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.flume.FlumeEventBuilder;
+import org.apache.flink.streaming.connectors.flume.FlumeSink;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+
+import java.nio.charset.Charset;
+
+/**
+ * An example FlumeSink that sends data to Flume service.
+ */
+public class FlumeSinkExample {
+    private static String clientType = "thrift";
+    private static String hostname = "localhost";
+    private static int port = 9000;
+
+    public static void main(String[] args) throws Exception {
+        //FlumeSink send data
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        FlumeEventBuilder<String> flumeEventBuilder = new 
FlumeEventBuilder<String>() {
+            @Override
+            public Event createFlumeEvent(String value, RuntimeContext ctx) {
+                return EventBuilder.withBody(value, Charset.defaultCharset());
+            }
+        };
+
+        FlumeSink<String> flumeSink = new FlumeSink<>(clientType, hostname, 
port, flumeEventBuilder, 1, 1, 1);
+
+        // Note: parallelisms and FlumeSink batchSize
+        // if every parallelism not enough batchSize, this parallelism not word FlumeThriftService output
+        DataStreamSink<String> dataStream = env.fromElements("one", "two", 
"three", "four", "five")
+                .addSink(flumeSink);
+
+        env.execute();
+    }
+}
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeThriftService.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeThriftService.java
new file mode 100644
index 0000000..04fd9b9
--- /dev/null
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeThriftService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume.examples;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.source.ThriftSource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Start Flume Source service.
+ */
+public class FlumeThriftService {
+    private static String hostname = "localhost";
+    private static int port = 9000;
+
+    public static void main(String[] args) throws Exception {
+        //Flume Source
+        ThriftSource source = new ThriftSource();
+        Channel ch = new MemoryChannel();
+        Configurables.configure(ch, new Context());
+
+        Context context = new Context();
+        context.put("port", String.valueOf(port));
+        context.put("bind", hostname);
+        Configurables.configure(source, context);
+
+        List<Channel> channels = new ArrayList<>();
+        channels.add(ch);
+        ChannelSelector rcs = new ReplicatingChannelSelector();
+        rcs.setChannels(channels);
+        source.setChannelProcessor(new ChannelProcessor(rcs));
+        source.start();
+        System.out.println("ThriftSource service start.");
+
+        while (true) {
+            Transaction transaction = ch.getTransaction();
+            transaction.begin();
+            Event event = ch.take();
+            if (null != event) {
+                System.out.println(event);
+                System.out.println(new String(event.getBody()).trim());
+            }
+            transaction.commit();
+            transaction.close();
+        }
+
+    }
+}

Reply via email to