Repository: falcon
Updated Branches:
  refs/heads/master 5e26e11ca -> 98a904c21


FALCON-1926 Filter out effectively non-falcon related JMS messages …

* Falcon retrieves only the Oozie JMS notifications that belong to 
Falcon-generated workflows, ignoring notifications generated by other workflows 
submitted directly to Oozie.

Author: Venkatesan Ramachandran <[email protected]>

Reviewers: "Balu Vellanki <[email protected]>, Ying Zheng 
<[email protected]>"

Closes #119 from vramachan/FALCON-1923.OpsCoord


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/98a904c2
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/98a904c2
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/98a904c2

Branch: refs/heads/master
Commit: 98a904c2118d368e110b88ff396da86c64490ed8
Parents: 5e26e11
Author: Venkatesan Ramachandran <[email protected]>
Authored: Wed May 11 18:09:16 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Wed May 11 18:09:16 2016 -0700

----------------------------------------------------------------------
 .../falcon/entity/WorkflowNameBuilder.java      |  7 +++
 docs/src/site/twiki/Configuration.twiki         |  2 +-
 .../falcon/messaging/JMSMessageConsumer.java    |  3 +-
 .../messaging/JMSMessageConsumerTest.java       | 47 +++++++++++++++++---
 .../falcon/workflow/FalconPostProcessing.java   | 15 -------
 5 files changed, 51 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java 
b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
index c58be64..f0d6073 100644
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
@@ -34,6 +34,9 @@ import java.util.regex.Pattern;
 public class WorkflowNameBuilder<T extends Entity> {
     private static final String PREFIX = "FALCON";
 
+    // Oozie JMS message property name that holds the workflow app name
+    private static final String OOZIE_JMS_MSG_APPNAME_PROP = "appName";
+
     private T entity;
     private Tag tag;
     private List<String> suffixes;
@@ -153,5 +156,9 @@ public class WorkflowNameBuilder<T extends Entity> {
             }
             return null;
         }
+
+        public static String getJMSFalconSelector() {
+            return String.format("%s like '%s%s%%'", 
OOZIE_JMS_MSG_APPNAME_PROP, PREFIX, SEPARATOR);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki 
b/docs/src/site/twiki/Configuration.twiki
index 0df094f..8cf2a64 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -99,7 +99,7 @@ Some Falcon features such as late data handling, retries, 
metadata service, depe
    * In Falcon runtime.properties, set *.falcon.jms.notification.enabled to 
false. This will turn off JMS notification in post-processing.
    * Copy notification related properties in oozie/conf/oozie-site.xml to 
oozie-site.xml of the Oozie installation.  Restart Oozie so changes get 
reflected.  
 
-*NOTE : If you disable Falcon post-processing JMS notification and not enable 
Oozie JMS notification, features such as failure retry, late data handling and 
metadata service will be disabled for all entities on the server.*
+*NOTE : Oozie JMS notification needs to be enabled; otherwise features such as 
failure retry, late data handling and metadata service will be disabled for all 
entities on the server. Please refer to the Falcon documentation on how to 
configure Oozie for Falcon.*
 
 ---+++Enabling Falcon Native Scheudler
 You can either choose to schedule entities using Oozie's coordinator or using 
Falcon's native scheduler. To be able to schedule entities natively on Falcon, 
you will need to add some additional properties to 
<verbatim>$FALCON_HOME/conf/startup.properties</verbatim> before starting the 
Falcon Server. For details on the same, refer to 
[[FalconNativeScheduler][Falcon Native Scheduler]]

http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java 
b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 90bbdd3..8b48e93 100644
--- 
a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ 
b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -92,7 +92,8 @@ public class JMSMessageConsumer implements MessageListener, 
ExceptionListener {
 
             topicSession = (TopicSession) connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
             Topic destination = topicSession.createTopic(topicName);
-            topicSubscriber = 
topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
+            topicSubscriber = 
topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID,
+                    WorkflowNameBuilder.WorkflowName.getJMSFalconSelector(), 
false);
             topicSubscriber.setMessageListener(this);
 
             connection.setExceptionListener(this);

http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
 
b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 5c53a3e..0ba9464 100644
--- 
a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ 
b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -83,6 +83,11 @@ public class JMSMessageConsumerTest {
 
     public void sendMessages(String topic, WorkflowExecutionContext.Type type)
         throws JMSException, FalconException, IOException {
+        sendMessages(topic, type, true);
+    }
+
+    public void sendMessages(String topic, WorkflowExecutionContext.Type type, 
boolean isFalconWF)
+        throws JMSException, FalconException, IOException {
         ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(BROKER_URL);
         Connection connection = connectionFactory.createConnection();
         connection.start();
@@ -100,10 +105,10 @@ public class JMSMessageConsumerTest {
                 message = getMockFalconMessage(i, session);
                 break;
             case WORKFLOW_JOB:
-                message = getMockOozieMessage(i, session);
+                message = getMockOozieMessage(i, session, isFalconWF);
                 break;
             case COORDINATOR_ACTION:
-                message = getMockOozieCoordMessage(i, session);
+                message = getMockOozieCoordMessage(i, session, isFalconWF);
             default:
                 break;
             }
@@ -112,10 +117,15 @@ public class JMSMessageConsumerTest {
         }
     }
 
-    private Message getMockOozieMessage(int i, Session session) throws 
FalconException, JMSException {
+    private Message getMockOozieMessage(int i, Session session, boolean 
isFalconWF)
+        throws FalconException, JMSException {
         TextMessage message = session.createTextMessage();
         message.setStringProperty("appType", "WORKFLOW_JOB");
-        message.setStringProperty("appName", 
"FALCON_PROCESS_DEFAULT_process1");
+        if (isFalconWF) {
+            message.setStringProperty("appName", 
"FALCON_PROCESS_DEFAULT_process1");
+        } else {
+            message.setStringProperty("appName", "OozieSampleShellWF");
+        }
         message.setStringProperty("user", "falcon");
         switch(i % 4) {
         case 0:
@@ -142,10 +152,15 @@ public class JMSMessageConsumerTest {
         return message;
     }
 
-    private Message getMockOozieCoordMessage(int i, Session session) throws 
FalconException, JMSException {
+    private Message getMockOozieCoordMessage(int i, Session session, boolean 
isFalconWF)
+        throws FalconException, JMSException {
         TextMessage message = session.createTextMessage();
         message.setStringProperty("appType", "COORDINATOR_ACTION");
-        message.setStringProperty("appName", 
"FALCON_PROCESS_DEFAULT_process1");
+        if (isFalconWF) {
+            message.setStringProperty("appName", 
"FALCON_PROCESS_DEFAULT_process1");
+        } else {
+            message.setStringProperty("appName", "OozieSampleShellWF");
+        }
         message.setStringProperty("user", "falcon");
         switch(i % 5) {
         case 0:
@@ -288,4 +303,24 @@ public class JMSMessageConsumerTest {
         broker.stop();
         subscriber.closeSubscriber();
     }
+
+    @Test
+    public void testJMSMessagesFromOozieForNonFalconWF() throws Exception {
+        sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB, 
false /* isFalconWF */);
+
+        final BrokerView adminView = broker.getAdminView();
+
+        Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+        Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
+        Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
+        Assert.assertEquals(adminView.getTotalMessageCount(), 0);
+
+        Thread.sleep(100);
+        Mockito.verify(jobEndService, 
Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, 
Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, 
Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, 
Mockito.never()).notifyWait(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, 
Mockito.never()).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java 
b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 3bdfe73..4961896 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -48,14 +48,6 @@ public class FalconPostProcessing extends Configured 
implements Tool {
         // serialize the context to HDFS under logs dir before sending the 
message
         context.serialize();
 
-        boolean systemNotificationEnabled = Boolean.parseBoolean(context.
-                
getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true"));
-
-        if (systemNotificationEnabled) {
-            LOG.info("Sending Falcon message {} ", context);
-            invokeFalconMessageProducer(context);
-        }
-
         String userBrokerUrl = 
context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
         boolean userNotificationEnabled = Boolean.parseBoolean(context.
                 getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, 
"true"));
@@ -80,13 +72,6 @@ public class FalconPostProcessing extends Configured 
implements Tool {
         
jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }
 
-    private void invokeFalconMessageProducer(WorkflowExecutionContext context) 
throws Exception {
-        JMSMessageProducer jmsMessageProducer = 
JMSMessageProducer.builder(context)
-                .type(JMSMessageProducer.MessageType.FALCON)
-                .build();
-        jmsMessageProducer.sendMessage();
-    }
-
     private void invokeLogProducer(WorkflowExecutionContext context) {
         // todo: need to move this out to Falcon in-process
         if (UserGroupInformation.isSecurityEnabled()) {

Reply via email to