Repository: falcon
Updated Branches:
  refs/heads/0.10 600b6bcc2 -> 0222d390b


FALCON-2051 PostProcessing needs to send a JMS message so that REPL metrics 
can be added to the GraphDB

This essentially reverts the FALCON-1926 changes.
After the 0.10 release, we will attempt a proper fix that does not require 
sending a JMS message from Falcon PostProcessing.

Author: Venkatesan Ramachandran <[email protected]>

Reviewers: Venkat <[email protected]>, Pallavi Rao <[email protected]>, 
Praveen Adlakha <[email protected]>, Peeyush <[email protected]>

Closes #201 from vramachan/FALCON-2051.PostProcessingNotInvoked.0.10


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0222d390
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0222d390
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0222d390

Branch: refs/heads/0.10
Commit: 0222d390b5abef4b6b7263c84a3d409afac0afd6
Parents: 600b6bc
Author: Venkatesan Ramachandran <[email protected]>
Authored: Thu Jun 30 15:29:58 2016 +0530
Committer: peeyush b <[email protected]>
Committed: Thu Jun 30 15:29:58 2016 +0530

----------------------------------------------------------------------
 .../falcon/entity/WorkflowNameBuilder.java      |  7 ----
 docs/src/site/twiki/Configuration.twiki         |  3 --
 .../falcon/messaging/JMSMessageConsumer.java    |  3 +-
 .../messaging/JMSMessageConsumerTest.java       | 42 +++-----------------
 .../falcon/workflow/FalconPostProcessing.java   | 15 +++++++
 5 files changed, 22 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java 
b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
index f0d6073..c58be64 100644
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
@@ -34,9 +34,6 @@ import java.util.regex.Pattern;
 public class WorkflowNameBuilder<T extends Entity> {
     private static final String PREFIX = "FALCON";
 
-    // Oozie JMS message property name that holds the workflow app name
-    private static final String OOZIE_JMS_MSG_APPNAME_PROP = "appName";
-
     private T entity;
     private Tag tag;
     private List<String> suffixes;
@@ -156,9 +153,5 @@ public class WorkflowNameBuilder<T extends Entity> {
             }
             return null;
         }
-
-        public static String getJMSFalconSelector() {
-            return String.format("%s like '%s%s%%'", 
OOZIE_JMS_MSG_APPNAME_PROP, PREFIX, SEPARATOR);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki 
b/docs/src/site/twiki/Configuration.twiki
index ce32019..98acb83 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -103,9 +103,6 @@ Oozie workflow completes. Falcon listens to Oozie 
notification via JMS. You need
 explained below. Falcon post processing feature continues to only send user 
notifications so enabling Oozie
 JMS notification is important.
 
-*NOTE : If Oozie JMS notification is not enabled, the Falcon features such as 
failure retry, late data handling and metadata
-service will be disabled for all entities on the server.*
-
 ---+++Enable Oozie JMS notification
 
    * Please add/change the following properties in oozie-site.xml in the oozie 
installation dir.

http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java 
b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 8b48e93..90bbdd3 100644
--- 
a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ 
b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -92,8 +92,7 @@ public class JMSMessageConsumer implements MessageListener, 
ExceptionListener {
 
             topicSession = (TopicSession) connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
             Topic destination = topicSession.createTopic(topicName);
-            topicSubscriber = 
topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID,
-                    WorkflowNameBuilder.WorkflowName.getJMSFalconSelector(), 
false);
+            topicSubscriber = 
topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
             topicSubscriber.setMessageListener(this);
 
             connection.setExceptionListener(this);

http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
 
b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 6237bdf..cffdb59 100644
--- 
a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ 
b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -83,11 +83,6 @@ public class JMSMessageConsumerTest {
 
     public void sendMessages(String topic, WorkflowExecutionContext.Type type)
         throws JMSException, FalconException, IOException {
-        sendMessages(topic, type, true);
-    }
-
-    public void sendMessages(String topic, WorkflowExecutionContext.Type type, 
boolean isFalconWF)
-        throws JMSException, FalconException, IOException {
         ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(BROKER_URL);
         Connection connection = connectionFactory.createConnection();
         connection.start();
@@ -105,10 +100,10 @@ public class JMSMessageConsumerTest {
                 message = getMockFalconMessage(i, session);
                 break;
             case WORKFLOW_JOB:
-                message = getMockOozieMessage(i, session, isFalconWF);
+                message = getMockOozieMessage(i, session);
                 break;
             case COORDINATOR_ACTION:
-                message = getMockOozieCoordMessage(i, session, isFalconWF);
+                message = getMockOozieCoordMessage(i, session);
             default:
                 break;
             }
@@ -117,15 +112,10 @@ public class JMSMessageConsumerTest {
         }
     }
 
-    private Message getMockOozieMessage(int i, Session session, boolean 
isFalconWF)
-        throws FalconException, JMSException {
+    private Message getMockOozieMessage(int i, Session session) throws 
FalconException, JMSException {
         TextMessage message = session.createTextMessage();
         message.setStringProperty("appType", "WORKFLOW_JOB");
-        if (isFalconWF) {
-            message.setStringProperty("appName", 
"FALCON_PROCESS_DEFAULT_process1");
-        } else {
-            message.setStringProperty("appName", "OozieSampleShellWF");
-        }
+        message.setStringProperty("appName", 
"FALCON_PROCESS_DEFAULT_process1");
         message.setStringProperty("user", "falcon");
         switch(i % 4) {
         case 0:
@@ -152,15 +142,11 @@ public class JMSMessageConsumerTest {
         return message;
     }
 
-    private Message getMockOozieCoordMessage(int i, Session session, boolean 
isFalconWF)
+    private Message getMockOozieCoordMessage(int i, Session session)
         throws FalconException, JMSException {
         TextMessage message = session.createTextMessage();
         message.setStringProperty("appType", "COORDINATOR_ACTION");
-        if (isFalconWF) {
-            message.setStringProperty("appName", 
"FALCON_PROCESS_DEFAULT_process1");
-        } else {
-            message.setStringProperty("appName", "OozieSampleShellWF");
-        }
+        message.setStringProperty("appName", 
"FALCON_PROCESS_DEFAULT_process1");
         message.setStringProperty("user", "falcon");
         switch(i % 5) {
         case 0:
@@ -292,20 +278,4 @@ public class JMSMessageConsumerTest {
         broker.stop();
         subscriber.closeSubscriber();
     }
-
-    @Test
-    public void testJMSMessagesFromOozieForNonFalconWF() throws Exception {
-        sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB, 
false /* isFalconWF */);
-
-        final BrokerView adminView = broker.getAdminView();
-        Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
-
-        Thread.sleep(100);
-        Mockito.verify(jobEndService, 
Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
-        Mockito.verify(jobEndService, 
Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
-        Mockito.verify(jobEndService, 
Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
-        Mockito.verify(jobEndService, 
Mockito.never()).notifyWait(Mockito.any(WorkflowExecutionContext.class));
-        Mockito.verify(jobEndService, 
Mockito.never()).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java 
b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 4961896..ea914f6 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -48,6 +48,14 @@ public class FalconPostProcessing extends Configured 
implements Tool {
         // serialize the context to HDFS under logs dir before sending the 
message
         context.serialize();
 
+        boolean systemNotificationEnabled = Boolean.parseBoolean(context.
+            getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, 
"true"));
+
+        if (systemNotificationEnabled) {
+            LOG.info("Sending Falcon message {} ", context);
+            invokeFalconMessageProducer(context);
+        }
+
         String userBrokerUrl = 
context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
         boolean userNotificationEnabled = Boolean.parseBoolean(context.
                 getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, 
"true"));
@@ -72,6 +80,13 @@ public class FalconPostProcessing extends Configured 
implements Tool {
         
jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }
 
+    private void invokeFalconMessageProducer(WorkflowExecutionContext context) 
throws Exception {
+        JMSMessageProducer jmsMessageProducer = 
JMSMessageProducer.builder(context)
+            .type(JMSMessageProducer.MessageType.FALCON)
+            .build();
+        jmsMessageProducer.sendMessage();
+    }
+
     private void invokeLogProducer(WorkflowExecutionContext context) {
         // todo: need to move this out to Falcon in-process
         if (UserGroupInformation.isSecurityEnabled()) {

Reply via email to