Repository: oozie Updated Branches: refs/heads/master 88aa654d6 -> 61c646c33
OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/117153a9 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/117153a9 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/117153a9 Branch: refs/heads/master Commit: 117153a90f7a7b42fbdaa252b8ec7821e6ef3d29 Parents: 88aa654 Author: Gezapeti Cseh <gezap...@apache.org> Authored: Wed May 16 13:52:10 2018 +0200 Committer: Gezapeti Cseh <gezap...@apache.org> Committed: Wed May 16 13:52:10 2018 +0200 ---------------------------------------------------------------------- .../oozie/jms/TestJMSJobEventListener.java | 534 ++++++++----------- release-log.txt | 1 + 2 files changed, 226 insertions(+), 309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/117153a9/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java b/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java index f375dec..8e8db51 100644 --- a/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java +++ b/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java @@ -18,7 +18,6 @@ package org.apache.oozie.jms; -import java.text.ParseException; import java.util.Date; import java.util.Random; @@ -77,7 +76,7 @@ public class TestJMSJobEventListener extends XTestCase { } @Test - public void testOnWorkflowJobStartedEvent() throws ParseException { + public void testOnWorkflowJobStartedEvent() throws Exception { JMSJobEventListener wfEventListener = new JMSJobEventListener(); wfEventListener.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); @@ -85,32 +84,26 @@ public class TestJMSJobEventListener extends XTestCase { "wf-app-name1", startDate, null); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); - wfEventListener.onWorkflowJobEvent(wfe); - TextMessage message = (TextMessage) consumer.receive(5000); - assertFalse(message.getText().contains("endTime")); - WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message); - assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus()); - assertEquals(startDate, wfStartMessage.getStartTime()); - assertEquals("wfId1", wfStartMessage.getId()); - assertEquals("caId1", wfStartMessage.getParentId()); - assertEquals(MessageType.JOB, wfStartMessage.getMessageType()); - assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType()); - assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus()); - assertEquals("user1", wfStartMessage.getUser()); - assertEquals("wf-app-name1", wfStartMessage.getAppName()); - wfEventListener.destroy(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); + wfEventListener.onWorkflowJobEvent(wfe); + TextMessage message = (TextMessage) consumer.receive(5000); + assertFalse(message.getText().contains("endTime")); + WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message); + assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus()); + assertEquals(startDate, wfStartMessage.getStartTime()); + assertEquals("wfId1", wfStartMessage.getId()); + assertEquals("caId1", wfStartMessage.getParentId()); + assertEquals(MessageType.JOB, wfStartMessage.getMessageType()); + assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType()); + assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus()); + assertEquals("user1", wfStartMessage.getUser()); + assertEquals("wf-app-name1", wfStartMessage.getAppName()); + wfEventListener.destroy(); } @Test - public void testOnWorkflowJobSuccessEvent() throws ParseException { + public void testOnWorkflowJobSuccessEvent() throws Exception { JMSJobEventListener wfEventListener = new JMSJobEventListener(); wfEventListener.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); @@ -119,33 +112,26 @@ public class TestJMSJobEventListener extends XTestCase { "wf-app-name1", startDate, endDate); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); - wfEventListener.onWorkflowJobEvent(wfe); - TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message); - assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus()); - assertEquals(startDate, wfSuccMessage.getStartTime()); - assertEquals(endDate, wfSuccMessage.getEndTime()); - assertEquals("wfId1", wfSuccMessage.getId()); - assertEquals("caId1", wfSuccMessage.getParentId()); - assertEquals(MessageType.JOB, wfSuccMessage.getMessageType()); - assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType()); - assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus()); - assertEquals("user1", wfSuccMessage.getUser()); - assertEquals("wf-app-name1", wfSuccMessage.getAppName()); - wfEventListener.destroy(); - - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); + wfEventListener.onWorkflowJobEvent(wfe); + TextMessage message = (TextMessage) consumer.receive(5000); + WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message); + assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus()); + assertEquals(startDate, wfSuccMessage.getStartTime()); + assertEquals(endDate, wfSuccMessage.getEndTime()); + assertEquals("wfId1", wfSuccMessage.getId()); + assertEquals("caId1", wfSuccMessage.getParentId()); + assertEquals(MessageType.JOB, wfSuccMessage.getMessageType()); + assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType()); + assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus()); + assertEquals("user1", wfSuccMessage.getUser()); + assertEquals("wf-app-name1", wfSuccMessage.getAppName()); + wfEventListener.destroy(); } @Test - public void testOnWorkflowJobFailureEvent() throws ParseException { + public void testOnWorkflowJobFailureEvent() throws Exception { JMSJobEventListener wfEventListener = new JMSJobEventListener(); wfEventListener.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); @@ -155,171 +141,136 @@ public class TestJMSJobEventListener extends XTestCase { wfe.setErrorCode("dummyErrorCode"); wfe.setErrorMessage("dummyErrorMessage"); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); - wfEventListener.onWorkflowJobEvent(wfe); - TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); - assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); - assertEquals(startDate, wfFailMessage.getStartTime()); - assertEquals(endDate, wfFailMessage.getEndTime()); - assertEquals("wfId1", wfFailMessage.getId()); - assertEquals("caId1", wfFailMessage.getParentId()); - assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); - assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType()); - assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus()); - assertEquals("user1", wfFailMessage.getUser()); - assertEquals("wf-app-name1", wfFailMessage.getAppName()); - assertEquals("dummyErrorCode", wfFailMessage.getErrorCode()); - assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage()); - wfEventListener.destroy(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); + wfEventListener.onWorkflowJobEvent(wfe); + TextMessage message = (TextMessage) consumer.receive(5000); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); + assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); + assertEquals(startDate, wfFailMessage.getStartTime()); + assertEquals(endDate, wfFailMessage.getEndTime()); + assertEquals("wfId1", wfFailMessage.getId()); + assertEquals("caId1", wfFailMessage.getParentId()); + assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); + assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType()); + assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus()); + assertEquals("user1", wfFailMessage.getUser()); + assertEquals("wf-app-name1", wfFailMessage.getAppName()); + assertEquals("dummyErrorCode", wfFailMessage.getErrorCode()); + assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage()); + wfEventListener.destroy(); } @Test - public void testOnWorkflowJobSuspendEvent() throws ParseException { + public void testOnWorkflowJobSuspendEvent() throws Exception { JMSJobEventListener wfEventListener = new JMSJobEventListener(); wfEventListener.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUSPENDED, "user1", "wf-app-name1", startDate, null); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); - wfEventListener.onWorkflowJobEvent(wfe); - TextMessage message = (TextMessage) consumer.receive(5000); - assertFalse(message.getText().contains("endTime")); - WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); - assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus()); - assertEquals(startDate, wfFailMessage.getStartTime()); - assertEquals("wfId1", wfFailMessage.getId()); - assertEquals("caId1", wfFailMessage.getParentId()); - assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); - assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType()); - assertEquals(EventStatus.SUSPEND, wfFailMessage.getEventStatus()); - assertEquals("user1", wfFailMessage.getUser()); - assertEquals("wf-app-name1", wfFailMessage.getAppName()); - assertNull(wfFailMessage.getErrorCode()); - assertNull(wfFailMessage.getErrorMessage()); - wfEventListener.destroy(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe)); + wfEventListener.onWorkflowJobEvent(wfe); + TextMessage message = (TextMessage) consumer.receive(5000); + assertFalse(message.getText().contains("endTime")); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); + assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus()); + assertEquals(startDate, wfFailMessage.getStartTime()); + assertEquals("wfId1", wfFailMessage.getId()); + assertEquals("caId1", wfFailMessage.getParentId()); + assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); + assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType()); + assertEquals(EventStatus.SUSPEND, wfFailMessage.getEventStatus()); + assertEquals("user1", wfFailMessage.getUser()); + assertEquals("wf-app-name1", wfFailMessage.getAppName()); + assertNull(wfFailMessage.getErrorCode()); + assertNull(wfFailMessage.getErrorMessage()); + wfEventListener.destroy(); } @Test - public void testWorkflowJobSelectors() throws ParseException { + public void testWorkflowJobSelectors() throws Exception { JMSJobEventListener wfEventListener = new JMSJobEventListener(); wfEventListener.init(conf); WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user_1", "wf-app-name1", new Date(), new Date()); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - String selector = JMSHeaderConstants.USER + "='user_1'"; - MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); - wfEventListener.onWorkflowJobEvent(wfe); - TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); - Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); - assertEquals("user_1", wfFailMessage.getUser()); - assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); - wfEventListener.destroy(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + String selector = JMSHeaderConstants.USER + "='user_1'"; + MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); + wfEventListener.onWorkflowJobEvent(wfe); + TextMessage message = (TextMessage) consumer.receive(5000); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); + Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); + assertEquals("user_1", wfFailMessage.getUser()); + assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); + wfEventListener.destroy(); } @Test - public void testWorkflowJobSelectorsNegative() { + public void testWorkflowJobSelectorsNegative() throws Exception { JMSJobEventListener wfEventListener = new JMSJobEventListener(); wfEventListener.init(conf); WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1", "wf-app-name1", new Date(), new Date()); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - // Pass a selector which wont match and assert for null message - String selector = JMSHeaderConstants.USER + "='Non_matching_user'"; - MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); - wfEventListener.onWorkflowJobEvent(wfe); - TextMessage message = (TextMessage) consumer.receive(5000); - assertNull(message); - wfEventListener.destroy(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + // Pass a selector which wont match and assert for null message + String selector = JMSHeaderConstants.USER + "='Non_matching_user'"; + MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); + wfEventListener.onWorkflowJobEvent(wfe); + TextMessage message = (TextMessage) consumer.receive(5000); + assertNull(message); + wfEventListener.destroy(); } @Test - public void testWorkflowJobSelectorsOr() { + public void testWorkflowJobSelectorsOr() throws Exception { JMSJobEventListener wfEventListener = new JMSJobEventListener(); wfEventListener.init(conf); WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1", "wf-app-name1", new Date(), new Date()); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - // Pass a selector using OR condition - String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME - + "='wf-app-name1'"; - MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); - wfEventListener.onWorkflowJobEvent(wfe); - TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); - Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); - assertEquals("user1", wfFailMessage.getUser()); - assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); - wfEventListener.destroy(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + // Pass a selector using OR condition + String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME + + "='wf-app-name1'"; + MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); + wfEventListener.onWorkflowJobEvent(wfe); + TextMessage message = (TextMessage) consumer.receive(5000); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); + Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); + assertEquals("user1", wfFailMessage.getUser()); + assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); + wfEventListener.destroy(); } @Test - public void testWorkflowJobSelectorsAnd() { + public void testWorkflowJobSelectorsAnd() throws Exception { JMSJobEventListener wfEventListener = new JMSJobEventListener(); wfEventListener.init(conf); WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1", "wf-app-name1", new Date(), new Date()); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - // Pass a selector using AND condition - String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE - + "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'"; - MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); - wfEventListener.onWorkflowJobEvent(wfe); - TextMessage message = (TextMessage) consumer.receive(5000); - WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); - Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); - assertEquals("user1", wfFailMessage.getUser()); - assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); - wfEventListener.destroy(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + // Pass a selector using AND condition + String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE + + "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'"; + MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector); + wfEventListener.onWorkflowJobEvent(wfe); + TextMessage message = (TextMessage) consumer.receive(5000); + WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message); + Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus()); + assertEquals("user1", wfFailMessage.getUser()); + assertEquals(MessageType.JOB, wfFailMessage.getMessageType()); + wfEventListener.destroy(); } @Test - public void testConnectionDrop() { + public void testConnectionDrop() throws Exception { Random random = new Random(); + BrokerService broker = null; try { services.destroy(); services = new Services(); @@ -334,7 +285,7 @@ public class TestJMSJobEventListener extends XTestCase { services.init(); JMSJobEventListener wfEventListener = new JMSJobEventListener(); wfEventListener.init(conf); - BrokerService broker = new BrokerService(); + broker = new BrokerService(); broker.setDataDirectory(getTestCaseDir()); broker.addConnector(brokerURl); broker.start(); @@ -360,15 +311,16 @@ public class TestJMSJobEventListener extends XTestCase { broker.stop(); wfEventListener.destroy(); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + finally { + if (broker != null) { + broker.stop(); + } } } @Test - public void testOnCoordinatorActionWaitingEvent() throws ParseException { + public void testOnCoordinatorActionWaitingEvent() throws Exception { JMSJobEventListener wfEventListner = new JMSJobEventListener(); wfEventListner.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); @@ -376,36 +328,30 @@ public class TestJMSJobEventListener extends XTestCase { CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.WAITING, "user1", "wf-app-name1", nominalTime, startDate, "missingDep1"); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListner.getTopic(cae)); - wfEventListner.onCoordinatorActionEvent(cae); - TextMessage message = (TextMessage) consumer.receive(5000); - assertFalse(message.getText().contains("endTime")); - assertFalse(message.getText().contains("errorCode")); - assertFalse(message.getText().contains("errorMessage")); - CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils - .getEventMessage(message); - assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus()); - assertEquals(startDate, coordActionWaitingMessage.getStartTime()); - assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime()); - assertEquals("caJobId1", coordActionWaitingMessage.getParentId()); - assertEquals("caId1", coordActionWaitingMessage.getId()); - assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType()); - assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType()); - assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus()); - assertEquals("user1", coordActionWaitingMessage.getUser()); - assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName()); - assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListner.getTopic(cae)); + wfEventListner.onCoordinatorActionEvent(cae); + TextMessage message = (TextMessage) consumer.receive(5000); + assertFalse(message.getText().contains("endTime")); + assertFalse(message.getText().contains("errorCode")); + assertFalse(message.getText().contains("errorMessage")); + CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils + .getEventMessage(message); + assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus()); + assertEquals(startDate, coordActionWaitingMessage.getStartTime()); + assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime()); + assertEquals("caJobId1", coordActionWaitingMessage.getParentId()); + assertEquals("caId1", coordActionWaitingMessage.getId()); + assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType()); + assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType()); + assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus()); + assertEquals("user1", coordActionWaitingMessage.getUser()); + assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName()); + assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency()); } @Test - public void testOnCoordinatorActionStartEvent() throws ParseException { + public void testOnCoordinatorActionStartEvent() throws Exception { JMSJobEventListener coordEventListener = new JMSJobEventListener(); coordEventListener.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); @@ -413,35 +359,29 @@ public class TestJMSJobEventListener extends XTestCase { CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.RUNNING, "user1", "wf-app-name1", nominalTime, startDate, null); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae)); - coordEventListener.onCoordinatorActionEvent(cae); - TextMessage message = (TextMessage) consumer.receive(5000); - assertFalse(message.getText().contains("endTime")); - assertFalse(message.getText().contains("errorCode")); - assertFalse(message.getText().contains("errorMessage")); - assertFalse(message.getText().contains("missingDependency")); - CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils - .getEventMessage(message); - assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus()); - assertEquals(startDate, coordActionStartMessage.getStartTime()); - assertEquals("caJobId1", coordActionStartMessage.getParentId()); - assertEquals("caId1", coordActionStartMessage.getId()); - assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType()); - assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType()); - assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus()); - assertEquals("user1", coordActionStartMessage.getUser()); - assertEquals("wf-app-name1", coordActionStartMessage.getAppName()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae)); + coordEventListener.onCoordinatorActionEvent(cae); + TextMessage message = (TextMessage) consumer.receive(5000); + assertFalse(message.getText().contains("endTime")); + assertFalse(message.getText().contains("errorCode")); + assertFalse(message.getText().contains("errorMessage")); + assertFalse(message.getText().contains("missingDependency")); + CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils + .getEventMessage(message); + assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus()); + assertEquals(startDate, coordActionStartMessage.getStartTime()); + assertEquals("caJobId1", coordActionStartMessage.getParentId()); + assertEquals("caId1", coordActionStartMessage.getId()); + assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType()); + assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType()); + assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus()); + assertEquals("user1", coordActionStartMessage.getUser()); + assertEquals("wf-app-name1", coordActionStartMessage.getAppName()); } @Test - public void testOnCoordinatorJobSuccessEvent() throws ParseException { + public void testOnCoordinatorJobSuccessEvent() throws Exception { JMSJobEventListener coordEventListener = new JMSJobEventListener(); coordEventListener.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); @@ -451,35 +391,29 @@ public class TestJMSJobEventListener extends XTestCase { CoordinatorAction.Status.SUCCEEDED, "user1", "wf-app-name1", nominalTime, startDate, null); cae.setEndTime(endDate); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae)); - coordEventListener.onCoordinatorActionEvent(cae); - TextMessage message = (TextMessage) consumer.receive(5000); - assertFalse(message.getText().contains("errorCode")); - assertFalse(message.getText().contains("errorMessage")); - assertFalse(message.getText().contains("missingDependency")); - CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils - .getEventMessage(message); - assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus()); - assertEquals(startDate, coordActionSuccessMessage.getStartTime()); - assertEquals(endDate, coordActionSuccessMessage.getEndTime()); - assertEquals("caJobId1", coordActionSuccessMessage.getParentId()); - assertEquals("caId1", coordActionSuccessMessage.getId()); - assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType()); - assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType()); - assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus()); - assertEquals("user1", coordActionSuccessMessage.getUser()); - assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae)); + coordEventListener.onCoordinatorActionEvent(cae); + TextMessage message = (TextMessage) consumer.receive(5000); + assertFalse(message.getText().contains("errorCode")); + assertFalse(message.getText().contains("errorMessage")); + assertFalse(message.getText().contains("missingDependency")); + CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils + .getEventMessage(message); + assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus()); + assertEquals(startDate, coordActionSuccessMessage.getStartTime()); + assertEquals(endDate, coordActionSuccessMessage.getEndTime()); + assertEquals("caJobId1", coordActionSuccessMessage.getParentId()); + assertEquals("caId1", coordActionSuccessMessage.getId()); + assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType()); + assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType()); + assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus()); + assertEquals("user1", coordActionSuccessMessage.getUser()); + assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName()); } @Test - public void testOnCoordinatorJobFailureEvent() throws ParseException { + public void testOnCoordinatorJobFailureEvent() throws Exception { JMSJobEventListener coordEventListener = new JMSJobEventListener(); coordEventListener.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); @@ -491,35 +425,29 @@ public class TestJMSJobEventListener extends XTestCase { cae.setErrorCode("E0101"); cae.setErrorMessage("dummyError"); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae)); - coordEventListener.onCoordinatorActionEvent(cae); - TextMessage message = (TextMessage) consumer.receive(5000); - assertFalse(message.getText().contains("missingDependency")); - CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils - .getEventMessage(message); - assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus()); - assertEquals(startDate, coordActionFailMessage.getStartTime()); - assertEquals(endDate, coordActionFailMessage.getEndTime()); - assertEquals("caJobId1", coordActionFailMessage.getParentId()); - assertEquals("caId1", coordActionFailMessage.getId()); - assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType()); - assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType()); - assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus()); - assertEquals("user1", coordActionFailMessage.getUser()); - assertEquals("wf-app-name1", coordActionFailMessage.getAppName()); - assertEquals("E0101", coordActionFailMessage.getErrorCode()); - assertEquals("dummyError", coordActionFailMessage.getErrorMessage()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae)); + coordEventListener.onCoordinatorActionEvent(cae); + TextMessage message = (TextMessage) consumer.receive(5000); + assertFalse(message.getText().contains("missingDependency")); + CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils + .getEventMessage(message); + assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus()); + assertEquals(startDate, coordActionFailMessage.getStartTime()); + assertEquals(endDate, coordActionFailMessage.getEndTime()); + assertEquals("caJobId1", coordActionFailMessage.getParentId()); + assertEquals("caId1", coordActionFailMessage.getId()); + assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType()); + assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType()); + assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus()); + assertEquals("user1", coordActionFailMessage.getUser()); + assertEquals("wf-app-name1", coordActionFailMessage.getAppName()); + assertEquals("E0101", coordActionFailMessage.getErrorCode()); + assertEquals("dummyError", coordActionFailMessage.getErrorMessage()); } @Test - public void testCoordinatorActionSelectors() throws ParseException { + public void testCoordinatorActionSelectors() throws Exception { JMSJobEventListener coordEventListener = new JMSJobEventListener(); coordEventListener.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); @@ -527,26 +455,20 @@ public class TestJMSJobEventListener extends XTestCase { CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED, "user1", "wf-app-name1", nominalTime, startDate, null); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - String selector = JMSHeaderConstants.USER + "='user1'"; - MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector); - coordEventListener.onCoordinatorActionEvent(cae); - TextMessage message = (TextMessage) consumer.receive(5000); - CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils - .getEventMessage(message); - Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus()); - assertEquals("user1", coordActionFailMessage.getUser()); - assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + String selector = JMSHeaderConstants.USER + "='user1'"; + MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector); + coordEventListener.onCoordinatorActionEvent(cae); + TextMessage message = (TextMessage) consumer.receive(5000); + CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils + .getEventMessage(message); + Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus()); + assertEquals("user1", coordActionFailMessage.getUser()); + assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType()); } @Test - public void testCoordinatorActionSelectorsNegative() throws ParseException { + public void testCoordinatorActionSelectorsNegative() throws Exception { JMSJobEventListener coordEventListener = new JMSJobEventListener(); coordEventListener.init(conf); Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z"); @@ -554,19 +476,13 @@ public class TestJMSJobEventListener extends XTestCase { CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED, "user1", "wf-app-name1", nominalTime, startDate, null); ConnectionContext jmsContext = getConnectionContext(); - try { - Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); - // Pass a selector which wont match and assert for null message - String selector = JMSHeaderConstants.USER + "='Non_matching_user'"; - MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector); - coordEventListener.onCoordinatorActionEvent(cae); - TextMessage message = (TextMessage) consumer.receive(5000); - assertNull(message); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE); + // Pass a selector which wont match and assert for null message + String selector = JMSHeaderConstants.USER + "='Non_matching_user'"; + MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector); + coordEventListener.onCoordinatorActionEvent(cae); + TextMessage message = (TextMessage) consumer.receive(5000); + assertNull(message); } private ConnectionContext getConnectionContext() { http://git-wip-us.apache.org/repos/asf/oozie/blob/117153a9/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 6dbe922..060f4fb 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti) OOZIE-3236 Fix flaky test TestHiveActionExecutor#testHiveAction (pbacsko via gezapeti) OOZIE-3235 Upgrade ActiveMQ to 5.15.3 (matijhs via andras.piros) OOZIE-3217 Enable definition of admin users using oozie-site.xml (orova via andras.piros)