Repository: falcon Updated Branches: refs/heads/0.9 601929630 -> 634577c3d
FALCON-1750 Null Pointer Exception while listening to Workflow Notifications Author: Pallavi Rao <[email protected]> Reviewers: Sandeep Samudrala <[email protected]>, Pavan Kolamuri <[email protected]> Closes #17 from pallavi-rao/master and squashes the following commits: ae8dacb [Pallavi Rao] FALCON-1750 Addressed review comments 9e95e12 [Pallavi Rao] Merge remote-tracking branch 'upstream/master' b04519e [Pallavi Rao] FALCON-1750 Null Pointer Exception while listening to Workflow Notifications (cherry picked from commit ae08745f10cfbfa5df5a7f797ac04bded6daf3fa) Signed-off-by: Pallavi Rao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/634577c3 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/634577c3 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/634577c3 Branch: refs/heads/0.9 Commit: 634577c3da86b27e1f8ef34920ccbc08a39cad83 Parents: 6019296 Author: Pallavi Rao <[email protected]> Authored: Fri Jan 29 17:17:05 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Jan 29 17:18:01 2016 +0530 ---------------------------------------------------------------------- .../WorkflowJobEndNotificationService.java | 105 ++++++++++--------- .../WorkflowJobEndNotificationServiceTest.java | 4 +- .../falcon/messaging/JMSMessageConsumer.java | 2 +- .../messaging/JMSMessageConsumerTest.java | 7 ++ .../falcon/rerun/handler/RetryHandler.java | 4 + 5 files changed, 70 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/634577c3/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java index 630c56c..faea25c 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java @@ -21,10 +21,9 @@ package org.apache.falcon.workflow; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.aspect.GenericAlert; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.service.FalconService; @@ -54,8 +53,6 @@ public class WorkflowJobEndNotificationService implements FalconService { // Maintain a cache of context built, so we don't have to query Oozie for every state change. 
private Map<String, Properties> contextMap = new ConcurrentHashMap<>(); - private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get(); - @Override public String getName() { return SERVICE_NAME; @@ -97,17 +94,19 @@ public class WorkflowJobEndNotificationService implements FalconService { listeners.remove(listener); } - public void notifyFailure(WorkflowExecutionContext context) { + public void notifyFailure(WorkflowExecutionContext context) throws FalconException { notifyWorkflowEnd(context); } - public void notifySuccess(WorkflowExecutionContext context) { + public void notifySuccess(WorkflowExecutionContext context) throws FalconException { notifyWorkflowEnd(context); } - public void notifyStart(WorkflowExecutionContext context) { + public void notifyStart(WorkflowExecutionContext context) throws FalconException { // Start notifications can only be from Oozie JMS notifications - updateContextFromWFConf(context); + if (!updateContextFromWFConf(context)) { + return; + } LOG.debug("Sending workflow start notification to listeners with context : {} ", context); for (WorkflowExecutionListener listener : listeners) { try { @@ -119,9 +118,11 @@ public class WorkflowJobEndNotificationService implements FalconService { } } - public void notifySuspend(WorkflowExecutionContext context) { + public void notifySuspend(WorkflowExecutionContext context) throws FalconException { // Suspend notifications can only be from Oozie JMS notifications - updateContextFromWFConf(context); + if (!updateContextFromWFConf(context)) { + return; + } LOG.debug("Sending workflow suspend notification to listeners with context : {} ", context); for (WorkflowExecutionListener listener : listeners) { try { @@ -136,9 +137,13 @@ public class WorkflowJobEndNotificationService implements FalconService { contextMap.remove(context.getWorkflowId()); } - public void notifyWait(WorkflowExecutionContext context) { + public void notifyWait(WorkflowExecutionContext context) throws 
FalconException { // Wait notifications can only be from Oozie JMS notifications - updateContextFromWFConf(context); + + if (!updateContextFromWFConf(context)) { + return; + } + LOG.debug("Sending workflow wait notification to listeners with context : {} ", context); for (WorkflowExecutionListener listener : listeners) { try { @@ -152,48 +157,48 @@ public class WorkflowJobEndNotificationService implements FalconService { // The method retrieves the conf from the cache if it is in cache. // Else, queries WF Engine to retrieve the conf of the workflow - private void updateContextFromWFConf(WorkflowExecutionContext context) { - try { - Properties wfProps = contextMap.get(context.getWorkflowId()); - if (wfProps == null) { - Entity entity = CONFIG_STORE.get(EntityType.valueOf(context.getEntityType()), context.getEntityName()); - // Entity can be null in case of delete. Engine will generate notifications for instance kills. - // But, the entity would no longer be in the config store. - if (entity == null) { - return; - } - for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { - try { - InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity) - .getJobDetails(cluster, context.getWorkflowId()).getInstances(); - if (instances != null && instances.length > 0) { - wfProps = getWFProps(instances[0].getWfParams()); - // Required by RetryService. But, is not part of conf. - wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(), - Integer.toString(instances[0].getRunId())); - } - } catch (FalconException e) { - // Do Nothing. The workflow may not have been deployed on this cluster. 
- continue; + private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException { + Properties wfProps = contextMap.get(context.getWorkflowId()); + if (wfProps == null) { + Entity entity = null; + try { + entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName()); + } catch (EntityNotRegisteredException e) { + // Entity no longer exists. No need to notify. + LOG.debug("Entity {} of type {} doesn't exist in config store. Notification Ignored.", + context.getEntityName(), context.getEntityType()); + contextMap.remove(context.getWorkflowId()); + return false; + } + for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + try { + InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity) + .getJobDetails(cluster, context.getWorkflowId()).getInstances(); + if (instances != null && instances.length > 0) { + wfProps = getWFProps(instances[0].getWfParams()); + // Required by RetryService. But, is not part of conf. + wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(), + Integer.toString(instances[0].getRunId())); } - contextMap.put(context.getWorkflowId(), wfProps); + } catch (FalconException e) { + // Do Nothing. Move on to the next cluster. + continue; } + contextMap.put(context.getWorkflowId(), wfProps); } + } - // No extra props to enhance the context with. - if (wfProps == null || wfProps.isEmpty()) { - return; - } + // No extra props to enhance the context with. 
+ if (wfProps == null || wfProps.isEmpty()) { + return true; + } - for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) { - if (wfProps.containsKey(arg.getName())) { - context.setValue(arg, wfProps.getProperty(arg.getName())); - } + for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) { + if (wfProps.containsKey(arg.getName())) { + context.setValue(arg, wfProps.getProperty(arg.getName())); } - - } catch (FalconException e) { - LOG.error("Unable to retrieve entity {} of type {} from config store.", e); } + return true; } private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) { @@ -205,7 +210,7 @@ public class WorkflowJobEndNotificationService implements FalconService { } // This method handles both success and failure notifications. - private void notifyWorkflowEnd(WorkflowExecutionContext context) { + private void notifyWorkflowEnd(WorkflowExecutionContext context) throws FalconException { // Need to distinguish notification from post processing for backward compatibility if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) { boolean engineNotifEnabled = false; @@ -224,7 +229,9 @@ public class WorkflowJobEndNotificationService implements FalconService { updateContextWithTime(context); } } else { - updateContextFromWFConf(context); + if (!updateContextFromWFConf(context)) { + return; + } } LOG.debug("Sending workflow end notification to listeners with context : {} ", context); http://git-wip-us.apache.org/repos/asf/falcon/blob/634577c3/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java index 1a9597b..9dd8f93 100644 --- 
a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java +++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java @@ -127,11 +127,11 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL } - private void notifyFailure(WorkflowExecutionContext context) { + private void notifyFailure(WorkflowExecutionContext context) throws FalconException { service.notifyFailure(context); } - private void notifySuccess(WorkflowExecutionContext context) { + private void notifySuccess(WorkflowExecutionContext context) throws FalconException { service.notifySuccess(context); } http://git-wip-us.apache.org/repos/asf/falcon/blob/634577c3/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java index 2380e47..ccc2cfb 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java @@ -189,7 +189,7 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener { return dateFormat.format(time); } - private void invokeListener(WorkflowExecutionContext context) { + private void invokeListener(WorkflowExecutionContext context) throws FalconException { // Login the user so listeners can access FS and WfEngine as this user CurrentUser.authenticate(context.getWorkflowUser()); http://git-wip-us.apache.org/repos/asf/falcon/blob/634577c3/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java ---------------------------------------------------------------------- diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java 
b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java index 5600746..5c53a3e 100644 --- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java +++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java @@ -22,6 +22,9 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.BrokerView; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.util.FalconTestUtil; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; @@ -73,6 +76,9 @@ public class JMSMessageConsumerTest { BROKER_URL, TOPIC_NAME + "," + SECONDARY_TOPIC_NAME, jobEndService); subscriber.startSubscriber(); + Process mockProcess = new Process(); + mockProcess.setName("process1"); + ConfigurationStore.get().publish(EntityType.PROCESS, mockProcess); } public void sendMessages(String topic, WorkflowExecutionContext.Type type) @@ -278,6 +284,7 @@ public class JMSMessageConsumerTest { @AfterMethod public void tearDown() throws Exception{ + ConfigurationStore.get().remove(EntityType.PROCESS, "process1"); broker.stop(); subscriber.closeSubscriber(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/634577c3/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java index 84cd93f..e817a93 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java @@ -19,6 +19,7 @@ package 
org.apache.falcon.rerun.handler; import org.apache.falcon.FalconException; import org.apache.falcon.aspect.GenericAlert; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.Frequency; @@ -75,6 +76,9 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends "All retry attempt failed out of configured: " + attempts + " attempt for entity instance::"); } + } catch (EntityNotRegisteredException ee) { + LOG.warn("Entity {} of type {} doesn't exist in config store. Retry will be skipped.", + entityName, entityType); } catch (FalconException e) { LOG.error("Error during retry of entity instance {}:{}", entityName, nominalTime, e); GenericAlert.alertRetryFailed(entityType, entityName, nominalTime,
