This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3098fc5 [BAHIR-197] Add RpcClient and FlumeSink to connector
3098fc5 is described below
commit 3098fc52162f6a90dad5ba323f1d1e804a9b05aa
Author: ambition119 <[email protected]>
AuthorDate: Fri Feb 1 11:18:24 2019 +0800
[BAHIR-197] Add RpcClient and FlumeSink to connector
Closes #42
---
.../connectors/flume/FlumeEventBuilder.java} | 26 ++--
.../streaming/connectors/flume/FlumeRpcClient.java | 118 ------------------
.../streaming/connectors/flume/FlumeSink.java | 133 +++++++++++++++++----
.../streaming/connectors/flume/FlumeUtils.java | 74 ++++++++++++
.../connectors/flume/FlumeRpcClientTest.java | 68 -----------
.../streaming/connectors/flume/FlumeSinkTest.java | 18 ++-
.../{FlumeSinkTest.java => FlumeUtilsTest.java} | 24 ++--
.../flume/examples/FlumeSinkExample.java | 59 +++++++++
.../flume/examples/FlumeThriftService.java | 73 +++++++++++
9 files changed, 354 insertions(+), 239 deletions(-)
diff --git
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeEventBuilder.java
similarity index 57%
copy from
flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
copy to
flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeEventBuilder.java
index f1255ff..1063237 100644
---
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
+++
b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeEventBuilder.java
@@ -14,25 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.flume;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.junit.jupiter.api.Test;
+package org.apache.flink.streaming.connectors.flume;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
-@DockerTest
-public class FlumeSinkTest {
+import org.apache.flume.Event;
- @Test
- public void testSink() throws Exception {
- StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
+import java.io.Serializable;
- environment.fromElements("string1", "string2")
- .addSink(new FlumeSink<>("172.25.0.3", 44444, new
SimpleStringSchema()));
+/**
+ * A function that can create an Event from an incoming instance of the given
type.
+ *
+ * @param <IN> type of the incoming elements that are converted into Flume events
+ */
+public interface FlumeEventBuilder<IN> extends Function, Serializable {
- tryExecute(environment, "FlumeTest");
- }
+ Event createFlumeEvent(IN value, RuntimeContext ctx);
}
diff --git
a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
deleted file mode 100644
index e918f56..0000000
---
a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.flume;
-
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.Charset;
-
-class FlumeRpcClient implements AutoCloseable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(FlumeRpcClient.class);
-
- protected RpcClient client;
- private String hostname;
- private int port;
-
-
- FlumeRpcClient(String hostname, int port) {
- this.hostname = hostname;
- this.port = port;
- }
-
- /**
- * Initializes the connection to Apache Flume.
- */
- public boolean init() {
- // Setup the RPC connection
- int initCounter = 0;
- while (true) {
- verifyCounter(initCounter, "Cannot establish connection");
-
- try {
- this.client = RpcClientFactory.getDefaultInstance(hostname,
port);
- } catch (FlumeException e) {
- // Wait one second if the connection failed before the next
- // try
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Interrupted while trying to connect {} on
{}", hostname, port);
- }
- }
- }
- if (client != null) {
- break;
- }
- initCounter++;
- }
- return client.isActive();
- }
-
-
- public boolean sendData(String data) {
- Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
- return sendData(event);
- }
- public boolean sendData(byte[] data) {
- Event event = EventBuilder.withBody(data);
- return sendData(event);
- }
-
- private boolean sendData(Event event) {
- return sendData(event, 0);
- }
- private boolean sendData(Event event, int retryCount) {
- verifyCounter(retryCount, "Cannot send message");
- try {
- client.append(event);
- return true;
- } catch (EventDeliveryException e) {
- // clean up and recreate the client
- reconnect();
- return sendData(event, ++retryCount);
- }
- }
-
-
- private void verifyCounter(int counter, String messaje) {
- if (counter >= 10) {
- throw new RuntimeException(messaje + " on " + hostname + " on " +
port);
- }
- }
-
- private void reconnect() {
- close();
- client = null;
- init();
- }
-
- @Override
- public void close() {
- if (this.client == null) return;
-
- this.client.close();
- }
-}
diff --git
a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 7a80fd2..992d19f 100644
---
a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++
b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -14,51 +14,136 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.flume;
-import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A Sink for publishing data into Flume.
+ * @param <IN> type of the records written by this sink
+ */
public class FlumeSink<IN> extends RichSinkFunction<IN> {
+ private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
- private transient FlumeRpcClient client;
+ private static final int DEFAULT_MAXRETRYATTEMPTS = 3;
+ private static final long DEFAULT_WAITTIMEMS = 1000L;
- private String host;
+ private String clientType;
+ private String hostname;
private int port;
- private SerializationSchema<IN> schema;
+ private int batchSize;
+ private int maxRetryAttempts;
+ private long waitTimeMs;
+ private List<IN> incomingList;
+ private FlumeEventBuilder eventBuilder;
+ private RpcClient client;
- public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
- this.host = host;
+ public FlumeSink(String clientType, String hostname, int port,
FlumeEventBuilder<IN> eventBuilder) {
+ this(clientType, hostname, port, eventBuilder,
RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE, DEFAULT_MAXRETRYATTEMPTS,
DEFAULT_WAITTIMEMS);
+ }
+
+ public FlumeSink(String clientType, String hostname, int port,
FlumeEventBuilder<IN> eventBuilder, int batchSize) {
+ this(clientType, hostname, port, eventBuilder, batchSize,
DEFAULT_MAXRETRYATTEMPTS, DEFAULT_WAITTIMEMS);
+ }
+
+ public FlumeSink(String clientType, String hostname, int port,
FlumeEventBuilder<IN> eventBuilder, int batchSize, int maxRetryAttempts, long
waitTimeMs) {
+ this.clientType = clientType;
+ this.hostname = hostname;
this.port = port;
- this.schema = schema;
+ this.eventBuilder = eventBuilder;
+ this.batchSize = batchSize;
+ this.maxRetryAttempts = maxRetryAttempts;
+ this.waitTimeMs = waitTimeMs;
}
- /**
- * Receives tuples from the Apache Flink {@link DataStream} and forwards
- * them to Apache Flume.
- *
- * @param value
- * The tuple arriving from the datastream
- */
@Override
- public void invoke(IN value, Context context) throws Exception {
- byte[] data = schema.serialize(value);
- client.sendData(data);
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ incomingList = new ArrayList();
+ client = FlumeUtils.getRpcClient(clientType, hostname, port,
batchSize);
}
@Override
- public void open(Configuration config) {
- client = new FlumeRpcClient(host, port);
- client.init();
+ public void invoke(IN value) throws Exception {
+ int number;
+ synchronized (this) {
+ if (null != value) {
+ incomingList.add(value);
+ }
+ number = incomingList.size();
+ }
+
+ if (number == batchSize) {
+ flush();
+ }
}
@Override
- public void close() {
- if (client == null) return;
- client.close();
+ public void close() throws Exception {
+ super.close();
+ FlumeUtils.destroy(client);
}
+ private void flush() throws IllegalStateException {
+ List<Event> events = new ArrayList<>();
+ List<IN> toFlushList;
+ synchronized (this) {
+ if (incomingList.isEmpty()) {
+ return;
+ }
+ toFlushList = incomingList;
+ incomingList = new ArrayList();
+ }
+
+ for (IN value: toFlushList) {
+ Event event = this.eventBuilder.createFlumeEvent(value,
getRuntimeContext());
+ events.add(event);
+ }
+
+ int retries = 0;
+ boolean flag = true;
+ while (flag) {
+ if (null != client || retries > maxRetryAttempts) {
+ flag = false;
+ }
+
+ if (retries <= maxRetryAttempts && null == client) {
+ LOG.info("Wait for {} ms before retry", waitTimeMs);
+ try {
+ Thread.sleep(waitTimeMs);
+ } catch (InterruptedException ignored) {
+ LOG.error("Interrupted while trying to connect {} on {}",
hostname, port);
+ }
+ reconnect();
+ LOG.info("Retry attempt number {}", retries);
+ retries++;
+ }
+ }
+
+ try {
+ client.appendBatch(events);
+ } catch (EventDeliveryException e) {
+ LOG.info("Encountered exception while sending data to flume : {}",
e.getMessage(), e);
+ }
+
+ }
+
+ private void reconnect() {
+ FlumeUtils.destroy(client);
+ client = null;
+ client = FlumeUtils.getRpcClient(clientType, hostname, port,
batchSize);
+ }
}
diff --git
a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeUtils.java
b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeUtils.java
new file mode 100644
index 0000000..f9d8009
--- /dev/null
+++
b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.apache.flume.api.RpcClientFactory;
+
+import java.util.Properties;
+
+/**
+ * Flume RpcClient Util.
+ */
+public class FlumeUtils {
+ private static final String CLIENT_TYPE_KEY = "client.type";
+ private static final String CLIENT_TYPE_DEFAULT_FAILOVER =
"default_failover";
+ private static final String CLIENT_TYPE_DEFAULT_LOADBALANCING =
"default_loadbalance";
+
+ public static RpcClient getRpcClient(String clientType, String hostname,
Integer port, Integer batchSize) {
+ Properties props;
+ RpcClient client;
+ switch(clientType.toUpperCase()) {
+ case "THRIFT":
+ client = RpcClientFactory.getThriftInstance(hostname, port,
batchSize);
+ break;
+ case "DEFAULT":
+ client = RpcClientFactory.getDefaultInstance(hostname, port,
batchSize);
+ break;
+ case "DEFAULT_FAILOVER":
+ props = getDefaultProperties(hostname, port, batchSize);
+ props.put(CLIENT_TYPE_KEY, CLIENT_TYPE_DEFAULT_FAILOVER);
+ client = RpcClientFactory.getInstance(props);
+ break;
+ case "DEFAULT_LOADBALANCE":
+ props = getDefaultProperties(hostname, port, batchSize);
+ props.put(CLIENT_TYPE_KEY, CLIENT_TYPE_DEFAULT_LOADBALANCING);
+ client = RpcClientFactory.getInstance(props);
+ break;
+ default:
+ throw new IllegalStateException("Unsupported client type -
cannot happen");
+ }
+ return client;
+ }
+
+ public static void destroy(RpcClient client) {
+ if (null != client) {
+ client.close();
+ }
+ }
+
+ private static Properties getDefaultProperties(String hostname, Integer
port, Integer batchSize) {
+ Properties props = new Properties();
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX
+ "h1",
+ hostname + ":" + port.intValue());
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
batchSize.toString());
+ return props;
+ }
+}
diff --git
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
deleted file mode 100644
index 69e5955..0000000
---
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.flume;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-@DockerTest
-public class FlumeRpcClientTest {
-
- public FlumeRpcClient createGoodClient() {
- return new FlumeRpcClient("172.25.0.3", 44444);
- }
-
- @Test
- public void testInitClientMustFail() {
- FlumeRpcClient client = new FlumeRpcClient("172.25.0.3", 44445);
- Assertions.assertThrows(RuntimeException.class, () -> client.init(),
"client start");
- }
-
- @Test
- public void testSendStringData() {
- FlumeRpcClient client = createGoodClient();
- boolean init = client.init();
- Assertions.assertTrue(init, "client not start");
-
- boolean send = client.sendData("xpto");
- Assertions.assertTrue(send, "data not send");
-
- }
-
- @Test
- public void testSendBytesData() {
- FlumeRpcClient client = createGoodClient();
- boolean init = client.init();
- Assertions.assertTrue(init, "client not start");
-
- boolean send = client.sendData("xpto".getBytes());
- Assertions.assertTrue(send, "data not send");
-
- }
-
- @Test
- public void testSendDataWhenConnectionClosed() {
- FlumeRpcClient client = createGoodClient();
- boolean init = client.init();
- Assertions.assertTrue(init, "client not start");
- client.close();
-
- boolean send = client.sendData("xpto");
- Assertions.assertTrue(send, "data not send");
-
- }
-}
diff --git
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
index f1255ff..92aad41 100644
---
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
+++
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
@@ -16,10 +16,14 @@
*/
package org.apache.flink.streaming.connectors.flume;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
import org.junit.jupiter.api.Test;
+import java.nio.charset.Charset;
+
import static org.apache.flink.test.util.TestUtils.tryExecute;
@DockerTest
@@ -29,8 +33,16 @@ public class FlumeSinkTest {
public void testSink() throws Exception {
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
- environment.fromElements("string1", "string2")
- .addSink(new FlumeSink<>("172.25.0.3", 44444, new
SimpleStringSchema()));
+ FlumeEventBuilder<String> flumeEventBuilder = new
FlumeEventBuilder<String>() {
+ @Override
+ public Event createFlumeEvent(String value, RuntimeContext ctx) {
+ return EventBuilder.withBody(value, Charset.forName("UTF-8"));
+ }
+ };
+
+ FlumeSink<String> flumeSink = new FlumeSink<>("default", "172.25.0.3",
44444, flumeEventBuilder, 1, 1, 1);
+
+ environment.fromElements("string1", "string2").addSink(flumeSink);
tryExecute(environment, "FlumeTest");
}
diff --git
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeUtilsTest.java
similarity index 61%
copy from
flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
copy to
flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeUtilsTest.java
index f1255ff..f810d05 100644
---
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
+++
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeUtilsTest.java
@@ -16,23 +16,23 @@
*/
package org.apache.flink.streaming.connectors.flume;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flume.api.RpcClient;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-
@DockerTest
-public class FlumeSinkTest {
+public class FlumeUtilsTest {
+ private RpcClient client;
@Test
- public void testSink() throws Exception {
- StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
-
- environment.fromElements("string1", "string2")
- .addSink(new FlumeSink<>("172.25.0.3", 44444, new
SimpleStringSchema()));
-
- tryExecute(environment, "FlumeTest");
+ public void testGetRpcClient() {
+ client = FlumeUtils.getRpcClient("default","172.25.0.3", 44444, 1);
+ Assertions.assertNotNull(client);
}
+ @Test
+ public void testDestroy() {
+ FlumeUtils.destroy(client);
+ Assertions.assertNull(client);
+ }
}
diff --git
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeSinkExample.java
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeSinkExample.java
new file mode 100644
index 0000000..d03c1bb
--- /dev/null
+++
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeSinkExample.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume.examples;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.flume.FlumeEventBuilder;
+import org.apache.flink.streaming.connectors.flume.FlumeSink;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+
+import java.nio.charset.Charset;
+
+/**
+ * An example FlumeSink that sends data to Flume service.
+ */
+public class FlumeSinkExample {
+ private static String clientType = "thrift";
+ private static String hostname = "localhost";
+ private static int port = 9000;
+
+ public static void main(String[] args) throws Exception {
+ // FlumeSink sends data
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ FlumeEventBuilder<String> flumeEventBuilder = new
FlumeEventBuilder<String>() {
+ @Override
+ public Event createFlumeEvent(String value, RuntimeContext ctx) {
+ return EventBuilder.withBody(value, Charset.defaultCharset());
+ }
+ };
+
+ FlumeSink<String> flumeSink = new FlumeSink<>(clientType, hostname,
port, flumeEventBuilder, 1, 1, 1);
+
+ // Note: the sink parallelism interacts with the FlumeSink batchSize —
+ // a parallel subtask that receives fewer than batchSize elements never
flushes, so no output reaches FlumeThriftService for that subtask
+ DataStreamSink<String> dataStream = env.fromElements("one", "two",
"three", "four", "five")
+ .addSink(flumeSink);
+
+ env.execute();
+ }
+}
diff --git
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeThriftService.java
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeThriftService.java
new file mode 100644
index 0000000..04fd9b9
--- /dev/null
+++
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/examples/FlumeThriftService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume.examples;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.source.ThriftSource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Start Flume Source service.
+ */
+public class FlumeThriftService {
+ private static String hostname = "localhost";
+ private static int port = 9000;
+
+ public static void main(String[] args) throws Exception {
+ //Flume Source
+ ThriftSource source = new ThriftSource();
+ Channel ch = new MemoryChannel();
+ Configurables.configure(ch, new Context());
+
+ Context context = new Context();
+ context.put("port", String.valueOf(port));
+ context.put("bind", hostname);
+ Configurables.configure(source, context);
+
+ List<Channel> channels = new ArrayList<>();
+ channels.add(ch);
+ ChannelSelector rcs = new ReplicatingChannelSelector();
+ rcs.setChannels(channels);
+ source.setChannelProcessor(new ChannelProcessor(rcs));
+ source.start();
+ System.out.println("ThriftSource service start.");
+
+ while (true) {
+ Transaction transaction = ch.getTransaction();
+ transaction.begin();
+ Event event = ch.take();
+ if (null != event) {
+ System.out.println(event);
+ System.out.println(new String(event.getBody()).trim());
+ }
+ transaction.commit();
+ transaction.close();
+ }
+
+ }
+}