Repository: bahir-flink Updated Branches: refs/heads/master 7dce2db4a -> 1f839d510
[BAHIR-58] Add ActiveMQ connector Closes #3 Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/1f839d51 Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/1f839d51 Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/1f839d51 Branch: refs/heads/master Commit: 1f839d510d3b166afa0dfd80ef3cc5f7d02b4f1e Parents: 7dce2db Author: Robert Metzger <[email protected]> Authored: Tue Aug 23 16:42:50 2016 +0200 Committer: Luciano Resende <[email protected]> Committed: Thu Aug 25 10:50:09 2016 -0700 ---------------------------------------------------------------------- .travis.yml | 2 +- flink-connector-activemq/pom.xml | 118 +++++++++ .../streaming/connectors/activemq/AMQSink.java | 175 ++++++++++++ .../connectors/activemq/AMQSinkConfig.java | 106 ++++++++ .../connectors/activemq/AMQSource.java | 248 +++++++++++++++++ .../connectors/activemq/AMQSourceConfig.java | 109 ++++++++ .../connectors/activemq/DestinationType.java | 26 ++ .../activemq/internal/AMQExceptionListener.java | 61 +++++ .../connectors/activemq/internal/AMQUtil.java | 53 ++++ .../activemq/internal/RunningChecker.java | 45 ++++ .../activemq/AMQExceptionListenerTest.java | 82 ++++++ .../connectors/activemq/AMQSinkTest.java | 173 ++++++++++++ .../connectors/activemq/AMQSourceTest.java | 264 +++++++++++++++++++ .../activemq/ActiveMQConnectorITCase.java | 259 ++++++++++++++++++ flink-connector-flume/pom.xml | 1 + pom.xml | 1 + 16 files changed, 1722 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 4c3ba91..fd3733a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,7 @@ language: java env: - - FLINK_VERSION="1.0.3" +# - FLINK_VERSION="1.1.0" - FLINK_VERSION="1.1.1" jdk: 
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml new file mode 100644 index 0000000..528b546 --- /dev/null +++ b/flink-connector-activemq/pom.xml @@ -0,0 +1,118 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-flink_parent_2.11</artifactId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-activemq_2.11</artifactId> + <name>flink-connector-activemq</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <activemq.version>5.14.0</activemq.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <version>${activemq.version}</version> + </dependency> + + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.11</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.11</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.11</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq.tooling</groupId> + <artifactId>activemq-junit</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> 
+ </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>1.5.5</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>1.5.5</version> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java new file mode 100644 index 0000000..a494162 --- /dev/null +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * Sink class for writing data into ActiveMQ queue. + * <p> + * To create an instance of AMQSink class one should initialize and configure an + * instance of a connection factory that will be used to create a connection. + * Every input message is converted into a byte array using a serialization + * schema and being sent into a message queue. 
+ * + * @param <IN> type of input messages + */ +public class AMQSink<IN> extends RichSinkFunction<IN> { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class); + + + // Factory that is used to create AMQ connection + private final ActiveMQConnectionFactory connectionFactory; + // Name of a queue or topic + private final String destinationName; + // Serialization scheme that is used to convert input message to bytes + private final SerializationSchema<IN> serializationSchema; + // Defines if persistent delivery in AMQ is used + private final boolean persistentDelivery; + // Type of AMQ destination (topic or a queue) + private final DestinationType destinationType; + // Throw exceptions or just log them + private boolean logFailuresOnly = false; + // Used to send messages + private transient MessageProducer producer; + // AMQ session + private transient Session session; + // AMQ connection + private transient Connection connection; + + /** + * Create AMQSink. + * + * @param config AMQSink configuration + */ + public AMQSink(AMQSinkConfig<IN> config) { + this.connectionFactory = config.getConnectionFactory(); + this.destinationName = config.getDestinationName(); + this.serializationSchema = config.getSerializationSchema(); + this.persistentDelivery = config.isPersistentDelivery(); + this.destinationType = config.getDestinationType(); + } + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. 
+ */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + + @Override + public void open(Configuration config) throws Exception { + super.open(config); + // Create a Connection + connection = connectionFactory.createConnection(); + connection.start(); + + // Create a Session + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the destination (Topic or Queue) + Destination destination = AMQUtil.getDestination(session, destinationType, destinationName); + + // Create a MessageProducer from the Session to the Topic or + // Queue + producer = session.createProducer(destination); + producer.setDeliveryMode(getDeliveryMode()); + } + + private int getDeliveryMode() { + if (persistentDelivery) { + return DeliveryMode.PERSISTENT; + } + + return DeliveryMode.NON_PERSISTENT; + } + + /** + * Called when new data arrives to the sink, and forwards it to RMQ. + * + * @param value + * The incoming data + */ + @Override + public void invoke(IN value) { + try { + byte[] bytes = serializationSchema.serialize(value); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(bytes); + producer.send(message); + } catch (JMSException e) { + if (logFailuresOnly) { + LOG.error("Failed to send message to ActiveMQ", e); + } else { + throw new RuntimeException("Failed to send message to ActiveMQ", e); + } + } + } + + @Override + public void close() { + RuntimeException t = null; + try { + session.close(); + } catch (JMSException e) { + if (logFailuresOnly) { + LOG.error("Failed to close ActiveMQ session", e); + } else { + t = new RuntimeException("Failed to close ActiveMQ session", e); + } + } + + try { + connection.close(); + } catch (JMSException e) { + if (logFailuresOnly) { + LOG.error("Failed to close ActiveMQ connection", e); + } else { + t = t == null ? 
new RuntimeException("Failed to close ActiveMQ session", e) + : t; + } + } + + if (t != null) { + throw t; + } + } + +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java new file mode 100644 index 0000000..86254ff --- /dev/null +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.Preconditions; + +/** + * Immutable configuration for AMQSink + * @param <IN> type of input messages in configured sink + */ +public class AMQSinkConfig<IN> { + private final ActiveMQConnectionFactory connectionFactory; + private final String queueName; + private final SerializationSchema<IN> serializationSchema; + private final boolean persistentDelivery; + private final DestinationType destinationType; + + public AMQSinkConfig(ActiveMQConnectionFactory connectionFactory, String queueName, + SerializationSchema<IN> serializationSchema, boolean persistentDelivery, + DestinationType destinationType) { + this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory"); + this.queueName = Preconditions.checkNotNull(queueName, "destinationName"); + this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema"); + this.persistentDelivery = persistentDelivery; + this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType"); + } + + public ActiveMQConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public String getDestinationName() { + return queueName; + } + + public SerializationSchema<IN> getSerializationSchema() { + return serializationSchema; + } + + public boolean isPersistentDelivery() { + return persistentDelivery; + } + + public DestinationType getDestinationType() { + return destinationType; + } + + + /** + * Builder for {@link AMQSinkConfig} + * @param <IN> type of input messages in configured sink + */ + public static class AMQSinkConfigBuilder<IN> { + private ActiveMQConnectionFactory connectionFactory; + private String destinationName; + private SerializationSchema<IN> serializationSchema; + private boolean persistentDelivery; 
+ private DestinationType destinationType = DestinationType.QUEUE; + + public AMQSinkConfigBuilder<IN> setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { + this.connectionFactory = Preconditions.checkNotNull(connectionFactory); + return this; + } + + public AMQSinkConfigBuilder<IN> setDestinationName(String queueName) { + this.destinationName = Preconditions.checkNotNull(queueName); + return this; + } + + public AMQSinkConfigBuilder<IN> setSerializationSchema(SerializationSchema<IN> serializationSchema) { + this.serializationSchema = Preconditions.checkNotNull(serializationSchema); + return this; + } + + public AMQSinkConfigBuilder<IN> setPersistentDelivery(boolean persistentDelivery) { + this.persistentDelivery = persistentDelivery; + return this; + } + + public AMQSinkConfigBuilder<IN> setDestinationType(DestinationType destinationType) { + this.destinationType = Preconditions.checkNotNull(destinationType); + return this; + } + + public AMQSinkConfig<IN> build() { + return new AMQSinkConfig<IN>(connectionFactory, destinationName, serializationSchema, persistentDelivery, destinationType); + } + + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java new file mode 100644 index 0000000..49f2cf7 --- /dev/null +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener; +import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil; +import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.HashMap; +import java.util.List; + +/** + * Source for reading messages from an ActiveMQ queue. 
+ * <p> + * To create an instance of AMQSink class one should initialize and configure an + * instance of a connection factory that will be used to create a connection. + * This source is waiting for incoming messages from ActiveMQ and converts them from + * an array of bytes into an instance of the output type. If an incoming + * message is not a message with an array of bytes, this message is ignored + * and warning message is logged. + * + * If checkpointing is enabled AMQSink will not acknowledge received AMQ messages as they arrive, + * but will store them internally and will acknowledge a bulk of messages during checkpointing. + * + * @param <OUT> type of output messages + */ +public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String> + implements ResultTypeQueryable<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class); + + // Factory that is used to create AMQ connection + private final ActiveMQConnectionFactory connectionFactory; + // Name of a queue or topic + private final String destinationName; + // Deserialization scheme that is used to convert bytes to output message + private final DeserializationSchema<OUT> deserializationSchema; + // Type of AMQ destination (topic or a queue) + private final DestinationType destinationType; + // Throw exceptions or just log them + private boolean logFailuresOnly = false; + // Stores if source is running (used for testing) + private RunningChecker runningChecker; + // AMQ connection + private transient Connection connection; + // AMQ session + private transient Session session; + // Used to receive incoming messages + private transient MessageConsumer consumer; + // If source should immediately acknowledge incoming message + private boolean autoAck; + // Map of message ids to currently unacknowledged AMQ messages + private HashMap<String, Message> unacknowledgedMessages = new HashMap<>(); + // Listener for AMQ exceptions + private AMQExceptionListener 
exceptionListener; + + /** + * Create AMQSource. + * + * @param config AMQSource configuration + */ + AMQSource(AMQSourceConfig<OUT> config) { + super(String.class); + this.connectionFactory = config.getConnectionFactory(); + this.destinationName = config.getDestinationName(); + this.deserializationSchema = config.getDeserializationSchema(); + this.runningChecker = config.getRunningChecker(); + this.destinationType = config.getDestinationType(); + } + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + // Visible for testing + void setExceptionListener(AMQExceptionListener exceptionListener) { + this.exceptionListener = exceptionListener; + } + + @Override + public void open(Configuration config) throws Exception { + super.open(config); + // Create a Connection + connection = connectionFactory.createConnection(); + connection.start(); + + exceptionListener = new AMQExceptionListener(LOG, logFailuresOnly); + connection.setExceptionListener(exceptionListener); + + RuntimeContext runtimeContext = getRuntimeContext(); + int acknowledgeType; + if (runtimeContext instanceof StreamingRuntimeContext + && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) { + autoAck = false; + acknowledgeType = ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; + } else { + autoAck = true; + acknowledgeType = ActiveMQSession.AUTO_ACKNOWLEDGE; + } + // Create a Session + session = connection.createSession(false, acknowledgeType); + + // Create the destination (Topic or Queue) + Destination destination = AMQUtil.getDestination(session, destinationType, destinationName); + + // 
Create a MessageConsumer from the Session to the Topic or + // Queue + consumer = session.createConsumer(destination); + runningChecker.setIsRunning(true); + } + + @Override + public void close() throws Exception { + super.close(); + RuntimeException exception = null; + try { + consumer.close(); + } catch (JMSException e) { + if (logFailuresOnly) { + LOG.error("Failed to close ActiveMQ session", e); + } else { + exception = new RuntimeException("Failed to close ActiveMQ consumer", e); + } + } + try { + session.close(); + } catch (JMSException e) { + if (logFailuresOnly) { + LOG.error("Failed to close ActiveMQ session", e); + } else { + exception = exception == null ? new RuntimeException("Failed to close ActiveMQ session", e) + : exception; + } + + } + try { + connection.close(); + } catch (JMSException e) { + if (logFailuresOnly) { + LOG.error("Failed to close ActiveMQ session", e); + } else { + exception = exception == null ? new RuntimeException("Failed to close ActiveMQ connection", e) + : exception; + } + } + + if (exception != null) { + throw exception; + } + } + + @Override + protected void acknowledgeIDs(long checkpointId, List<String> UIds) { + try { + for (String messageId : UIds) { + Message unacknowledgedMessage = unacknowledgedMessages.get(messageId); + if (unacknowledgedMessage != null) { + unacknowledgedMessage.acknowledge(); + unacknowledgedMessages.remove(messageId); + } else { + LOG.warn("Tried to acknowledge unknown ActiveMQ message id: {}", messageId); + } + } + } catch (JMSException e) { + if (logFailuresOnly) { + LOG.error("Failed to acknowledge ActiveMQ message"); + } else { + throw new RuntimeException("Failed to acknowledge ActiveMQ message"); + } + } + } + + @Override + public void run(SourceContext<OUT> ctx) throws Exception { + while (runningChecker.isRunning()) { + exceptionListener.checkErroneous(); + + Message message = consumer.receive(1000); + if (! 
(message instanceof BytesMessage)) { + LOG.warn("Active MQ source received non bytes message: {}"); + return; + } + BytesMessage bytesMessage = (BytesMessage) message; + byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(bytes); + OUT value = deserializationSchema.deserialize(bytes); + synchronized (ctx.getCheckpointLock()) { + ctx.collect(value); + if (!autoAck) { + addId(bytesMessage.getJMSMessageID()); + unacknowledgedMessages.put(bytesMessage.getJMSMessageID(), bytesMessage); + } + } + } + } + + @Override + public void cancel() { + runningChecker.setIsRunning(false); + } + + @Override + public TypeInformation<OUT> getProducedType() { + return deserializationSchema.getProducedType(); + } + +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java new file mode 100644 index 0000000..2dcb2cb --- /dev/null +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.util.Preconditions; + +/** + * Immutable AMQ source config. + * + * @param <OUT> type of source output messages + */ +public class AMQSourceConfig<OUT> { + + private final ActiveMQConnectionFactory connectionFactory; + private final String destinationName; + private final DeserializationSchema<OUT> deserializationSchema; + private final RunningChecker runningChecker; + private final DestinationType destinationType; + + AMQSourceConfig(ActiveMQConnectionFactory connectionFactory, String destinationName, + DeserializationSchema<OUT> deserializationSchema, RunningChecker runningChecker, + DestinationType destinationType) { + this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory"); + this.destinationName = Preconditions.checkNotNull(destinationName, "destinationName"); + this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema"); + this.runningChecker = Preconditions.checkNotNull(runningChecker, "runningChecker"); + this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType"); + } + + public ActiveMQConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public String getDestinationName() { + return destinationName; + } + + public 
DeserializationSchema<OUT> getDeserializationSchema() { + return deserializationSchema; + } + + public RunningChecker getRunningChecker() { + return runningChecker; + } + + public DestinationType getDestinationType() { + return destinationType; + } + + /** + * Builder for {@link AMQSourceConfig} + * + * @param <OUT> type of source output messages + */ + public static class AMQSourceConfigBuilder<OUT> { + private ActiveMQConnectionFactory connectionFactory; + private String destinationName; + private DeserializationSchema<OUT> deserializationSchema; + private RunningChecker runningChecker = new RunningChecker(); + private DestinationType destinationType = DestinationType.QUEUE; + + public AMQSourceConfigBuilder<OUT> setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { + this.connectionFactory = Preconditions.checkNotNull(connectionFactory); + return this; + } + + public AMQSourceConfigBuilder<OUT> setDestinationName(String destinationName) { + this.destinationName = Preconditions.checkNotNull(destinationName); + return this; + } + + public AMQSourceConfigBuilder<OUT> setDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) { + this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema); + return this; + } + + public AMQSourceConfigBuilder<OUT> setRunningChecker(RunningChecker runningChecker) { + this.runningChecker = Preconditions.checkNotNull(runningChecker); + return this; + } + + public AMQSourceConfigBuilder<OUT> setDestinationType(DestinationType destinationType) { + this.destinationType = Preconditions.checkNotNull(destinationType); + return this; + } + + public AMQSourceConfig<OUT> build() { + return new AMQSourceConfig<OUT>(connectionFactory, destinationName, deserializationSchema, runningChecker, destinationType); + } + + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java 
---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java new file mode 100644 index 0000000..fa111a4 --- /dev/null +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/DestinationType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Kind of ActiveMQ destination that a connector endpoint is bound to.
 *
 * <p>The value selects which {@code javax.jms.Session} factory method
 * {@code AMQUtil.getDestination} calls for the configured destination name.
 */
public enum DestinationType {
    /** Destination is obtained with {@code Session.createQueue}. */
    QUEUE,

    /** Destination is obtained with {@code Session.createTopic}. */
    TOPIC
}
+ */ + +package org.apache.flink.streaming.connectors.activemq.internal; + +import org.slf4j.Logger; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; + +public class AMQExceptionListener implements ExceptionListener { + + private final boolean logFailuresOnly; + private final Logger logger; + private JMSException exception; + + public AMQExceptionListener(Logger logger, boolean logFailuresOnly) { + this.logger = logger; + this.logFailuresOnly = logFailuresOnly; + } + + @Override + public void onException(JMSException e) { + this.exception = e; + } + + /** + * Check if the listener received an asynchronous exception. Throws an exception if it was + * received and if logFailuresOnly was set to true. Resets the state after the call + * so a single exception can be thrown only once. + * + * @throws JMSException if exception was received and logFailuresOnly was set to true. + */ + public void checkErroneous() throws JMSException { + if (exception == null) { + return; + } + + JMSException recordedException = exception; + exception = null; + if (logFailuresOnly) { + logger.error("Received ActiveMQ exception", recordedException); + } else { + throw recordedException; + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java new file mode 100644 index 0000000..e5ae524 --- /dev/null +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/AMQUtil.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.activemq.internal; + +import org.apache.flink.streaming.connectors.activemq.DestinationType; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +/** + * Utilities for AMQ connector + */ +public class AMQUtil { + private AMQUtil() {} + + /** + * Create ActiveMQ destination (queue or topic). 
+ * + * @param session AMQ session + * @param destinationType destination type to create + * @param destinationName name of the destination + * @return created destination + * @throws JMSException + */ + public static Destination getDestination(Session session, DestinationType destinationType, + String destinationName) throws JMSException { + switch (destinationType) { + case QUEUE: + return session.createQueue(destinationName); + case TOPIC: + return session.createTopic(destinationName); + default: + throw new IllegalArgumentException("Unknown destination type: " + destinationType); + } + } + +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java new file mode 100644 index 0000000..8c46695 --- /dev/null +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/internal/RunningChecker.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Holder for the "keep running" flag of a source.
 *
 * <p>The flag is {@code volatile}, so a value written by one thread through
 * {@link #setIsRunning(boolean)} is seen by another thread calling {@link #isRunning()}.
 * The class is {@link Serializable}; an explicit {@code serialVersionUID} keeps the
 * serialized form stable across recompilation.
 */
public class RunningChecker implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Current flag value; a fresh checker starts as "not running". */
    private volatile boolean isRunning = false;

    /**
     * Check if source should run.
     *
     * @return true if source should run, false otherwise
     */
    public boolean isRunning() {
        return isRunning;
    }

    /**
     * Set if source should run.
     *
     * @param isRunning true if source should run, false otherwise
     */
    public void setIsRunning(boolean isRunning) {
        this.isRunning = isRunning;
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener; +import org.junit.Test; +import org.slf4j.Logger; + +import javax.jms.JMSException; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class AMQExceptionListenerTest { + @Test + public void logMessageOnException() throws JMSException { + Logger logger = mock(Logger.class); + AMQExceptionListener listener = new AMQExceptionListener(logger, true); + JMSException exception = new JMSException("error"); + listener.onException(exception); + listener.checkErroneous(); + verify(logger).error("Received ActiveMQ exception", exception); + } + + @Test + public void logMessageWrittenOnlyOnce() throws JMSException { + Logger logger = mock(Logger.class); + AMQExceptionListener listener = new AMQExceptionListener(logger, true); + JMSException exception = new JMSException("error"); + listener.onException(exception); + listener.checkErroneous(); + listener.checkErroneous(); + verify(logger, times(1)).error("Received ActiveMQ exception", exception); + } + + @Test(expected = JMSException.class) + public void throwException() throws JMSException { + Logger logger = mock(Logger.class); + AMQExceptionListener listener = new AMQExceptionListener(logger, false); + listener.onException(new JMSException("error")); + listener.checkErroneous(); + } + + @Test + public void throwExceptionOnlyOnce() throws 
JMSException { + Logger logger = mock(Logger.class); + AMQExceptionListener listener = new AMQExceptionListener(logger, false); + listener.onException(new JMSException("error")); + + try { + listener.checkErroneous(); + } catch (JMSException ignore) { + // ignore + } + listener.checkErroneous(); + } + + @Test + public void logMessageNotWrittenIfNoException() throws JMSException { + Logger logger = mock(Logger.class); + AMQExceptionListener listener = new AMQExceptionListener(logger, false); + listener.checkErroneous(); + verify(logger, times(0)).error(any(String.class), any(Throwable.class)); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java new file mode 100644 index 0000000..b9ecfd8 --- /dev/null +++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AMQSinkTest { + + private final String DESTINATION_NAME = "queue"; + + private ActiveMQConnectionFactory connectionFactory; + private MessageProducer producer; + private Session session; + private Connection connection; + private Destination destination; + private BytesMessage message; + + private AMQSink<String> amqSink; + private SerializationSchema<String> serializationSchema; + + @Before + public void before() throws Exception { + connectionFactory = mock(ActiveMQConnectionFactory.class); + producer = mock(MessageProducer.class); + session = mock(Session.class); + connection = mock(Connection.class); + destination = mock(Destination.class); + message = mock(BytesMessage.class); + + when(connectionFactory.createConnection()).thenReturn(connection); + when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session); + when(session.createProducer(null)).thenReturn(producer); + 
when(session.createBytesMessage()).thenReturn(message); + serializationSchema = new SimpleStringSchema(); + + AMQSinkConfig<String> config = new AMQSinkConfig.AMQSinkConfigBuilder<String>() + .setConnectionFactory(connectionFactory) + .setDestinationName(DESTINATION_NAME) + .setSerializationSchema(serializationSchema) + .build(); + amqSink = new AMQSink<>(config); + amqSink.open(new Configuration()); + } + + @Test + public void messageSentToProducer() throws Exception { + byte[] expectedMessage = serializationSchema.serialize("msg"); + amqSink.invoke("msg"); + + verify(producer).send(message); + verify(message).writeBytes(expectedMessage); + } + + @Test + public void setPersistentDeliveryMode() throws Exception { + AMQSinkConfig<String> config = new AMQSinkConfig.AMQSinkConfigBuilder<String>() + .setConnectionFactory(connectionFactory) + .setDestinationName(DESTINATION_NAME) + .setSerializationSchema(serializationSchema) + .setPersistentDelivery(true) + .build(); + amqSink = new AMQSink<>(config); + amqSink.open(new Configuration()); + verify(producer).setDeliveryMode(DeliveryMode.PERSISTENT); + } + + @Test + public void writeToTopic() throws Exception { + AMQSinkConfig<String> config = new AMQSinkConfig.AMQSinkConfigBuilder<String>() + .setConnectionFactory(connectionFactory) + .setDestinationName(DESTINATION_NAME) + .setSerializationSchema(serializationSchema) + .setDestinationType(DestinationType.TOPIC) + .build(); + amqSink = new AMQSink<>(config); + amqSink.open(new Configuration()); + verify(session).createTopic(DESTINATION_NAME); + } + + @Test + public void exceptionOnSendAreNotThrown() throws Exception { + when(session.createBytesMessage()).thenThrow(JMSException.class); + amqSink.setLogFailuresOnly(true); + + amqSink.invoke("msg"); + } + + @Test(expected = RuntimeException.class) + public void exceptionOnSendAreThrownByDefault() throws Exception { + when(session.createBytesMessage()).thenThrow(JMSException.class); + + amqSink.invoke("msg"); + } + + @Test + 
public void sessionAndConnectionAreClosed() throws Exception { + amqSink.close(); + verify(session).close(); + verify(connection).close(); + } + + @Test + public void connectionCloseExceptionIsIgnored() throws Exception { + doThrow(new JMSException("session")).when(session).close(); + doThrow(new JMSException("connection")).when(connection).close(); + + try { + amqSink.close(); + fail("Should throw an exception"); + } catch (RuntimeException ex) { + assertEquals("session", ex.getCause().getMessage()); + } + } + + @Test + public void connectionCloseExceptionIsPassed() throws Exception { + doThrow(new JMSException("connection")).when(connection).close(); + + try { + amqSink.close(); + fail("Should throw an exception"); + } catch (RuntimeException ex) { + assertEquals("connection", ex.getCause().getMessage()); + } + } + + @Test + public void exceptionDuringCloseAsIgnored() throws Exception { + doThrow(new JMSException("session")).when(session).close(); + doThrow(new JMSException("connection")).when(connection).close(); + + amqSink.setLogFailuresOnly(true); + amqSink.close(); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java new file mode 100644 index 0000000..05d0d60 --- /dev/null +++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener; +import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import scala.Array; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static 
org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AMQSourceTest { + + private static final long CHECKPOINT_ID = 1; + private final String DESTINATION_NAME = "queue"; + private final String MSG_ID = "msgId"; + + private ActiveMQConnectionFactory connectionFactory; + private Session session; + private Connection connection; + private Destination destination; + private MessageConsumer consumer; + private BytesMessage message; + + private AMQSource<String> amqSource; + private SimpleStringSchema deserializationSchema; + SourceFunction.SourceContext<String> context; + + @Before + public void before() throws Exception { + connectionFactory = mock(ActiveMQConnectionFactory.class); + session = mock(Session.class); + connection = mock(Connection.class); + destination = mock(Destination.class); + consumer = mock(MessageConsumer.class); + context = mock(SourceFunction.SourceContext.class); + + message = mock(BytesMessage.class); + + when(connectionFactory.createConnection()).thenReturn(connection); + when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session); + when(consumer.receive(anyInt())).thenReturn(message); + when(session.createConsumer(any(Destination.class))).thenReturn(consumer); + when(context.getCheckpointLock()).thenReturn(new Object()); + when(message.getJMSMessageID()).thenReturn(MSG_ID); + + deserializationSchema = new SimpleStringSchema(); + AMQSourceConfig<String> config = new AMQSourceConfig.AMQSourceConfigBuilder<String>() + .setConnectionFactory(connectionFactory) + .setDestinationName(DESTINATION_NAME) + .setDeserializationSchema(deserializationSchema) + .setRunningChecker(new SingleLoopRunChecker()) + .build(); + amqSource = new AMQSource<>(config); + amqSource.setRuntimeContext(createRuntimeContext()); + amqSource.open(new Configuration()); + } + + private RuntimeContext createRuntimeContext() { + StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class); + 
when(runtimeContext.isCheckpointingEnabled()).thenReturn(true); + return runtimeContext; + } + + @Test + public void readFromTopic() throws Exception { + AMQSourceConfig<String> config = new AMQSourceConfig.AMQSourceConfigBuilder<String>() + .setConnectionFactory(connectionFactory) + .setDestinationName(DESTINATION_NAME) + .setDeserializationSchema(deserializationSchema) + .setDestinationType(DestinationType.TOPIC) + .setRunningChecker(new SingleLoopRunChecker()) + .build(); + amqSource = new AMQSource<>(config); + amqSource.setRuntimeContext(createRuntimeContext()); + amqSource.open(new Configuration()); + verify(session).createTopic(DESTINATION_NAME); + } + + @Test + public void parseReceivedMessage() throws Exception { + final byte[] bytes = deserializationSchema.serialize("msg"); + when(message.getBodyLength()).thenReturn((long) bytes.length); + when(message.readBytes(any(byte[].class))).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + byte[] inputBytes = (byte[]) invocationOnMock.getArguments()[0]; + Array.copy(bytes, 0, inputBytes, 0, bytes.length); + return null; + } + }); + + amqSource.run(context); + + verify(context).collect("msg"); + } + + @Test + public void acknowledgeReceivedMessage() throws Exception { + amqSource.run(context); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); + + verify(message).acknowledge(); + } + + @Test + public void handleUnknownIds() throws Exception { + amqSource.run(context); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList("unknown-id")); + + verify(message, never()).acknowledge(); + } + + @Test + public void doNotAcknowledgeMessageTwice() throws Exception { + amqSource.run(context); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); + + verify(message, times(1)).acknowledge(); + } + + @Test(expected 
= JMSException.class) + public void propagateAsyncException() throws Exception { + AMQExceptionListener exceptionListener = mock(AMQExceptionListener.class); + amqSource.setExceptionListener(exceptionListener); + doThrow(JMSException.class).when(exceptionListener).checkErroneous(); + amqSource.run(context); + } + + @Test(expected = RuntimeException.class) + public void throwAcknowledgeExceptionByDefault() throws Exception { + doThrow(JMSException.class).when(message).acknowledge(); + + amqSource.run(context); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); + } + + @Test + public void doNotThrowAcknowledgeExceptionByDefault() throws Exception { + amqSource.setLogFailuresOnly(true); + + doThrow(JMSException.class).when(message).acknowledge(); + + amqSource.run(context); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); + } + + @Test + public void closeResources() throws Exception { + amqSource.close(); + + verify(consumer).close(); + verify(session).close(); + verify(connection).close(); + } + + @Test + public void consumerCloseExceptionShouldBePased() throws Exception { + doThrow(new JMSException("consumer")).when(consumer).close(); + doThrow(new JMSException("session")).when(session).close(); + doThrow(new JMSException("connection")).when(connection).close(); + + try { + amqSource.close(); + fail("Should throw an exception"); + } catch (RuntimeException ex) { + assertEquals("consumer", ex.getCause().getMessage()); + } + } + + @Test + public void sessionCloseExceptionShouldBePased() throws Exception { + doThrow(new JMSException("session")).when(session).close(); + doThrow(new JMSException("connection")).when(connection).close(); + + try { + amqSource.close(); + fail("Should throw an exception"); + } catch (RuntimeException ex) { + assertEquals("session", ex.getCause().getMessage()); + } + } + + @Test + public void connectionCloseExceptionShouldBePased() throws Exception { + doThrow(new 
JMSException("connection")).when(connection).close(); + + try { + amqSource.close(); + fail("Should throw an exception"); + } catch (RuntimeException ex) { + assertEquals("connection", ex.getCause().getMessage()); + } + } + + @Test + public void exceptionsShouldNotBePassedIfLogFailuresOnly() throws Exception { + doThrow(new JMSException("consumer")).when(consumer).close(); + doThrow(new JMSException("session")).when(session).close(); + doThrow(new JMSException("connection")).when(connection).close(); + + amqSource.setLogFailuresOnly(true); + amqSource.close(); + } + + class SingleLoopRunChecker extends RunningChecker { + + int count = 0; + + @Override + public boolean isRunning() { + return (count++ == 0); + } + + @Override + public void setIsRunning(boolean isRunning) { + + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java new file mode 100644 index 0000000..985e06d --- /dev/null +++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.test.util.SuccessException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.util.HashSet; +import java.util.Random; + +import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class ActiveMQConnectorITCase { + + public static final int MESSAGES_NUM = 10000; + public static final String QUEUE_NAME = "queue"; + public static final String 
TOPIC_NAME = "topic"; + private static ForkableFlinkMiniCluster flink; + private static int flinkPort; + + @BeforeClass + public static void beforeClass() { + // start also a re-usable Flink mini cluster + Configuration flinkConfig = new Configuration(); + flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + flink = new ForkableFlinkMiniCluster(flinkConfig, false); + flink.start(); + + flinkPort = flink.getLeaderRPCPort(); + } + + @AfterClass + public static void afterClass() { + flinkPort = -1; + if (flink != null) { + flink.shutdown(); + } + } + + @Test + public void amqTopologyWithQueue() throws Exception { + StreamExecutionEnvironment env = createExecutionEnvironment(); + AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>() + .setConnectionFactory(createConnectionFactory()) + .setDestinationName(QUEUE_NAME) + .setSerializationSchema(new SimpleStringSchema()) + .build(); + createProducerTopology(env, sinkConfig); + + ActiveMQConnectionFactory sourceConnectionFactory = createConnectionFactory(); + AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>() + .setConnectionFactory(sourceConnectionFactory) + .setDestinationName(QUEUE_NAME) + .setDeserializationSchema(new SimpleStringSchema()) + .build(); + createConsumerTopology(env, sourceConfig); + + tryExecute(env, "AMQTest"); + } + + @Test + public void amqTopologyWithTopic() throws Exception { + StreamExecutionEnvironment env = createExecutionEnvironment(); + AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>() + .setConnectionFactory(createConnectionFactory()) + .setDestinationName(TOPIC_NAME) + .setSerializationSchema(new SimpleStringSchema()) + 
.setDestinationType(DestinationType.TOPIC) + .build(); + createProducerTopology(env, sinkConfig); + + ActiveMQConnectionFactory sourceConnectionFactory = createConnectionFactory(); + AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>() + .setConnectionFactory(sourceConnectionFactory) + .setDestinationName(TOPIC_NAME) + .setDeserializationSchema(new SimpleStringSchema()) + .setDestinationType(DestinationType.TOPIC) + .build(); + createConsumerTopology(env, sourceConfig); + + tryExecute(env, "AMQTest"); + } + + private StreamExecutionEnvironment createExecutionEnvironment() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.getConfig().disableSysoutLogging(); + return env; + } + + private ActiveMQConnectionFactory createConnectionFactory() { + return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + } + + private void createProducerTopology(StreamExecutionEnvironment env, AMQSinkConfig<String> config) { + DataStreamSource<String> stream = env.addSource(new SourceFunction<String>() { + @Override + public void run(SourceContext<String> ctx) throws Exception { + for (int i = 0; i < MESSAGES_NUM; i++) { + ctx.collect("amq-" + i); + } + } + + @Override + public void cancel() {} + }); + + + AMQSink<String> sink = new AMQSink<>(config); + stream.addSink(sink); + } + + private void createConsumerTopology(StreamExecutionEnvironment env, AMQSourceConfig<String> config) { + AMQSource<String> source = new AMQSource<>(config); + + env.addSource(source) + .addSink(new SinkFunction<String>() { + final HashSet<Integer> set = new HashSet<>(); + @Override + public void invoke(String value) throws Exception { + int val = Integer.parseInt(value.split("-")[1]); + set.add(val); + + if (set.size() == MESSAGES_NUM) { + throw new SuccessException(); + } + } + }); + } + + @Test + public void 
amqTopologyWithCheckpointing() throws Exception { + ActiveMQConnectionFactory connectionFactory = createConnectionFactory(); + AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>() + .setConnectionFactory(connectionFactory) + .setDestinationName("queue2") + .setSerializationSchema(new SimpleStringSchema()) + .build(); + AMQSink<String> sink = new AMQSink<>(sinkConfig); + sink.open(new Configuration()); + + for (int i = 0; i < MESSAGES_NUM; i++) { + sink.invoke("amq-" + i); + } + + AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>() + .setConnectionFactory(connectionFactory) + .setDestinationName("queue2") + .setDeserializationSchema(new SimpleStringSchema()) + .build(); + + final AMQSource<String> source = new AMQSource<>(sourceConfig); + RuntimeContext runtimeContext = createMockRuntimeContext(); + source.setRuntimeContext(runtimeContext); + source.open(new Configuration()); + + final TestSourceContext sourceContext = new TestSourceContext(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + source.run(sourceContext); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + thread.start(); + + Deadline deadline = FiniteDuration.apply(5, "s").fromNow(); + while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) { + Thread.sleep(100); + Random random = new Random(); + long checkpointId = random.nextLong(); + synchronized (sourceContext.getCheckpointLock()) { + source.snapshotState(checkpointId, System.currentTimeMillis()); + source.notifyCheckpointComplete(checkpointId); + } + } + assertEquals(MESSAGES_NUM, sourceContext.getIdsNum()); + } + + private RuntimeContext createMockRuntimeContext() { + StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class); + when(runtimeContext.isCheckpointingEnabled()).thenReturn(true); + return runtimeContext; + } + + class TestSourceContext implements 
SourceFunction.SourceContext<String> { + + private HashSet<Integer> ids = new HashSet<>(); + private Object contextLock = new Object(); + @Override + public void collect(String value) { + int val = Integer.parseInt(value.split("-")[1]); + ids.add(val); + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { } + + @Override + public void emitWatermark(Watermark mark) { } + + @Override + public Object getCheckpointLock() { + return contextLock; + } + + @Override + public void close() { } + + public int getIdsNum() { + synchronized (contextLock) { + return ids.size(); + } + } + }; +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/flink-connector-flume/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml index f8b98f1..8551abd 100644 --- a/flink-connector-flume/pom.xml +++ b/flink-connector-flume/pom.xml @@ -152,6 +152,7 @@ under the License. <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. --> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> + <version>2.4.3</version> <executions> <execution> <id>shade-flink</id> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/1f839d51/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 01fb4fc..31af3a3 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ <modules> <module>flink-connector-redis</module> <module>flink-connector-flume</module> + <module>flink-connector-activemq</module> </modules> <properties>
