NIFI-1000 addressed PR comments

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8699e351
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8699e351
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8699e351

Branch: refs/heads/master
Commit: 8699e351081984a39b1e91ea0c6d878490413274
Parents: ef0be5a
Author: Oleg Zhurakousky <[email protected]>
Authored: Tue Nov 10 11:52:24 2015 -0500
Committer: Oleg Zhurakousky <[email protected]>
Committed: Tue Nov 10 12:13:16 2015 -0500

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |  1 -
 .../nifi/processors/standard/JmsConsumer.java   |  6 ++--
 .../processors/standard/TestGetJMSQueue.java    | 37 +++++++++-----------
 3 files changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8699e351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 931b939..cc4023b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -153,7 +153,6 @@ language governing permissions and limitations under the 
License. -->
         <dependency>
             <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-all</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.jayway.jsonpath</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8699e351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index 461d381..d4e1969 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -21,6 +21,7 @@ import static 
org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CL
 import static 
org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
 import static 
org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
 import static 
org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
+import static 
org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
 import static 
org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
 import static 
org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
 import static 
org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
@@ -89,6 +90,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
         descriptors.add(USERNAME);
         descriptors.add(PASSWORD);
         descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(DESTINATION_TYPE);
         descriptors.add(ACKNOWLEDGEMENT_MODE);
         descriptors.add(MESSAGE_SELECTOR);
         descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
@@ -158,8 +160,8 @@ public abstract class JmsConsumer extends AbstractProcessor 
{
 
         stopWatch.stop();
         if (processingSummary.getFlowFilesCreated() > 0) {
-            final float secs = ((float) 
stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
-            float messagesPerSec = ((float) 
processingSummary.getMessagesReceived()) / secs;
+            final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 
1000F);
+            float messagesPerSec = (processingSummary.getMessagesReceived()) / 
secs;
             final String dataRate = 
stopWatch.calculateDataRate(processingSummary.getBytesReceived());
             logger.info("Received {} messages in {} milliseconds, at a rate of 
{} messages/sec or {}",
                     new Object[]{processingSummary.getMessagesReceived(), 
stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});

http://git-wip-us.apache.org/repos/asf/nifi/blob/8699e351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
index ecb04c1..dde1158 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
@@ -46,14 +46,13 @@ public class TestGetJMSQueue {
     @Test
     public void testSendTextToQueue() throws Exception {
         GetJMSQueue getJmsQueue = new GetJMSQueue();
-        StandardProcessorTestRunner runner = (StandardProcessorTestRunner) 
TestRunners.newTestRunner(getJmsQueue);
+        TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
         runner.setProperty(JmsProperties.JMS_PROVIDER, 
JmsProperties.ACTIVEMQ_PROVIDER);
         runner.setProperty(JmsProperties.URL, 
"vm://localhost?broker.persistent=false");
         runner.setProperty(JmsProperties.DESTINATION_TYPE, 
JmsProperties.DESTINATION_TYPE_QUEUE);
         runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
         runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, 
JmsProperties.ACK_MODE_AUTO);
 
-        MockProcessSession pSession = (MockProcessSession) 
runner.getProcessSessionFactory().createSession();
         WrappedMessageProducer wrappedProducer = 
JmsFactory.createMessageProducer(runner.getProcessContext(), true);
         final Session jmsSession = wrappedProducer.getSession();
         final MessageProducer producer = wrappedProducer.getProducer();
@@ -62,16 +61,15 @@ public class TestGetJMSQueue {
         producer.send(message);
         jmsSession.commit();
 
-        getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
+        runner.run();
 
-        List<MockFlowFile> flowFiles = pSession
+        List<MockFlowFile> flowFiles = runner
                 .getFlowFilesForRelationship(new 
Relationship.Builder().name("success").build());
 
         assertTrue(flowFiles.size() == 1);
         MockFlowFile successFlowFile = flowFiles.get(0);
-        String receivedMessage = new 
String(runner.getContentAsByteArray(successFlowFile));
-        assertEquals("Hello World", receivedMessage);
-        assertEquals("queue.testing", 
successFlowFile.getAttribute("jms.JMSDestination"));
+        successFlowFile.assertContentEquals("Hello World");
+        successFlowFile.assertAttributeEquals("jms.JMSDestination", 
"queue.testing");
         producer.close();
         jmsSession.close();
     }
@@ -79,7 +77,7 @@ public class TestGetJMSQueue {
     @Test
     public void testSendBytesToQueue() throws Exception {
         GetJMSQueue getJmsQueue = new GetJMSQueue();
-        StandardProcessorTestRunner runner = (StandardProcessorTestRunner) 
TestRunners.newTestRunner(getJmsQueue);
+        TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
         runner.setProperty(JmsProperties.JMS_PROVIDER, 
JmsProperties.ACTIVEMQ_PROVIDER);
         runner.setProperty(JmsProperties.URL, 
"vm://localhost?broker.persistent=false");
         runner.setProperty(JmsProperties.DESTINATION_TYPE, 
JmsProperties.DESTINATION_TYPE_QUEUE);
@@ -88,23 +86,21 @@ public class TestGetJMSQueue {
         WrappedMessageProducer wrappedProducer = 
JmsFactory.createMessageProducer(runner.getProcessContext(), true);
         final Session jmsSession = wrappedProducer.getSession();
         final MessageProducer producer = wrappedProducer.getProducer();
-        MockProcessSession pSession = (MockProcessSession) 
runner.getProcessSessionFactory().createSession();
         final BytesMessage message = jmsSession.createBytesMessage();
         message.writeBytes("Hello Bytes".getBytes());
 
         producer.send(message);
         jmsSession.commit();
 
-        getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
+        runner.run();
 
-        List<MockFlowFile> flowFiles = pSession
+        List<MockFlowFile> flowFiles = runner
                 .getFlowFilesForRelationship(new 
Relationship.Builder().name("success").build());
 
         assertTrue(flowFiles.size() == 1);
         MockFlowFile successFlowFile = flowFiles.get(0);
-        String receivedMessage = new 
String(runner.getContentAsByteArray(successFlowFile));
-        assertEquals("Hello Bytes", receivedMessage);
-        assertEquals("queue.testing", 
successFlowFile.getAttribute("jms.JMSDestination"));
+        successFlowFile.assertContentEquals("Hello Bytes");
+        successFlowFile.assertAttributeEquals("jms.JMSDestination", 
"queue.testing");
         producer.close();
         jmsSession.close();
     }
@@ -112,7 +108,7 @@ public class TestGetJMSQueue {
     @Test
     public void testSendStreamToQueue() throws Exception {
         GetJMSQueue getJmsQueue = new GetJMSQueue();
-        StandardProcessorTestRunner runner = (StandardProcessorTestRunner) 
TestRunners.newTestRunner(getJmsQueue);
+        TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
         runner.setProperty(JmsProperties.JMS_PROVIDER, 
JmsProperties.ACTIVEMQ_PROVIDER);
         runner.setProperty(JmsProperties.URL, 
"vm://localhost?broker.persistent=false");
         runner.setProperty(JmsProperties.DESTINATION_TYPE, 
JmsProperties.DESTINATION_TYPE_QUEUE);
@@ -121,23 +117,22 @@ public class TestGetJMSQueue {
         WrappedMessageProducer wrappedProducer = 
JmsFactory.createMessageProducer(runner.getProcessContext(), true);
         final Session jmsSession = wrappedProducer.getSession();
         final MessageProducer producer = wrappedProducer.getProducer();
-        MockProcessSession pSession = (MockProcessSession) 
runner.getProcessSessionFactory().createSession();
+
         final StreamMessage message = jmsSession.createStreamMessage();
         message.writeBytes("Hello Stream".getBytes());
 
         producer.send(message);
         jmsSession.commit();
 
-        getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
+        runner.run();
 
-        List<MockFlowFile> flowFiles = pSession
+        List<MockFlowFile> flowFiles = runner
                 .getFlowFilesForRelationship(new 
Relationship.Builder().name("success").build());
 
         assertTrue(flowFiles.size() == 1);
         MockFlowFile successFlowFile = flowFiles.get(0);
-        String receivedMessage = new 
String(runner.getContentAsByteArray(successFlowFile));
-        assertEquals("Hello Stream", receivedMessage);
-        assertEquals("queue.testing", 
successFlowFile.getAttribute("jms.JMSDestination"));
+        successFlowFile.assertContentEquals("Hello Stream");
+        successFlowFile.assertAttributeEquals("jms.JMSDestination", 
"queue.testing");
 
         producer.close();
         jmsSession.close();

Reply via email to