Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java (original) +++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java Wed May 8 17:59:52 2013 @@ -17,10 +17,16 @@ */ package org.apache.oozie.servlet; +import java.io.IOException; + import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.oozie.DagEngine; +import org.apache.oozie.DagEngineException; import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.service.DagEngineService; +import org.apache.oozie.service.Services; @SuppressWarnings("serial") public class V2JobServlet extends V1JobServlet { @@ -42,4 +48,21 @@ public class V2JobServlet extends V1JobS JsonBean actionBean = super.getWorkflowActionBean(request, response); return actionBean; } + + + @Override + protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + String topicName; + String jobId = getResourceName(request); + DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), + getAuthToken(request)); + try { + topicName = dagEngine.getJMSTopicName(jobId); + } + catch (DagEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + return topicName; + } }
Modified: oozie/trunk/core/src/main/resources/oozie-default.xml URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/main/resources/oozie-default.xml (original) +++ oozie/trunk/core/src/main/resources/oozie-default.xml Wed May 8 17:59:52 2013 @@ -164,14 +164,14 @@ bundle job and bundle action WORKFLOW=workflow, COORDINATOR=coordinator, - BUNDLE={jobId} + BUNDLE=${jobId} For jobs with no defined topic, default topic will be ${username} </description> </property> <!-- JMS Producer connection --> <property> - <name>oozie.jms.producer.notification.connection</name> + <name>oozie.jms.producer.connection.properties</name> <value>java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616;connectionFactoryNames#ConnectionFactory</value> </property> Modified: oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java Wed May 8 17:59:52 2013 @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,6 +34,7 @@ import org.apache.oozie.servlet.MockDagE import org.apache.oozie.servlet.V1AdminServlet; import org.apache.oozie.servlet.V1JobServlet; import org.apache.oozie.servlet.V1JobsServlet; +import org.apache.oozie.servlet.V2AdminServlet; import org.apache.oozie.servlet.V2JobServlet; import org.apache.oozie.util.XConfiguration; @@ -45,15 +46,15 @@ public class TestOozieCLI extends DagSer new V1JobServlet(); new V1JobsServlet(); new V1AdminServlet(); + new V2AdminServlet(); new V2JobServlet(); } static final boolean IS_SECURITY_ENABLED = false; static final String VERSION = "/v" + OozieClient.WS_PROTOCOL_VERSION; static final String[] END_POINTS = {"/versions", VERSION + "/jobs", VERSION + "/job/*", VERSION + "/admin/*"}; - static final Class[] SERVLET_CLASSES = - { HeaderTestingVersionServlet.class, V1JobsServlet.class, V1JobServlet.class, - V1AdminServlet.class, V2JobServlet.class }; + static final Class[] SERVLET_CLASSES = { HeaderTestingVersionServlet.class, V1JobsServlet.class, + V1JobServlet.class, V1AdminServlet.class, V2JobServlet.class, V2AdminServlet.class }; @Override protected void setUp() throws Exception { @@ -300,7 +301,7 @@ public class TestOozieCLI extends DagSer /** * Check if "-debug" option is accepted at CLI with job run command - * + * * @throws Exception */ public void testRunWithDebug() throws Exception { @@ -518,12 +519,12 @@ public class TestOozieCLI extends DagSer MockDagEngineService.JOB_ID + "1" + MockDagEngineService.JOB_ID_END}; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_SHOW_INFO, MockDagEngineService.did); - + args = new String[]{"job", "-oozie", oozieUrl, "-info", MockDagEngineService.JOB_ID + "2" + MockDagEngineService.JOB_ID_END}; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_SHOW_INFO, MockDagEngineService.did); - + args = new String[]{"job", "-oozie", oozieUrl, "-info", MockDagEngineService.JOB_ID + (MockDagEngineService.workflows.size() + 1)}; assertEquals(-1, new OozieCLI().run(args)); @@ -545,12 +546,12 @@ public class TestOozieCLI extends DagSer "name=x"}; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOBS_FILTER_PARAM, MockDagEngineService.did); - + args = new String[]{"jobs", "-timezone", "PST", "-len", "3", "-offset", "2", "-oozie", oozieUrl, "-filter", "name=x"}; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOBS_FILTER_PARAM, MockDagEngineService.did); - + args = new String[]{"jobs", "-jobtype", "coord", "-filter", "status=FAILED", "-oozie", oozieUrl}; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOBS_FILTER_PARAM, MockDagEngineService.did); @@ -711,7 +712,7 @@ public class TestOozieCLI extends DagSer } }); } - + public void testInfo() throws Exception { String[] args = new String[]{"info"}; assertEquals(0, new OozieCLI().run(args)); Modified: oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java Wed May 8 17:59:52 2013 @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,6 +32,7 @@ import org.apache.oozie.servlet.V0JobsSe import org.apache.oozie.servlet.V1AdminServlet; import org.apache.oozie.servlet.V1JobServlet; import org.apache.oozie.servlet.V1JobsServlet; +import org.apache.oozie.servlet.V2AdminServlet; import org.apache.oozie.servlet.V2JobServlet; public class TestWorkflowClient extends DagServletTestCase { @@ -44,13 +45,15 @@ public class TestWorkflowClient extends new V1AdminServlet(); new V1JobServlet(); new V2JobServlet(); + new V2AdminServlet(); } private static final boolean IS_SECURITY_ENABLED = false; static final String VERSION = "/v" + OozieClient.WS_PROTOCOL_VERSION; static final String[] END_POINTS = {"/versions", VERSION + "/jobs", VERSION + "/job/*", VERSION + "/admin/*"}; - static final Class[] SERVLET_CLASSES = {HeaderTestingVersionServlet.class, V0JobsServlet.class, - V0JobServlet.class, V1AdminServlet.class, V1JobServlet.class, V2JobServlet.class, V1JobsServlet.class}; + static final Class[] SERVLET_CLASSES = { HeaderTestingVersionServlet.class, V0JobsServlet.class, + V0JobServlet.class, V1AdminServlet.class, V2AdminServlet.class, V1JobServlet.class, V2JobServlet.class, + V1JobsServlet.class }; protected void setUp() throws Exception { super.setUp(); Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java Wed May 8 17:59:52 2013 @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.oozie.command; - -import java.util.Properties; - -import org.apache.oozie.ErrorCode; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.client.rest.JMSConnectionInfoBean; -import org.apache.oozie.jms.JMSJobEventListener; -import org.apache.oozie.service.JMSAccessorService; -import org.apache.oozie.service.JMSTopicService; -import org.apache.oozie.service.Services; -import org.apache.oozie.test.XDataTestCase; -import org.apache.oozie.workflow.WorkflowInstance; -import org.junit.Test; - -public class TestJMSInfoXCommand extends XDataTestCase { - - private Services services; - - @Override - protected void setUp() throws Exception { - super.setUp(); - services = new Services(); - services.init(); - } - - @Override - protected void tearDown() throws Exception { - services.destroy(); - super.tearDown(); - } - - @Test - public void testConnectionNotEnabled() { - try { - WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); - new JMSInfoXCommand(wfj.getId()).call(); - } - catch (CommandException e) { - assertEquals(ErrorCode.E1601, e.getErrorCode()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testJMSConnectionInfo() { - try { - services.destroy(); - services = new Services(); - services.getConf().set( - JMSJobEventListener.JMS_CONNECTION_PROPERTIES, - "java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#" - + localActiveMQBroker + ";" + "connectionFactoryNames#" + "ConnectionFactory"); - services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName()); - services.init(); - - WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); - JMSInfoXCommand jmsInfoCmd = new JMSInfoXCommand(wfj.getId()); - JMSConnectionInfoBean jmsBean = jmsInfoCmd.call(); - assertEquals("test", jmsBean.getTopicName()); - Properties props = jmsBean.getJNDIProperties(); - assertEquals(ActiveMQConnFactory, props.get("java.naming.factory.initial")); - assertEquals(localActiveMQBroker, props.get("java.naming.provider.url")); - assertEquals("ConnectionFactory", props.get("connectionFactoryNames")); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - -} Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java Wed May 8 17:59:52 2013 @@ -17,6 +17,7 @@ */ package org.apache.oozie.jms; +import javax.jms.JMSException; import javax.jms.Session; import org.apache.hadoop.conf.Configuration; @@ -54,11 +55,11 @@ public class TestDefaultConnectionContex } @Test - public void testThreadLocalSession() { + public void testThreadLocalSession() throws JMSException { String jmsProps = services.getConf().get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES); JMSConnectionInfo connInfo = new JMSConnectionInfo(jmsProps); ConnectionContext jmsContext = Services.get().get(JMSAccessorService.class) - .createConnectionContext(connInfo, false); + .createConnectionContext(connInfo); Thread th = new Thread(new SessionThread(jmsContext)); th.start(); try { @@ -69,25 +70,17 @@ public class TestDefaultConnectionContex } assertEquals(session1, session2); - ThreadLocal<Session> threadLocal1 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE); - Session session3 = threadLocal1.get(); - ThreadLocal<Session> threadLocal2 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE); - Session session4 = threadLocal2.get(); + Session session3 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE); + Session session4 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE); // As session3 and session4 are created by same threads, they should be // equal assertTrue(session3.equals(session4)); // As session1 and session3 are created by diff threads, they shoudn't // be equal assertFalse(session1.equals(session3)); - threadLocal1.remove(); - ThreadLocal<Session> threadLocal3 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE); - Session session5 = threadLocal3.get(); - assertFalse(session3.equals(session5)); - } class SessionThread implements Runnable { - private ConnectionContext connContext; SessionThread(ConnectionContext connContext) { @@ -96,10 +89,14 @@ public class TestDefaultConnectionContex @Override public void run() { - ThreadLocal<Session> th1 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE); - session1 = th1.get(); - ThreadLocal<Session> th2 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE); - session2 = th2.get(); + try { + session1 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE); + session2 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE); + } + catch (JMSException e) { + e.printStackTrace(); + } + } } Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java Wed May 8 17:59:52 2013 @@ -89,7 +89,7 @@ public class TestJMSJobEventListener ext wfEventListener.onWorkflowJobEvent(wfe); TextMessage message = (TextMessage) consumer.receive(5000); assertFalse(message.getText().contains("endTime")); - WorkflowJobMessage wfStartMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message); + WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message); assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus()); assertEquals(startDate, wfStartMessage.getStartTime()); assertEquals("wfId1", wfStartMessage.getId()); @@ -122,7 +122,7 @@ public class TestJMSJobEventListener ext MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); wfEventListener.onWorkflowJobEvent(wfe); TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfSuccMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message); + WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message); assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus()); assertEquals(startDate, wfSuccMessage.getStartTime()); assertEquals(endDate, wfSuccMessage.getEndTime()); @@ -158,7 +158,7 @@ public class TestJMSJobEventListener ext MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); wfEventListener.onWorkflowJobEvent(wfe); TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); assertEquals(startDate, wfFailMessage.getStartTime()); assertEquals(endDate, wfFailMessage.getEndTime()); @@ -193,7 +193,7 @@ public class TestJMSJobEventListener ext wfEventListener.onWorkflowJobEvent(wfe); TextMessage message = (TextMessage) consumer.receive(5000); assertFalse(message.getText().contains("endTime")); - WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus()); assertEquals(startDate, wfFailMessage.getStartTime()); assertEquals("wfId1", wfFailMessage.getId()); @@ -226,7 +226,7 @@ public class TestJMSJobEventListener ext MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); wfEventListener.onWorkflowJobEvent(wfe); TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); assertEquals("user_1", wfFailMessage.getUser()); assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); @@ -276,7 +276,7 @@ public class TestJMSJobEventListener ext MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); wfEventListener.onWorkflowJobEvent(wfe); TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); assertEquals("user1", wfFailMessage.getUser()); assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); @@ -303,7 +303,7 @@ public class TestJMSJobEventListener ext MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); wfEventListener.onWorkflowJobEvent(wfe); TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); assertEquals("user1", wfFailMessage.getUser()); assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); @@ -339,8 +339,7 @@ public class TestJMSJobEventListener ext assertNotNull(jmsContext); broker.stop(); jmsContext = getConnectionContext(); - // Exception Listener should have removed the conn context from - // connection map + // Exception Listener should have removed the old conn context assertNull(jmsContext); broker = new BrokerService(); broker.addConnector(brokerURl); @@ -381,7 +380,7 @@ public class TestJMSJobEventListener ext assertFalse(message.getText().contains("endTime")); assertFalse(message.getText().contains("errorCode")); assertFalse(message.getText().contains("errorMessage")); - CoordinatorActionMessage coordActionWaitingMessage = (CoordinatorActionMessage) JMSMessagingUtils + CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils .getEventMessage(message); assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus()); assertEquals(startDate, coordActionWaitingMessage.getStartTime()); @@ -419,7 +418,7 @@ public class TestJMSJobEventListener ext assertFalse(message.getText().contains("errorCode")); assertFalse(message.getText().contains("errorMessage")); assertFalse(message.getText().contains("missingDependency")); - CoordinatorActionMessage coordActionStartMessage = (CoordinatorActionMessage) JMSMessagingUtils + CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils .getEventMessage(message); assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus()); assertEquals(startDate, coordActionStartMessage.getStartTime()); @@ -456,7 +455,7 @@ public class TestJMSJobEventListener ext assertFalse(message.getText().contains("errorCode")); assertFalse(message.getText().contains("errorMessage")); assertFalse(message.getText().contains("missingDependency")); - CoordinatorActionMessage coordActionSuccessMessage = (CoordinatorActionMessage) JMSMessagingUtils + CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils .getEventMessage(message); assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus()); assertEquals(startDate, coordActionSuccessMessage.getStartTime()); @@ -494,7 +493,7 @@ public class TestJMSJobEventListener ext coordEventListener.onCoordinatorActionEvent(cae); TextMessage message = (TextMessage) consumer.receive(5000); assertFalse(message.getText().contains("missingDependency")); - CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils + CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils .getEventMessage(message); assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus()); assertEquals(startDate, coordActionFailMessage.getStartTime()); @@ -530,7 +529,7 @@ public class TestJMSJobEventListener ext MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector); coordEventListener.onCoordinatorActionEvent(cae); TextMessage message = (TextMessage) consumer.receive(5000); - CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils + CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils .getEventMessage(message); Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus()); assertEquals("user1", coordActionFailMessage.getUser()); @@ -571,7 +570,7 @@ public class TestJMSJobEventListener ext String jmsProps = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES); JMSConnectionInfo connInfo = new JMSConnectionInfo(jmsProps); JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class); - ConnectionContext jmsContext = jmsService.createConnectionContext(connInfo, false); + ConnectionContext jmsContext = jmsService.createProducerConnectionContext(connInfo); return jmsContext; } Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java Wed May 8 17:59:52 2013 @@ -1,87 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.jms; - -import java.util.Properties; - -import org.apache.oozie.BundleActionBean; -import org.apache.oozie.BundleJobBean; -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.CoordinatorJobBean; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.client.rest.JMSConnectionInfoBean; -import org.apache.oozie.service.JMSAccessorService; -import org.apache.oozie.service.JMSTopicService; -import org.apache.oozie.service.Services; -import org.apache.oozie.test.XDataTestCase; -import org.apache.oozie.workflow.WorkflowInstance; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestJMSServerInfo extends XDataTestCase { - - private Services services; - - @Before - protected void setUp() throws Exception { - super.setUp(); - services = new Services(); - services.getConf().set( - JMSJobEventListener.JMS_CONNECTION_PROPERTIES, - "java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#" - + localActiveMQBroker + ";" + "connectionFactoryNames#" + "ConnectionFactory"); - services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName()); - services.init(); - } - - @After - protected void tearDown() throws Exception { - services.destroy(); - super.tearDown(); - } - - @Test - public void testJMSConnectionInfo() { - try { - JMSServerInfo jmsServerInfo = new DefaultJMSServerInfo(); - String connectionProperties = Services.get().getConf() - .get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES); - WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); - JMSConnectionInfoBean jmsBean = jmsServerInfo.getJMSConnectionInfoBean(connectionProperties, wfj.getId()); - assertEquals(wfj.getUser(), jmsBean.getTopicName()); - Properties props = jmsBean.getJNDIProperties(); - assertEquals(ActiveMQConnFactory, props.get("java.naming.factory.initial")); - assertEquals(localActiveMQBroker, props.get("java.naming.provider.url")); - assertEquals("ConnectionFactory", props.get("connectionFactoryNames")); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - -} Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java Wed May 8 17:59:52 2013 @@ -56,10 +56,10 @@ public class TestJMSAccessorService exte JMSAccessorService jmsService = services.get(JMSAccessorService.class); // both servers should connect to default JMS server JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020")); - ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo, true); + ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo); assertTrue(ctxt1.isConnectionInitialized()); JMSConnectionInfo connInfo1 = hcatService.getJMSConnectionInfo(new URI("http://unknown:80")); - ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1, true); + ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1); assertTrue(ctxt2.isConnectionInitialized()); assertEquals(ctxt1, ctxt2); ctxt1.close(); @@ -213,13 +213,13 @@ public class TestJMSAccessorService exte assertTrue(jmsService.isListeningToTopic(connInfo, topic)); assertFalse(jmsService.isConnectionInRetryList(connInfo)); assertFalse(jmsService.isTopicInRetryList(connInfo, topic)); - ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo, true); + ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo); broker.stop(); try { connCtxt.createSession(Session.AUTO_ACKNOWLEDGE); fail("Exception expected"); } - catch (JMSException e) { + catch (Exception e) { Thread.sleep(100); assertFalse(jmsService.isListeningToTopic(connInfo, topic)); assertTrue(jmsService.isConnectionInRetryList(connInfo)); Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java Wed May 8 17:59:52 2013 @@ -18,6 +18,8 @@ package org.apache.oozie.service; +import java.util.Properties; + import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorActionBean; @@ -29,6 +31,8 @@ import org.apache.oozie.client.Coordinat import org.apache.oozie.client.Job; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.client.event.Event.AppType; +import org.apache.oozie.service.JMSTopicService.JobType; import org.apache.oozie.service.Services; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.workflow.WorkflowInstance; @@ -87,24 +91,27 @@ public class TestJMSTopicService extends @Test public void testTopicAsJobId() { try { + final String TOPIC_PREFIX = "oozie."; services.destroy(); services = setupServicesForTopic(); services.getConf().set(JMSTopicService.TOPIC_NAME, "default=" + JMSTopicService.TopicType.JOBID.getValue()); + services.getConf().set(JMSTopicService.TOPIC_PREFIX, TOPIC_PREFIX); services.init(); JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); - assertEquals(wfj.getId(), jmsTopicService.getTopic(wfj.getId())); + assertEquals(TOPIC_PREFIX, jmsTopicService.getTopicPrefix()); + assertEquals(TOPIC_PREFIX+wfj.getId(), jmsTopicService.getTopic(wfj.getId())); WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING); - assertEquals(wfj.getId(), jmsTopicService.getTopic(wab.getId())); + assertEquals(TOPIC_PREFIX+wfj.getId(), jmsTopicService.getTopic(wab.getId())); CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true); - assertEquals(cjb.getId(), jmsTopicService.getTopic(cjb.getId())); + assertEquals(TOPIC_PREFIX+cjb.getId(), jmsTopicService.getTopic(cjb.getId())); CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-for-action-input-check.xml", 0); - assertEquals(cjb.getId(), jmsTopicService.getTopic(cab.getId())); + assertEquals(TOPIC_PREFIX+cjb.getId(), jmsTopicService.getTopic(cab.getId())); BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true); - assertEquals(bjb.getId(), jmsTopicService.getTopic(bjb.getId())); + assertEquals(TOPIC_PREFIX+bjb.getId(), jmsTopicService.getTopic(bjb.getId())); BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING); - assertEquals(bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId())); + assertEquals(TOPIC_PREFIX+bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId())); } catch (Exception e) { e.printStackTrace(); @@ -241,4 +248,56 @@ public class TestJMSTopicService extends } } + @Test + public void testTopicProperties1() { + try { + services.destroy(); + services = setupServicesForTopic(); + services.init(); + JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); + Properties props = jmsTopicService.getTopicPatternProperties(); + assertEquals("${username}", props.get(AppType.WORKFLOW_JOB)); + assertEquals("${username}", props.get(AppType.WORKFLOW_ACTION)); + assertEquals("${username}", props.get(AppType.COORDINATOR_JOB)); + assertEquals("${username}", props.get(AppType.COORDINATOR_ACTION)); + assertEquals("${username}", props.get(AppType.BUNDLE_JOB)); + assertEquals("${username}", props.get(AppType.BUNDLE_ACTION)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + + @Test + public void testTopicProperties2() { + try { + services.destroy(); + services = setupServicesForTopic(); + services.getConf().set( + JMSTopicService.TOPIC_NAME, + JMSTopicService.JobType.WORKFLOW.getValue() + " = workflow," + + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord"); + services.init(); + JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); + Properties props = jmsTopicService.getTopicPatternProperties(); + assertEquals("workflow", props.get(AppType.WORKFLOW_JOB)); + assertEquals("workflow", props.get(AppType.WORKFLOW_ACTION)); + + assertEquals("coord", props.get(AppType.COORDINATOR_JOB)); + assertEquals("coord", props.get(AppType.COORDINATOR_ACTION)); + + assertEquals("${username}", props.get(AppType.BUNDLE_JOB)); + assertEquals("${username}", props.get(AppType.BUNDLE_ACTION)); + + services.destroy(); + + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java Wed May 8 17:59:52 2013 @@ -178,12 +178,6 @@ public class MockDagEngineService extend } @Override - public JMSConnectionInfoBean getJMSConnectionInfo(String jobId) throws DagEngineException { - did = RestConstants.JOB_SHOW_JMS_INFO; - return createDummyJMSConnectionInfo(); - } - - @Override public String getDefinition(String jobId) throws DagEngineException { did = RestConstants.JOB_SHOW_DEFINITION; int idx = validateWorkflowIdx(jobId); @@ -241,7 +235,7 @@ public class MockDagEngineService extend jmsProps.setProperty("k1", "v1"); jmsProps.setProperty("k2", "v2"); jmsBean.setJNDIProperties(jmsProps); - jmsBean.setTopicName("topic"); + jmsBean.setTopicPrefix("topicPrefix"); return jmsBean; } Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java Wed May 8 17:59:52 2013 @@ -337,22 +337,4 @@ public class TestV1JobServlet extends Da } }); } - - public void testJMSInfo() throws Exception { - runTest("/v1/job/*", V1JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() { - @Override - public Void call() throws Exception { - MockDagEngineService.reset(); - Map<String, String> params = new HashMap<String, String>(); - params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_JMS_INFO); - URL url = createURL(MockDagEngineService.JOB_ID + 1 + MockDagEngineService.JOB_ID_END, params); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("GET"); - assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode()); - assertEquals(RestConstants.JOB_SHOW_JMS_INFO, MockDagEngineService.did); - return null; - } - }); - } - } Modified: oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki (original) +++ oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki Wed May 8 17:59:52 2013 @@ -910,14 +910,32 @@ Assuming Oozie is runing at =OOZIE_URL=, * <OOZIE_URL>/v2/job * <OOZIE_URL>/v2/jobs -Please note that v1 and v2 are almost identical. -Only difference is the JSON format of Job Information API (*/job) particularly for map-reduce action. -No change for other actions. +*Changes in v2 job API:* +There is a difference in the JSON format of Job Information API (*/job) particularly for map-reduce action. +No change for other actions. In v1, externalId and consoleUrl point to spawned child job ID, and exteranlChildIDs is null in map-reduce action. In v2, externalId and consoleUrl point to launcher job ID, and exteranlChildIDs is spawned child job ID in map-reduce action. -v2/admin, v2/jobs remain the same with v1/admin, v1/jobs +v2 supports retrieving of JMS topic on which job notifications are sent + +*REST API URL:* + +<verbatim> +GET http://localhost:11000/oozie/v2/job/0000002-130507145349661-oozie-vira-W?show=jmstopic +</verbatim> + +*Changes in v2 admin API:* + +v2 adds support for retrieving JMS connection information related to JMS notifications. + +*REST API URL:* + +<verbatim> +GET http://localhost:11000/oozie/v2/admin/jmsinfo +</verbatim> + +v2/jobs remain the same as v1/jobs ---+++ Job and Jobs End-Points Modified: oozie/trunk/release-log.txt URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/release-log.txt (original) +++ oozie/trunk/release-log.txt Wed May 8 17:59:52 2013 @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1347 Additions to JMS topic API (virag) OOZIE-1231 Provide access to launcher job URL from web console when using Map Reduce action (ryota via virag) OOZIE-1335 The launcher job should use uber mode in Hadoop 2 by default (rkanter) OOZIE-1297 Add chgrp in FS action (ryota via virag) Modified: oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml URL: http://svn.apache.org/viewvc/oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml?rev=1480380&r1=1480379&r2=1480380&view=diff ============================================================================== --- oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml (original) +++ oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml Wed May 8 17:59:52 2013 @@ -49,6 +49,13 @@ </servlet> <servlet> + <servlet-name>v2admin</servlet-name> + <display-name>Oozie admin</display-name> + <servlet-class>org.apache.oozie.servlet.V2AdminServlet</servlet-class> + <load-on-startup>1</load-on-startup> + </servlet> + + <servlet> <servlet-name>callback</servlet-name> <display-name>Callback Notification</display-name> <servlet-class>org.apache.oozie.servlet.CallbackServlet</servlet-class> @@ -114,7 +121,7 @@ </servlet-mapping> <servlet-mapping> - <servlet-name>v1admin</servlet-name> + <servlet-name>v2admin</servlet-name> <url-pattern>/v2/admin/*</url-pattern> </servlet-mapping>
