This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push: new 578d1bb [TUBEMQ-163] Flume sink for TubeMQ (#146) 578d1bb is described below commit 578d1bb46bafe052698528ce9bb59f82b3b44b67 Author: Yuanbo Liu <liuyuanb...@gmail.com> AuthorDate: Tue Jun 16 15:17:12 2020 +0800 [TUBEMQ-163] Flume sink for TubeMQ (#146) --- tubemq-connectors/pom.xml | 1 + tubemq-connectors/tubemq-connector-flume/README.md | 21 + .../{ => tubemq-connector-flume}/pom.xml | 58 +-- .../apache/flume/sink/tubemq/ConfigOptions.java | 65 ++++ .../org/apache/flume/sink/tubemq/EventStat.java | 63 +++ .../apache/flume/sink/tubemq/TubeSinkCounter.java | 54 +++ .../org/apache/flume/sink/tubemq/TubemqSink.java | 424 +++++++++++++++++++++ .../apache/flume/sink/tubemq/TestTubemqSink.java | 110 ++++++ .../src/test/resources/log4j.properties | 20 + 9 files changed, 787 insertions(+), 29 deletions(-) diff --git a/tubemq-connectors/pom.xml b/tubemq-connectors/pom.xml index d3e8cd9..56a4c0e 100644 --- a/tubemq-connectors/pom.xml +++ b/tubemq-connectors/pom.xml @@ -30,6 +30,7 @@ <packaging>pom</packaging> <modules> <module>tubemq-connector-flink</module> + <module>tubemq-connector-flume</module> </modules> <dependencies> diff --git a/tubemq-connectors/tubemq-connector-flume/README.md b/tubemq-connectors/tubemq-connector-flume/README.md new file mode 100644 index 0000000..f7780f5 --- /dev/null +++ b/tubemq-connectors/tubemq-connector-flume/README.md @@ -0,0 +1,21 @@ +### TubeMQ Flume Connector +#### Prerequisites + +Copy the following files to the Flume library path. + +``` +tubemq-connector-flume-[TUBEMQ-VERSION].jar +tubemq-client-[TUBEMQ-VERSION].jar +tubemq-core-[TUBEMQ-VERSION].jar +``` + +#### Flume Sink Configuration Template + +``` +agent.sinks = tubemq +agent.sinks.tubemq.type = org.apache.flume.sink.tubemq.TubemqSink +// master host addresses; multiple addresses can be separated with ",". 
+agent.sinks.tubemq.master-host-port-list = 127.0.0.1:8000 +// the default topic name; it can be overridden by the topic in the event header. +agent.sinks.tubemq.topic = demo +``` \ No newline at end of file diff --git a/tubemq-connectors/pom.xml b/tubemq-connectors/tubemq-connector-flume/pom.xml similarity index 54% copy from tubemq-connectors/pom.xml copy to tubemq-connectors/tubemq-connector-flume/pom.xml index d3e8cd9..f719501 100644 --- a/tubemq-connectors/pom.xml +++ b/tubemq-connectors/tubemq-connector-flume/pom.xml @@ -20,44 +20,44 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>tubemq</artifactId> + <artifactId>tubemq-connectors</artifactId> <groupId>org.apache.tubemq</groupId> <version>0.5.0-incubating-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> - <name>Apache TubeMQ - Connectors</name> - <artifactId>tubemq-connectors</artifactId> - <packaging>pom</packaging> - <modules> - <module>tubemq-connector-flink</module> - </modules> + <name>Apache TubeMQ - Connectors-flume</name> + <artifactId>tubemq-connector-flume</artifactId> + + <properties> + <flume.version>1.9.0</flume.version> + <mockito.version>1.9.0</mockito.version> + <junit.version>4.10</junit.version> + <awaitility.version>4.0.3</awaitility.version> + </properties> <dependencies> <dependency> - <groupId>org.apache.tubemq</groupId> - <artifactId>tubemq-client</artifactId> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>${flume.version}</version> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.tubemq</groupId> - <artifactId>tubemq-core</artifactId> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + 
<scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility.version}</version> + <scope>test</scope> </dependency> </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.1</version> - <configuration> - <source>1.8</source> - <target>1.8</target> - <encoding>UTF-8</encoding> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - </plugins> - </build> </project> \ No newline at end of file diff --git a/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/ConfigOptions.java b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/ConfigOptions.java new file mode 100644 index 0000000..7b3fcf4 --- /dev/null +++ b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/ConfigOptions.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.tubemq; + +/** + * Flume configuration options for tubemq sink + */ +public class ConfigOptions { + + // master host of tube mq cluster + public static final String MASTER_HOST_PORT_LIST = "master-host-port-list"; + + // topic name + public static final String TOPIC = "topic"; + + public static final String HEARTBEAT_PERIOD = "heartbeat-period"; // in milliseconds + public static final long DEFAULT_HEARTBEAT_PERIOD = 15000L; + + public static final String RPC_TIMEOUT = "rpc-timeout"; + public static final long DEFAULT_RPC_TIMEOUT = 20000L; + + public static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link-max-allowed-delayed-msg-count"; + public static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 80000L; + + public static final String SESSION_WARN_DELAYED_MSG_COUNT = "session-warn-delayed-msg-count"; + public static final long DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT = 2000000L; + + public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session-max-allowed-delayed-msg-count"; + public static final long DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 4000000L; + + public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty-write-buffer-high-water-mark"; + public static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L; + + public static final String SINK_THREAD_NUM = "thread-num"; + public static final int DEFAULT_SINK_THREAD_NUM = 4; + + public static final String RETRY_QUEUE_CAPACITY = "retry-queue-capacity"; + public static final int DEFAULT_RETRY_QUEUE_CAPACITY = 10000; + + public static final String EVENT_QUEUE_CAPACITY = "retry-queue-capacity"; + public static final int DEFAULT_EVENT_QUEUE_CAPACITY = 1000; + + public static final String EVENT_MAX_RETRY_TIME = "event-max-retry-time"; + public static final int DEFAULT_EVENT_MAX_RETRY_TIME = 5; + + public static final String EVENT_OFFER_TIMEOUT = "event-offer-timeout"; + public static final long 
DEFAULT_EVENT_OFFER_TIMEOUT = 3 * 1000; // in milliseconds + +} diff --git a/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/EventStat.java b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/EventStat.java new file mode 100644 index 0000000..5c6f0fd --- /dev/null +++ b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/EventStat.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.tubemq; +import static org.apache.flume.sink.tubemq.ConfigOptions.TOPIC; + +import java.util.Map; +import org.apache.flume.Event; + + + +/** + * Event with retry time + */ +public class EventStat { + private final Event event; + private int retryCnt; + private String topic; + + public EventStat(Event event) { + this.event = event; + this.retryCnt = 0; + Map<String, String> headers = event.getHeaders(); + if (headers != null && headers.containsKey(TOPIC)) { + this.topic = headers.get(TOPIC); + } + } + + public String getTopic() { + return topic; + } + + public void setTopic(String defaultTopic) { + this.topic = defaultTopic; + } + + public Event getEvent() { + return event; + } + + public int getRetryCnt() { + return retryCnt; + } + + public void incRetryCnt() { + this.retryCnt++; + } +} diff --git a/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubeSinkCounter.java b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubeSinkCounter.java new file mode 100644 index 0000000..a4704f9 --- /dev/null +++ b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubeSinkCounter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.tubemq; + +import org.apache.flume.instrumentation.SinkCounter; + +/** + * Counters for tube sink + */ +public class TubeSinkCounter extends SinkCounter { + private static final String COUNT_ROLLBACK = + "sink.rollback.count"; + + private static final String COUNT_SEND = + "sink.send.count"; + + private static final String[] ATTRIBUTES = { COUNT_ROLLBACK, COUNT_SEND }; + + public TubeSinkCounter(String name) { + super(name, ATTRIBUTES); + } + + public long incrementRollbackCount() { + return increment(COUNT_ROLLBACK); + } + + public long incrementSendCount() { + return increment(COUNT_SEND); + } + + public long getTubeRollbackCount() { + return get(COUNT_ROLLBACK); + } + + public long getTubeSendCount() { + return get(COUNT_SEND); + } +} diff --git a/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubemqSink.java b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubemqSink.java new file mode 100644 index 0000000..be6cbc7 --- /dev/null +++ b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubemqSink.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.tubemq; + +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_EVENT_MAX_RETRY_TIME; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_EVENT_OFFER_TIMEOUT; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_EVENT_QUEUE_CAPACITY; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_HEARTBEAT_PERIOD; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_RETRY_QUEUE_CAPACITY; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_RPC_TIMEOUT; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT; +import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_SINK_THREAD_NUM; +import static org.apache.flume.sink.tubemq.ConfigOptions.EVENT_MAX_RETRY_TIME; +import static org.apache.flume.sink.tubemq.ConfigOptions.EVENT_OFFER_TIMEOUT; +import static org.apache.flume.sink.tubemq.ConfigOptions.EVENT_QUEUE_CAPACITY; +import static org.apache.flume.sink.tubemq.ConfigOptions.HEARTBEAT_PERIOD; +import static org.apache.flume.sink.tubemq.ConfigOptions.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT; +import static org.apache.flume.sink.tubemq.ConfigOptions.MASTER_HOST_PORT_LIST; +import static org.apache.flume.sink.tubemq.ConfigOptions.NETTY_WRITE_BUFFER_HIGH_WATER_MARK; +import static org.apache.flume.sink.tubemq.ConfigOptions.RETRY_QUEUE_CAPACITY; +import static org.apache.flume.sink.tubemq.ConfigOptions.RPC_TIMEOUT; +import static org.apache.flume.sink.tubemq.ConfigOptions.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT; +import 
static org.apache.flume.sink.tubemq.ConfigOptions.SESSION_WARN_DELAYED_MSG_COUNT; +import static org.apache.flume.sink.tubemq.ConfigOptions.SINK_THREAD_NUM; +import static org.apache.flume.sink.tubemq.ConfigOptions.TOPIC; + +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; +import org.apache.flume.sink.AbstractSink; +import org.apache.tubemq.client.config.TubeClientConfig; +import org.apache.tubemq.client.exception.TubeClientException; +import org.apache.tubemq.client.factory.TubeMultiSessionFactory; +import org.apache.tubemq.client.producer.MessageProducer; +import org.apache.tubemq.client.producer.MessageSentCallback; +import org.apache.tubemq.client.producer.MessageSentResult; +import org.apache.tubemq.corebase.Message; +import org.apache.tubemq.corerpc.exception.OverflowException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Make tubemq as one of flume sinks + */ +public class TubemqSink extends AbstractSink implements Configurable { + private static final Logger LOGGER = LoggerFactory.getLogger(TubemqSink.class); + + public TubeMultiSessionFactory sessionFactory; + public ConcurrentHashMap<String, MessageProducer> producerMap; + + private String masterHostAndPortList; + private String defaultTopic; + private long heartbeatPeriod; + private long rpcTimeout; + + private long linkMaxAllowedDelayedMsgCount; + private long sessionWarnDelayedMsgCount; + private long sessionMaxAllowedDelayedMsgCount; + 
private long nettyWriteBufferHighWaterMark; + + private ExecutorService sinkThreadPool; + private final List<Future<?>> threadFutures = new ArrayList<>(); + private int threadNum; + + private boolean started = false; + // check if overflow + private boolean overflow = false; + + private LinkedBlockingQueue<EventStat> resendQueue; + + private LinkedBlockingQueue<Event> eventQueue; + + private int maxRetryTime; + private long eventOfferTimeout; + + private TubeClientConfig clientConfig; + + private TubeSinkCounter counter; + + /** + * init tube config + * + * @return tube config + */ + private TubeClientConfig initTubeConfig() { + final TubeClientConfig tubeClientConfig = new TubeClientConfig(this.masterHostAndPortList); + tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount); + tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount); + tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(sessionMaxAllowedDelayedMsgCount); + tubeClientConfig.setNettyWriteBufferHighWaterMark(nettyWriteBufferHighWaterMark); + tubeClientConfig.setHeartbeatPeriodMs(heartbeatPeriod); + tubeClientConfig.setRpcTimeoutMs(rpcTimeout); + + return tubeClientConfig; + } + + @VisibleForTesting + TubeClientConfig getClientConfig() { + return clientConfig; + } + + @VisibleForTesting + TubeSinkCounter getCounter() { + return counter; + } + + /** + * Create producer + * + * @throws FlumeException + */ + private void createConnection() throws FlumeException { + if (sessionFactory != null) { + return; + } + try { + sessionFactory = new TubeMultiSessionFactory(clientConfig); + } catch (TubeClientException e) { + LOGGER.error("create connection error in tubemqSink, " + + "maybe tubemq master set error, please re-check. ex1 {}", e.getMessage()); + throw new FlumeException("connect to tubemq error1, please re-check", e); + } + + if (producerMap == null) { + producerMap = new ConcurrentHashMap<>(); + } + + } + + /** + * Destroy all producers and clear up caches. 
+ */ + private void destroyConnection() { + for (String topic : producerMap.keySet()) { + MessageProducer producer = producerMap.get(topic); + try { + producer.shutdown(); + } catch (Throwable e) { + LOGGER.error("destroy producer error in tubemqSink, ex", e); + } + } + producerMap.clear(); + + if (sessionFactory != null) { + try { + sessionFactory.shutdown(); + } catch (Exception e) { + LOGGER.error("destroy sessionFactory error in tubemqSink, MetaClientException", e); + } + } + sessionFactory = null; + LOGGER.debug("closed meta producer"); + } + + @Override + public void start() { + LOGGER.info("tubemq sink starting..."); + + // create connection + try { + createConnection(); + } catch (FlumeException e) { + // close connection + destroyConnection(); + LOGGER.error("Unable to create tubemq client" + ". Exception follows.", e); + } + started = true; + // submit worker threads + for (int i = 0; i < threadNum; i++) { + threadFutures.add(sinkThreadPool.submit(new SinkTask())); + } + super.start(); + } + + @Override + public void stop() { + LOGGER.info("tubemq sink stopping"); + started = false; + if (sinkThreadPool != null) { + sinkThreadPool.shutdown(); + } + for (Future<?> future : threadFutures) { + // interrupt threads + future.cancel(true); + } + threadFutures.clear(); + destroyConnection(); + super.stop(); + } + + @Override + public Status process() { + if (!started) { + return Status.BACKOFF; + } + Channel channel = getChannel(); + Transaction tx = channel.getTransaction(); + tx.begin(); + Status status = Status.READY; + try { + Event event = channel.take(); + if (event != null) { + if (!eventQueue.offer(event, eventOfferTimeout, TimeUnit.MILLISECONDS)) { + LOGGER.info("[{}] Channel --> Queue(has no enough space,current code point) --> tubemq, Check " + + "if tubemq server or network is ok.(if this situation last long time it will cause" + + " memoryChannel full and fileChannel write.)", getName()); + counter.incrementRollbackCount(); + tx.rollback(); + } else 
{ + tx.commit(); + } + } else { + // if event is null, that means queue is empty, backoff it. + status = Status.BACKOFF; + tx.commit(); + } + } catch (Throwable t) { + LOGGER.error("Process event failed!" + this.getName(), t); + try { + counter.incrementRollbackCount(); + tx.rollback(); + } catch (Throwable e) { + LOGGER.error("tubemq sink transaction rollback exception", e); + } + } finally { + tx.close(); + } + return status; + } + + @Override + public void configure(Context context) { + LOGGER.info(context.toString()); + masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST); + defaultTopic = context.getString(TOPIC); + heartbeatPeriod = context.getLong(HEARTBEAT_PERIOD, DEFAULT_HEARTBEAT_PERIOD); + rpcTimeout = context.getLong(RPC_TIMEOUT, DEFAULT_RPC_TIMEOUT); + + linkMaxAllowedDelayedMsgCount = context.getLong( + LINK_MAX_ALLOWED_DELAYED_MSG_COUNT, + DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT); + sessionWarnDelayedMsgCount = context.getLong( + SESSION_WARN_DELAYED_MSG_COUNT, + DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT); + sessionMaxAllowedDelayedMsgCount = context.getLong( + SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT, + DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT); + nettyWriteBufferHighWaterMark = context.getLong( + NETTY_WRITE_BUFFER_HIGH_WATER_MARK, + DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK); + + producerMap = new ConcurrentHashMap<>(); + + threadNum = context.getInteger(SINK_THREAD_NUM, DEFAULT_SINK_THREAD_NUM); + sinkThreadPool = Executors.newFixedThreadPool(threadNum); + + int retryQueueCapacity = context.getInteger(RETRY_QUEUE_CAPACITY, DEFAULT_RETRY_QUEUE_CAPACITY); + resendQueue = new LinkedBlockingQueue<>(retryQueueCapacity); + + int eventQueueCapacity = context.getInteger(EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY); + eventQueue = new LinkedBlockingQueue<>(eventQueueCapacity); + + maxRetryTime = context.getInteger(EVENT_MAX_RETRY_TIME, DEFAULT_EVENT_MAX_RETRY_TIME); + eventOfferTimeout = context.getLong(EVENT_OFFER_TIMEOUT, 
DEFAULT_EVENT_OFFER_TIMEOUT); + + counter = new TubeSinkCounter(this.getName()); + + clientConfig = initTubeConfig(); + } + + /** + * Get producer from cache, create it if not exists. + * + * @param topic - topic name + * @return MessageProducer + * @throws TubeClientException + */ + private MessageProducer getProducer(String topic) throws TubeClientException { + if (!producerMap.containsKey(topic)) { + MessageProducer producer = sessionFactory.createProducer(); + // publish topic + producer.publish(topic); + producerMap.putIfAbsent(topic, producer); + } + return producerMap.get(topic); + } + + + class SinkTask implements Runnable { + + private void sleepIfOverflow() throws Exception { + if (overflow) { + overflow = false; + Thread.sleep(50); + } + } + + /** + * fetch message, wait if queue is empty + * + * @return EventStat + * @throws Exception + */ + private EventStat fetchEventStat() throws Exception { + EventStat es = null; + if (!resendQueue.isEmpty()) { + es = resendQueue.poll(); + } else { + // wait if eventQueue is empty + Event event = eventQueue.take(); + es = new EventStat(event); + } + return es; + } + + private void sendEvent(MessageProducer producer, EventStat es) throws Exception { + // send message with callback + Message message = new Message(es.getTopic(), es.getEvent().getBody()); + producer.sendMessage(message, new MessageSentCallback() { + @Override + public void onMessageSent(MessageSentResult result) { + if (!result.isSuccess()) { + resendEvent(es); + } + } + + @Override + public void onException(Throwable e) { + LOGGER.error("exception caught", e); + if (e instanceof OverflowException) { + overflow = true; + } + resendEvent(es); + } + }); + } + + /** + * Resent event + * + * @param es EventStat + */ + private void resendEvent(EventStat es) { + if (es == null || es.getEvent() == null) { + return; + } + es.incRetryCnt(); + if (es.getRetryCnt() > maxRetryTime) { + LOGGER.error("event max retry reached, ignore it"); + return; + } + + // if 
resend queue is full, send back to channel + if (!resendQueue.offer(es)) { + getChannel().put(es.getEvent()); + LOGGER.warn("resend queue is full, size: {}, send back to channel", resendQueue.size()); + } + } + + @Override + public void run() { + LOGGER.info("Sink task {} started.", Thread.currentThread().getName()); + while (started) { + boolean decrementFlag = false; + EventStat es = null; + try { + sleepIfOverflow(); + // fetch event, wait if necessary + es = fetchEventStat(); + if (es.getTopic() == null || es.getTopic().equals("")) { + LOGGER.debug("no topic specified in event header, use default topic instead"); + es.setTopic(defaultTopic); + } + counter.incrementSendCount(); + MessageProducer producer; + try { + producer = getProducer(es.getTopic()); + sendEvent(producer, es); + } catch (Exception e) { + LOGGER.error("Get producer failed!", e); + } + } catch (InterruptedException e) { + LOGGER.info("Thread {} has been interrupted!", Thread.currentThread().getName()); + return; + } catch (Throwable t) { + LOGGER.error("error while sending event", t); + resendEvent(es); + } + } + } + } +} diff --git a/tubemq-connectors/tubemq-connector-flume/src/test/java/org/apache/flume/sink/tubemq/TestTubemqSink.java b/tubemq-connectors/tubemq-connector-flume/src/test/java/org/apache/flume/sink/tubemq/TestTubemqSink.java new file mode 100644 index 0000000..4de1203 --- /dev/null +++ b/tubemq-connectors/tubemq-connector-flume/src/test/java/org/apache/flume/sink/tubemq/TestTubemqSink.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.tubemq; + +import static org.apache.flume.sink.tubemq.ConfigOptions.MASTER_HOST_PORT_LIST; +import static org.apache.flume.sink.tubemq.ConfigOptions.TOPIC; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Sink; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.apache.tubemq.client.config.TubeClientConfig; +import org.junit.Test; + + +public class TestTubemqSink { + + private Context prepareDefaultContext() { + // Prepares a default context with Kafka Server Properties + Context context = new Context(); + context.put(MASTER_HOST_PORT_LIST, "localhost:9092"); + return context; + } + + @Test + public void testTubeProperties() { + + TubemqSink tubemqSink = new TubemqSink(); + Context context = new Context(); + context.put(MASTER_HOST_PORT_LIST, "ip1:9092,ip2:9092"); + Configurables.configure(tubemqSink, context); + + TubeClientConfig config = tubemqSink.getClientConfig(); + + //check that we have defaults set + for (String host : config.getMasterInfo().getNodeHostPortList()) { + if (host.startsWith("ip1")) { + assertEquals("ip1:9092", host); + } else if (host.startsWith("ip2")) { + 
assertEquals("ip2:9092", host); + } else { + fail("config should contains host list"); + } + } + } + + @Test + public void testTubeSink() throws Exception { + TubemqSink tubeSink = new TubemqSink(); + Context context = prepareDefaultContext(); + Configurables.configure(tubeSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + tubeSink.setChannel(memoryChannel); + tubeSink.start(); + + + String msg = "default-topic-test"; + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes()); + Map<String, String> eventHeader = new HashMap<>(); + eventHeader.put(TOPIC, msg); + event.setHeaders(eventHeader); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + try { + Sink.Status status = tubeSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (Exception ex) { + // ignore + } + + await().atMost(20, TimeUnit.SECONDS).until(() -> tubeSink.getCounter().getTubeSendCount() == 1); + assertEquals(1, tubeSink.getCounter().getTubeSendCount()); + + tubeSink.stop(); + } +} diff --git a/tubemq-connectors/tubemq-connector-flume/src/test/resources/log4j.properties b/tubemq-connectors/tubemq-connector-flume/src/test/resources/log4j.properties new file mode 100644 index 0000000..81e44d9 --- /dev/null +++ b/tubemq-connectors/tubemq-connector-flume/src/test/resources/log4j.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger = INFO, out +log4j.appender.out = org.apache.log4j.ConsoleAppender +log4j.appender.out.layout = org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n \ No newline at end of file