Repository: apex-malhar Updated Branches: refs/heads/master 5972bca41 -> 37cb58484
JMS Input operator changes to support SQS and ActiveMQ Incorporate comments: javadocs, NotNull constraint for the builder and create function to return the builder ref Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d46a6be6 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d46a6be6 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d46a6be6 Branch: refs/heads/master Commit: d46a6be62d2b70bb99ab7ca02d0a87ea2d5eba38 Parents: 7dea3d0 Author: Sanjay Pujare <[email protected]> Authored: Mon Aug 8 12:40:05 2016 -0700 Committer: Sanjay Pujare <[email protected]> Committed: Tue Aug 9 16:38:13 2016 -0700 ---------------------------------------------------------------------- library/pom.xml | 34 ++- .../lib/io/jms/AbstractJMSInputOperator.java | 10 + .../com/datatorrent/lib/io/jms/JMSBase.java | 214 +++++++++++++-- .../lib/io/jms/JMSObjectInputOperatorTest.java | 1 + .../lib/io/jms/JMSStringInputOperatorTest.java | 2 + .../lib/io/jms/SQSStringInputOperatorTest.java | 262 +++++++++++++++++++ .../com/datatorrent/lib/io/jms/SQSTestBase.java | 187 +++++++++++++ .../src/test/resources/sqsdevCreds.properties | 21 ++ .../src/test/resources/sqstestCreds.properties | 21 ++ 9 files changed, 719 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/pom.xml ---------------------------------------------------------------------- diff --git a/library/pom.xml b/library/pom.xml index 8d264a4..8fe30d0 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -306,12 +306,18 @@ <artifactId>janino</artifactId> <version>2.7.8</version> <scope>test</scope> - </dependency> - <dependency> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <version>1.8.5</version> <scope>test</scope> + <exclusions> + <exclusion> + 
<groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> @@ -346,9 +352,27 @@ <scope>test</scope> </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>2.5.4</version> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-sqs</artifactId> + <version>1.10.73</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.5.4</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>amazon-sqs-java-messaging-lib</artifactId> + <version>1.0.0</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-sqs</artifactId> + </exclusion> + </exclusions> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java index cc27c88..bf0fe5c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java @@ -514,6 +514,16 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase return this.windowDataManager; } + /** + * Sets this transacted value + * + * @param value new value for transacted + */ + public void setTransacted(boolean value) + { + transacted = value; + } + protected abstract void emit(T payload); public static enum CounterKeys 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java index 48ed2c3..772464a 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java @@ -25,6 +25,7 @@ import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; +import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,8 @@ import org.apache.commons.beanutils.BeanUtils; import com.google.common.collect.Maps; +import com.datatorrent.netlet.util.DTThrowable; + /** * Base class for any JMS input or output adapter operator. * <p/> @@ -76,11 +79,11 @@ public class JMSBase private transient Session session; private transient Destination destination; - private String connectionFactoryClass; - private Map<String, String> connectionFactoryProperties = Maps.newHashMap(); + @NotNull + private ConnectionFactoryBuilder connectionFactoryBuilder; private String ackMode = "CLIENT_ACKNOWLEDGE"; - private String clientId = "TestClient"; - private String subject = "TEST.FOO"; + private String clientId; + private String subject; private int batch = 10; private int messageSize = 255; private boolean durable = false; @@ -89,6 +92,105 @@ public class JMSBase protected boolean transacted = true; /** + * Builder class that allows caller to build the connection factory (optional) + * + */ + public static interface ConnectionFactoryBuilder + { + + /** + * This method is called by the operator to return properly built + * (authenticated, connected etc) connection factory + * + * @return properly built connection factory + */ + public ConnectionFactory 
buildConnectionFactory(); + } + + /** + * Default implementation for {@link ConnectionFactoryBuilder} that works for ActiveMQ + * + * + */ + public static class DefaultConnectionFactoryBuilder implements ConnectionFactoryBuilder + { + protected String connectionFactoryClass; + + @NotNull + protected Map<String, String> connectionFactoryProperties = Maps.newHashMap(); + + /** + * Get properties used to configure this DefaultConnectionFactoryBuilder instance + * + * @return Map of properties + */ + public Map<String, String> getConnectionFactoryProperties() + { + return connectionFactoryProperties; + } + + /** + * Set properties used to configure this DefaultConnectionFactoryBuilder instance. + * Note: previous properties are overwritten. + * + * @param connectionFactoryProperties + */ + public void setConnectionFactoryProperties(Map<String, String> connectionFactoryProperties) + { + this.connectionFactoryProperties = connectionFactoryProperties; + } + + /** + * Get the fully qualified class-name of the connection factory that is used by this + * builder to instantiate the connection factory + * + * @return fully qualified class-name + */ + public String getConnectionFactoryClass() + { + return connectionFactoryClass; + } + + /** + * Set the fully qualified class-name of the connection factory that is used by this + * builder to instantiate the connection factory + * + * @param connectionFactoryClass fully qualified class-name + */ + public void setConnectionFactoryClass(String connectionFactoryClass) + { + this.connectionFactoryClass = connectionFactoryClass; + } + + @Override + public ConnectionFactory buildConnectionFactory() + { + ConnectionFactory cf; + try { + if (connectionFactoryClass != null) { + @SuppressWarnings("unchecked") + Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)Class.forName(connectionFactoryClass); + cf = clazz.newInstance(); + } else { + cf = new org.apache.activemq.ActiveMQConnectionFactory(); + } + BeanUtils.populate(cf, 
connectionFactoryProperties); + logger.debug("creation successful."); + return cf; + } catch (Exception e) { + DTThrowable.rethrow(e); + return null; // previous rethrow makes this redundant, but compiler doesn't know... + } + } + + @Override + public String toString() + { + return "DefaultConnectionFactoryBuilder [connectionFactoryProperties=" + connectionFactoryProperties + "]"; + } + } + + /** * @return the connection */ public Connection getConnection() @@ -111,15 +213,68 @@ public class JMSBase { return destination; } - + + /** + * gets the connection factory class-name used by the default connection factory builder + * + * @return connection factory class-name + */ public String getConnectionFactoryClass() { - return connectionFactoryClass; + if (connectionFactoryBuilder == null) { + connectionFactoryBuilder = createDefaultConnectionFactoryBuilderIfRequired(); + } + if (connectionFactoryBuilder instanceof DefaultConnectionFactoryBuilder) { + return ((DefaultConnectionFactoryBuilder)connectionFactoryBuilder).getConnectionFactoryClass(); + } else { + throw new UnsupportedOperationException("ConnectionFactoryBuilder does not support connectionFactoryClass"); + } } + /** + * if the existing connectionFactoryBuilder is not of type DefaultConnectionFactoryBuilder + * create one. 
+ * + * @return the current DefaultConnectionFactoryBuilder value + */ + private DefaultConnectionFactoryBuilder createDefaultConnectionFactoryBuilderIfRequired() + { + if (!(connectionFactoryBuilder instanceof DefaultConnectionFactoryBuilder)) { + connectionFactoryBuilder = new DefaultConnectionFactoryBuilder(); + } + return (DefaultConnectionFactoryBuilder)connectionFactoryBuilder; + } + + /** + * Sets the connection factory class-name used by the default connection factory builder + * + * @param connectionFactoryClass factory class-name to be set + */ public void setConnectionFactoryClass(String connectionFactoryClass) { - this.connectionFactoryClass = connectionFactoryClass; + DefaultConnectionFactoryBuilder builder = + createDefaultConnectionFactoryBuilderIfRequired(); + builder.setConnectionFactoryClass(connectionFactoryClass); + } + + /** + * gets the connection factory builder of this instance + * + * @return connection factory builder + */ + public ConnectionFactoryBuilder getConnectionFactoryBuilder() + { + return connectionFactoryBuilder; + } + + /** + * Sets the connection factory builder of this instance + * + * @param connectionFactoryBuilder connection factory builder for this instance + */ + public void setConnectionFactoryBuilder(ConnectionFactoryBuilder connectionFactoryBuilder) + { + this.connectionFactoryBuilder = connectionFactoryBuilder; } /** @@ -129,12 +284,27 @@ public class JMSBase */ public Map<String, String> getConnectionFactoryProperties() { - return connectionFactoryProperties; + if (connectionFactoryBuilder == null) { + connectionFactoryBuilder = createDefaultConnectionFactoryBuilderIfRequired(); + } + if (connectionFactoryBuilder instanceof DefaultConnectionFactoryBuilder) { + return ((DefaultConnectionFactoryBuilder)connectionFactoryBuilder).getConnectionFactoryProperties(); + } else { + throw new UnsupportedOperationException("ConnectionFactoryBuilder does not support connectionFactoryProperties"); + } } + /** + * Sets the 
connection factory properties. Property names are provider specific and can be set directly from configuration, for example:<p> + * <code>dt.operator.JMSOper.connectionFactoryProperties.brokerURL=vm://localhost</code> + * + * @param connectionFactoryProperties reference to mutable properties + */ public void setConnectionFactoryProperties(Map<String, String> connectionFactoryProperties) { - this.connectionFactoryProperties = connectionFactoryProperties; + DefaultConnectionFactoryBuilder builder = + createDefaultConnectionFactoryBuilderIfRequired(); + builder.setConnectionFactoryProperties(connectionFactoryProperties); } /** @@ -143,7 +313,7 @@ public class JMSBase @Deprecated public void setUser(String user) { - this.connectionFactoryProperties.put("userName", user); + this.getConnectionFactoryProperties().put("userName", user); } /** @@ -152,7 +322,7 @@ public class JMSBase @Deprecated public void setPassword(String password) { - this.connectionFactoryProperties.put("password", password); + this.getConnectionFactoryProperties().put("password", password); } /** @@ -161,7 +331,7 @@ public class JMSBase @Deprecated public void setUrl(String url) { - this.connectionFactoryProperties.put("brokerURL", url); + this.getConnectionFactoryProperties().put("brokerURL", url); } /** @@ -355,22 +525,10 @@ public class JMSBase */ protected ConnectionFactory getConnectionFactory() { - logger.debug("class {} properties {}", connectionFactoryClass, connectionFactoryProperties); - ConnectionFactory cf; - try { - if (connectionFactoryClass != null) { - @SuppressWarnings("unchecked") - Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)Class.forName(connectionFactoryClass); - cf = clazz.newInstance(); - } else { - cf = new org.apache.activemq.ActiveMQConnectionFactory(); - } - BeanUtils.populate(cf, connectionFactoryProperties); - logger.debug("creation successful."); - return cf; - } catch (Exception e) { - throw new RuntimeException("Failed to create connection factory.", e); 
- } + logger.debug("connectionFactoryBuilder {}", "" + connectionFactoryBuilder); + + return connectionFactoryBuilder.buildConnectionFactory(); + } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java index 06e94c6..e4967ca 100644 --- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java @@ -81,6 +81,7 @@ public class JMSObjectInputOperatorTest context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap); operator = new JMSObjectInputOperator(); + operator.setSubject("TEST.FOO"); operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost"); sink = new CollectorTestSink<Object>(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java index 42f730c..82f1c67 100644 --- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java @@ -77,6 +77,7 @@ public class JMSStringInputOperatorTest context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap); operator = new JMSStringInputOperator(); + operator.setSubject("TEST.FOO"); operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, 
"vm://localhost"); sink = new CollectorTestSink<>(); @@ -145,6 +146,7 @@ public class JMSStringInputOperatorTest throw new RuntimeException("fail ack"); } }; + testMeta.operator.setSubject("TEST.FOO"); testMeta.operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost"); testMeta.operator.setup(testMeta.context); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java new file mode 100644 index 0000000..53e787d --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.io.jms; + +import java.io.File; + +import javax.jms.ConnectionFactory; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.internal.AssumptionViolatedException; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; + +import com.amazon.sqs.javamessaging.SQSConnectionFactory; +import com.amazonaws.auth.PropertiesFileCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Tests for {@link JMSStringInputOperator} for AMZ SQS. + * Note: for SQS we should use AckMode as "AUTO_ACKNOWLEDGE" and + * no transacted mode (transacted = false). + * + * Note: check the comment for com.amazon.sqs.javamessaging.SQSMessageConsumer.close() + * specifically: "Since consumer prefetch threads use SQS long-poll feature with 20 seconds + * timeout, closing each consumer prefetch thread can take up to 20 seconds, + * which in-turn will impact the time on consumer close." + * + * Because of the above this test takes a long time due to consumer.close() in + * com.datatorrent.lib.io.jms.AbstractJMSInputOperator.cleanup() + * + * NOTE: tests are automatically skipped if the secret key in sqstestCreds.properties + * is missing or blank. + * + * NOTE: each test creates its own uniquely named queue in SQS and then deletes it afterwards. 
+ * Also we try to scrub any leftover queues from the previous runs just in case tests were + * aborted (check com.datatorrent.lib.io.jms.SQSTestBase.generateCurrentQueueName(String)) + * + */ +public class SQSStringInputOperatorTest +{ + public static class TestMeta extends TestWatcher + { + String baseDir; + JMSStringInputOperator operator; + CollectorTestSink<Object> sink; + Context.OperatorContext context; + SQSTestBase testBase; + + @Override + protected void starting(Description description) + { + final String methodName = description.getMethodName(); + final String className = description.getClassName(); + + testBase = new SQSTestBase(); + if (testBase.validateTestCreds() == false) { + return; + } + testBase.generateCurrentQueueName(methodName); + try { + testBase.beforTest(); + } catch (AssumptionViolatedException ave) { + throw ave; + } catch (Exception e) { + throw new RuntimeException(e); + } + + baseDir = "target/" + className + "/" + methodName; + + Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500); + attributeMap.put(Context.DAGContext.APPLICATION_PATH, baseDir); + + context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap); + operator = new JMSStringInputOperator(); + operator.setConnectionFactoryBuilder(new JMSBase.ConnectionFactoryBuilder() + { + + @Override + public ConnectionFactory buildConnectionFactory() + { + // Create the connection factory using a properties-file credentials provider (the dev credentials file). + // Connections this factory creates can talk to the queues in us-east-1 region. 
+ SQSConnectionFactory connectionFactory = + SQSConnectionFactory.builder() + .withRegion(Region.getRegion(Regions.US_EAST_1)) + .withAWSCredentialsProvider(new PropertiesFileCredentialsProvider(testBase.getDevCredsFilePath())) + .build(); + return connectionFactory; + } + + @Override + public String toString() + { + return className + "/" + methodName + "/ConnectionFactoryBuilder"; + } + + }); + operator.setSubject(testBase.getCurrentQueueName()); + // for SQS ack mode should be "AUTO_ACKNOWLEDGE" and transacted = false + operator.setAckMode("AUTO_ACKNOWLEDGE"); + operator.setTransacted(false); + + sink = new CollectorTestSink<>(); + operator.output.setSink(sink); + operator.setup(context); + operator.activate(context); + } + + @Override + protected void finished(Description description) + { + if (operator == null) { + Assert.assertFalse(testBase.validateTestCreds()); + return; + } + operator.deactivate(); + operator.teardown(); + try { + FileUtils.deleteDirectory(new File("target/" + description.getClassName())); + testBase.afterTest(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + + /** + * Basic string input test + * + * @throws Exception + */ + @Test + public void testStringMsgInput() throws Exception + { + testMeta.testBase.validateAssumption(); + testMeta.testBase.produceMsg("testStringMsgInput", 10, false); + Thread.sleep(1000); + testMeta.operator.emitTuples(); + Assert.assertEquals("num of messages", 10, testMeta.sink.collectedTuples.size()); + } + + + @Test + public void testRecoveryAndIdempotency() throws Exception + { + testMeta.testBase.validateAssumption(); + testMeta.testBase.produceUniqueMsgs("testRecoveryAndIdempotency", 25, false); + Thread.sleep(3000); + testMeta.operator.beginWindow(1); + testMeta.operator.emitTuples(); + testMeta.operator.endWindow(); + + Assert.assertEquals("num of messages in window 1 pre-failure", 25, testMeta.sink.collectedTuples.size()); + 
+ // for some reason AMZ SQS doesn't preserve the order producer->consumer or we might have a race + // condition on our producer side, so we can't be sure that get(4) will return "4:..." + // In any case we will only check message matching between pre-failure + // and post-failure cases for 4th and 17th message + final String message4 = (String)testMeta.sink.collectedTuples.get(4); + final String message17 = (String)testMeta.sink.collectedTuples.get(17); + + //failure and then re-deployment of operator + testMeta.sink.collectedTuples.clear(); + testMeta.operator.setup(testMeta.context); + testMeta.operator.activate(testMeta.context); + + Assert.assertEquals("largest recovery window", 1, + testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); + + testMeta.operator.beginWindow(1); + testMeta.operator.endWindow(); + Assert.assertEquals("num of messages in window 1", 25, testMeta.sink.collectedTuples.size()); + Assert.assertEquals(message4, testMeta.sink.collectedTuples.get(4)); + Assert.assertEquals(message17, testMeta.sink.collectedTuples.get(17)); + testMeta.sink.collectedTuples.clear(); + } + + + /** + * This test is different from the one in JMSStringInputOperatorTest because of the differences + * in acknowledge mode. There is no Ack failure but rather failure in emit Tuple. But endWindow + * eventually drains the holdingBuffer and emits all the outstanding tuples so we see 9 tuples + * eventually. 
Because of the async nature of the operator (messages are delivered to the operator + * as an async listener) we can't be sure how many messages are present in testMeta.sink.collectedTuples + * before endWindow so our assertion is just testMeta.sink.collectedTuples.size() < 9 + * + * @throws Exception + */ + @Test + public void testFailureAfterPersistenceAndBeforeRecovery() throws Exception + { + testMeta.testBase.validateAssumption(); + testMeta.sink = new CollectorTestSink<Object>() + { + @Override + public void put(Object payload) + { + if (payload instanceof String && ((String)payload).startsWith("4:")) { + throw new RuntimeException("fail 4th message"); + } + synchronized (collectedTuples) { + collectedTuples.add(payload); + collectedTuples.notifyAll(); + } + } + }; + testMeta.operator.output.setSink(testMeta.sink); + + testMeta.testBase.produceUniqueMsgs("testFailureAfterPersistenceAndBeforeRecovery", 10, false); + Thread.sleep(1000); + testMeta.operator.beginWindow(1); + try { + testMeta.operator.emitTuples(); + } catch (Throwable t) { + LOG.debug("emit exception"); + } + Assert.assertTrue("num of messages before endWindow 1", testMeta.sink.collectedTuples.size() < 9); + testMeta.operator.endWindow(); + Assert.assertEquals("num of messages after endWindow 1", 9, testMeta.sink.collectedTuples.size()); + + testMeta.operator.setup(testMeta.context); + testMeta.operator.activate(testMeta.context); + + Assert.assertEquals("window 1 should exist", 1, + testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); + } + + + private static final transient Logger LOG = LoggerFactory.getLogger(SQSStringInputOperatorTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java 
b/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java new file mode 100644 index 0000000..4dc63c3 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.io.jms; + +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; + +import com.amazonaws.auth.PropertiesCredentials; +import com.amazonaws.auth.PropertiesFileCredentialsProvider; +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.CreateQueueResult; +import com.amazonaws.services.sqs.model.ListQueuesResult; +import com.amazonaws.services.sqs.model.PurgeQueueRequest; + +/** + * Base class for SQS tests. <br/> + * Various SQS (AWS) related helper functions + */ +public class SQSTestBase +{ + public static final String TEST_FOO = "TEST_FOO"; + + public PropertiesCredentials testCreds; + + /** + * creds to be used by the dev end eg. by the JMSInputOperator in SQS mode + */ + private static String SQSDEV_CREDS_FILENAME = "/sqsdevCreds.properties"; + + /** + * creds to be used by the test end eg. 
by SQSInputOperatorTest + */ + private static String SQSTEST_CREDS_FILENAME = "/sqstestCreds.properties"; + + private AmazonSQSClient sqs; + + private String currentQueueName; + private String currentQueueUrl; + + + public SQSTestBase() + { + PropertiesFileCredentialsProvider file = new PropertiesFileCredentialsProvider(getTestCredsFilePath()); + testCreds = (PropertiesCredentials)file.getCredentials(); + sqs = new AmazonSQSClient(testCreds); + } + + public String getTestCredsFilePath() + { + return getClass().getResource(SQSTEST_CREDS_FILENAME).getFile(); + } + + public String getDevCredsFilePath() + { + return getClass().getResource(SQSDEV_CREDS_FILENAME).getFile(); + } + + public String getCurrentQueueName() + { + return currentQueueName; + } + + public void setCurrentQueueName(String currentQueueName) + { + this.currentQueueName = currentQueueName; + } + + /** + * Each test creates its own uniquely named queue in SQS and then deletes it afterwards. + * We try to scrub any leftover queues from the previous runs just in case tests were + * aborted + * + * @param currentQueueNamePrefix + */ + public void generateCurrentQueueName(String currentQueueNamePrefix) + { + if (validateTestCreds()) { + ListQueuesResult list = sqs.listQueues(currentQueueNamePrefix); + for (String url : list.getQueueUrls()) { + sqs.deleteQueue(url); + } + } + this.currentQueueName = currentQueueNamePrefix + System.currentTimeMillis(); + } + + public void produceMsg(String[] msgs, boolean purgeFirst) throws Exception + { + CreateQueueResult res = sqs.createQueue(getCurrentQueueName()); + if (purgeFirst) { + PurgeQueueRequest purgeReq = new PurgeQueueRequest(res.getQueueUrl()); + sqs.purgeQueue(purgeReq); + } + for (String text : msgs) { + sqs.sendMessage(res.getQueueUrl(), text); + } + } + + /** + * + * @param text + * @throws Exception + */ + public void produceMsg(String text, boolean purgeFirst) throws Exception + { + produceMsg(new String[] {text}, purgeFirst); + } + + /** + * TODO: copy 
the logic of JMSTestBase.produceMsg + * + * @param text + * @throws Exception + */ + public void produceMsg(String text, int num, boolean purgeFirst) throws Exception + { + String[] array = new String[num]; + for (int i = 0; i < num; i++) { + array[i] = text; + } + produceMsg(array, purgeFirst); + } + + /** + * Produce unique messages + * + * @param text + * @throws Exception + */ + public void produceUniqueMsgs(String text, int num, boolean purgeFirst) throws Exception + { + String[] array = new String[num]; + for (int i = 0; i < num; i++) { + array[i] = "" + i + ":" + text; + } + produceMsg(array, purgeFirst); + } + + public boolean validateTestCreds() + { + return testCreds.getAWSSecretKey() != null && + testCreds.getAWSSecretKey().trim().isEmpty() == false; + } + + public void validateAssumption() + { + Assume.assumeTrue(validateTestCreds()); + } + + + /** + * create a queue we can use for testing + * + * @throws Exception + */ + @Before + public void beforTest() throws Exception + { + validateAssumption(); + // Create a queue + CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(getCurrentQueueName()); + currentQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl(); + } + + @After + public void afterTest() throws Exception + { + sqs.deleteQueue(currentQueueUrl); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/resources/sqsdevCreds.properties ---------------------------------------------------------------------- diff --git a/library/src/test/resources/sqsdevCreds.properties b/library/src/test/resources/sqsdevCreds.properties new file mode 100644 index 0000000..3ce01e4 --- /dev/null +++ b/library/src/test/resources/sqsdevCreds.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +accessKey= +secretKey= http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/resources/sqstestCreds.properties ---------------------------------------------------------------------- diff --git a/library/src/test/resources/sqstestCreds.properties b/library/src/test/resources/sqstestCreds.properties new file mode 100644 index 0000000..3ce01e4 --- /dev/null +++ b/library/src/test/resources/sqstestCreds.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +accessKey= +secretKey=
