Repository: oozie
Updated Branches:
  refs/heads/master 81f6c747a -> 0b4d3521a


OOZIE-1765 JMS Notifications for Workflows not always on the correct topic 
(rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0b4d3521
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0b4d3521
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0b4d3521

Branch: refs/heads/master
Commit: 0b4d3521a92ca8f7d4f7e6f91e6470d2a09118bc
Parents: 81f6c74
Author: Robert Kanter <[email protected]>
Authored: Mon Mar 31 12:49:28 2014 -0700
Committer: Robert Kanter <[email protected]>
Committed: Mon Mar 31 12:49:28 2014 -0700

----------------------------------------------------------------------
 .../apache/oozie/service/JMSTopicService.java   | 11 ++++-
 .../oozie/service/TestJMSTopicService.java      | 46 ++++++++++++++++++--
 docs/src/site/twiki/AG_Install.twiki            |  4 +-
 docs/src/site/twiki/DG_JMSNotifications.twiki   | 21 +++++----
 release-log.txt                                 |  1 +
 5 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/JMSTopicService.java 
b/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
index 7a7a960..1646a17 100644
--- a/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
+++ b/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
@@ -193,7 +193,16 @@ public class JMSTopicService implements Service {
             }
         }
         else if (appType == AppType.WORKFLOW_JOB || appType == 
AppType.WORKFLOW_ACTION) {
-            topicName = topicMap.get(JobType.WORKFLOW);
+            topicName = topicMap.get(JobType.WORKFLOW.value);
+            if (appType == AppType.WORKFLOW_ACTION) {
+                id = parentJobId;
+            }
+        }
+        else if (appType == AppType.BUNDLE_JOB || appType == 
AppType.BUNDLE_ACTION) {
+            topicName = topicMap.get(JobType.BUNDLE.value);
+            if (appType == AppType.BUNDLE_ACTION) {
+                id = parentJobId;
+            }
         }
 
         if (topicName == null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java 
b/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
index 5f70153..21ca849 100644
--- a/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
@@ -67,17 +67,25 @@ public class TestJMSTopicService extends XDataTestCase {
         JMSTopicService jmsTopicService = 
Services.get().get(JMSTopicService.class);
         WorkflowJobBean wfj = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
         assertEquals(wfj.getUser(), jmsTopicService.getTopic(wfj.getId()));
+        assertEquals(wfj.getUser(), 
jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), 
null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", 
WorkflowAction.Status.RUNNING);
         assertEquals(wfj.getUser(), jmsTopicService.getTopic(wab.getId()));
+        assertEquals(wfj.getUser(), 
jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, wfj.getUser(), wab.getId(), 
wab.getWfId()));
         CoordinatorJobBean cjb = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
-        assertEquals(wfj.getUser(), jmsTopicService.getTopic(cjb.getId()));
+        assertEquals(cjb.getUser(), jmsTopicService.getTopic(cjb.getId()));
+        assertEquals(cjb.getUser(), 
jmsTopicService.getTopic(AppType.COORDINATOR_JOB, cjb.getUser(), cjb.getId(), 
null));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 
1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
-        assertEquals(wfj.getUser(), jmsTopicService.getTopic(cab.getId()));
+        assertEquals(cjb.getUser(), jmsTopicService.getTopic(cab.getId()));
+        assertEquals(cjb.getUser(),
+                jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, 
cjb.getUser(), cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, 
true);
-        assertEquals(wfj.getUser(), jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(bjb.getUser(), jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(bjb.getUser(), 
jmsTopicService.getTopic(AppType.BUNDLE_JOB, bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 
1, Job.Status.RUNNING);
-        assertEquals(wfj.getUser(), 
jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(bjb.getUser(), 
jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(bjb.getUser(),
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), 
bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test
@@ -91,17 +99,27 @@ public class TestJMSTopicService extends XDataTestCase {
         WorkflowJobBean wfj = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
         assertEquals(TOPIC_PREFIX, jmsTopicService.getTopicPrefix());
         assertEquals(TOPIC_PREFIX + wfj.getId(), 
jmsTopicService.getTopic(wfj.getId()));
+        assertEquals(TOPIC_PREFIX + wfj.getId(), 
jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), 
null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", 
WorkflowAction.Status.RUNNING);
         assertEquals(TOPIC_PREFIX + wfj.getId(), 
jmsTopicService.getTopic(wab.getId()));
+        assertEquals(TOPIC_PREFIX + wfj.getId(),
+                jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, 
wfj.getUser(), wab.getId(), wab.getWfId()));
         CoordinatorJobBean cjb = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
         assertEquals(TOPIC_PREFIX + cjb.getId(), 
jmsTopicService.getTopic(cjb.getId()));
+        assertEquals(TOPIC_PREFIX + cjb.getId(),
+                jmsTopicService.getTopic(AppType.COORDINATOR_JOB, 
cjb.getUser(), cjb.getId(), null));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 
1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
         assertEquals(TOPIC_PREFIX + cjb.getId(), 
jmsTopicService.getTopic(cab.getId()));
+        assertEquals(TOPIC_PREFIX + cjb.getId(),
+                jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, 
cjb.getUser(), cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, 
true);
         assertEquals(TOPIC_PREFIX + bjb.getId(), 
jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(TOPIC_PREFIX + bjb.getId(), 
jmsTopicService.getTopic(AppType.BUNDLE_JOB, bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 
1, Job.Status.RUNNING);
         assertEquals(TOPIC_PREFIX + bjb.getId(), 
jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(TOPIC_PREFIX + bjb.getId(),
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), 
bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test
@@ -116,17 +134,24 @@ public class TestJMSTopicService extends XDataTestCase {
         JMSTopicService jmsTopicService = 
Services.get().get(JMSTopicService.class);
         WorkflowJobBean wfj = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
         assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+        assertEquals("workflow", 
jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), 
null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", 
WorkflowAction.Status.RUNNING);
         assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+        assertEquals("workflow", 
jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, wfj.getUser(), wab.getId(), 
wab.getWfId()));
         CoordinatorJobBean cjb = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
         assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+        assertEquals("coord", 
jmsTopicService.getTopic(AppType.COORDINATOR_JOB, cjb.getUser(), cjb.getId(), 
null));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 
1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
         assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+        assertEquals("coord", 
jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, cjb.getUser(), 
cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, 
true);
         assertEquals("bundle", jmsTopicService.getTopic(bjb.getId()));
+        assertEquals("bundle", jmsTopicService.getTopic(AppType.BUNDLE_JOB, 
bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 
1, Job.Status.RUNNING);
         assertEquals("bundle", 
jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals("bundle",
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), 
bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test
@@ -141,17 +166,24 @@ public class TestJMSTopicService extends XDataTestCase {
         JMSTopicService jmsTopicService = 
Services.get().get(JMSTopicService.class);
         WorkflowJobBean wfj = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
         assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+        assertEquals("workflow", 
jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), 
null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", 
WorkflowAction.Status.RUNNING);
         assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+        assertEquals("workflow", 
jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, wfj.getUser(), wab.getId(), 
wab.getWfId()));
         CoordinatorJobBean cjb = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
         assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+        assertEquals("coord", 
jmsTopicService.getTopic(AppType.COORDINATOR_JOB, cjb.getUser(), cjb.getId(), 
null));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 
1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
         assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+        assertEquals("coord", 
jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, cjb.getUser(), 
cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, 
true);
         assertEquals(bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(bjb.getId(), jmsTopicService.getTopic(AppType.BUNDLE_JOB, 
bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 
1, Job.Status.RUNNING);
         assertEquals(bjb.getId(), 
jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(bjb.getId(),
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), 
bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test
@@ -165,18 +197,24 @@ public class TestJMSTopicService extends XDataTestCase {
         JMSTopicService jmsTopicService = 
Services.get().get(JMSTopicService.class);
         WorkflowJobBean wfj = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
         assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+        assertEquals("workflow", 
jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), 
null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", 
WorkflowAction.Status.RUNNING);
         assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+        assertEquals("workflow", 
jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, wfj.getUser(), wab.getId(), 
wab.getWfId()));
         CoordinatorJobBean cjb = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
         assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 
1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
         assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+        assertEquals("coord", 
jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, cjb.getUser(), 
cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, 
true);
         // As no default is specified, user will be considered as topic
         assertEquals(bjb.getUser(), jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(bjb.getUser(), 
jmsTopicService.getTopic(AppType.BUNDLE_JOB, bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 
1, Job.Status.RUNNING);
         assertEquals(bjb.getUser(), 
jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(bjb.getUser(),
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), 
bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/docs/src/site/twiki/AG_Install.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/AG_Install.twiki 
b/docs/src/site/twiki/AG_Install.twiki
index 41f7c4c..e343d7e 100644
--- a/docs/src/site/twiki/AG_Install.twiki
+++ b/docs/src/site/twiki/AG_Install.twiki
@@ -570,7 +570,7 @@ identifier (e.g. default) assigned to a semi-colon 
separated key#value list of p
      <property>
         <name>oozie.jms.producer.connection.properties</name>
         <value>
-            
default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616
+            
java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616;connectionFactoryNames#ConnectionFactory
         </value>
      </property>
      </verbatim>
@@ -584,7 +584,7 @@ about the various jobs.
             default=${username}
         </value>
         <description>
-            Topic options are ${username} or a fixed string which can be 
specified as default or for a
+            Topic options are ${username}, ${jobId}, or a fixed string which 
can be specified as default or for a
             particular job type.
             For example, to have a fixed string topic for workflows, coordinators 
and bundles,
             specify in the following comma-separated format: 
{jobtype1}={some_string1}, {jobtype2}={some_string2}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/docs/src/site/twiki/DG_JMSNotifications.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_JMSNotifications.twiki 
b/docs/src/site/twiki/DG_JMSNotifications.twiki
index eaa6b7e..098b080 100644
--- a/docs/src/site/twiki/DG_JMSNotifications.twiki
+++ b/docs/src/site/twiki/DG_JMSNotifications.twiki
@@ -50,10 +50,10 @@ String getTopicPrefix();
 </verbatim>
 
 The topic is obtained by concatenating topic prefix and the substituted value 
for topic pattern. The topic pattern
-can be a constant value like workflow or coordinator which the administrator 
has configured or ${username}.
-If ${username}, it has to be substitued with the name of the user who has 
submitted the job. Administrators can chose
-to publish messages to topics containing user names to avoid having one topic 
containing all messages and all users having
-to apply selectors to filter the message they are interested in.
+can be a constant value like workflow or coordinator which the administrator 
has configured or a variable (either ${username}
+or ${jobId}). If ${username}, it has to be substituted with the name of the 
user who has submitted the job; and if ${jobId} it has
+to be substituted with the job Id. Administrators can choose to publish 
messages to topics containing user names to avoid having one
+topic containing all messages and all users having to apply selectors to 
filter the message they are interested in.
 
 The getJMSTopicName API can be used if the job id is already known and will 
give the exact topic name to which the
 notifications for that job are published.
@@ -162,8 +162,7 @@ First, create the Oozie client and retrieve the JNDI 
properties to make a connec
    Properties jndiProperties = jmsInfo.getJNDIProperties();
    Context jndiContext = new InitialContext(jndiProperties);
    String connectionFactoryName = (String) 
jndiContext.getEnvironment().get("connectionFactoryNames");
-   ConnectionFactory connectionFactory = (ConnectionFactory);
-   jndiContext.lookup(connectionFactoryName);
+   ConnectionFactory connectionFactory = (ConnectionFactory) 
jndiContext.lookup(connectionFactoryName);
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    String topicPrefix = jmsInfo.getTopicPrefix();
@@ -173,6 +172,10 @@ First, create the Oozie client and retrieve the JNDI 
properties to make a connec
    String topicName = null;
    if (topicPattern.equals("${username}")) {
        topicName = "john";
+   // If the topic pattern is '${jobId}', the topic name
+   // is the id of the job whose notifications are wanted
+   } else if (topicPattern.equals("${jobId}")) {
+       topicName = "0000004-140328125200198-oozie-oozi-W";
    }
    Destination topic = session.createTopic(topicPrefix+topicName);
    MessageConsumer consumer = session.createConsumer(topic);
@@ -185,12 +188,12 @@ interface needs to be implemented. Also, its onMessage() 
method  needs to be imp
 This method will be called whenever a message is available on the JMS bus.
 
 <verbatim>
-    public void onMessage(Message m) {
-       if 
(message.getStringProperty(JMSHeaderConsants.MSG_TYPE).equals(MessageType.SLA.name()){
+    public void onMessage(Message message) {
+       if 
(message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE).equals(MessageType.SLA.name())){
           SLAMessage slaMessage = JMSMessagingUtils.getEventMessage(message);
           // Further processing
        }
-       else if 
(message.getStringProperty(JMSHeaderConsants.APP_TYPE).equals(AppType.WORKFLOW_JOB.name()){
+       else if 
(message.getStringProperty(JMSHeaderConstants.APP_TYPE).equals(AppType.WORKFLOW_JOB.name())){
           WorkflowJobMessage wfJobMessage = 
JMSMessagingUtils.getEventMessage(message);
           // Further processing
        }

http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a65c8af..1bfa9c2 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1765 JMS Notifications for Workflows not always on the correct topic 
(rkanter)
 OOZIE-1732 Sharelib instrumentation fails if sharelib.system.libpath is not 
created (ryota)
 OOZIE-1692 modify log message when checking completion of child job in 
Map-Reduce action (ryota)
 OOZIE-1734 Oozie returned 500 Internal Server error when user passes invalid 
request (checha via rkanter)

Reply via email to