Repository: falcon
Updated Branches:
  refs/heads/0.8 94b755615 -> 243a7dcad


FALCON-1564 Provide an option for users to disable system post-processing JMS 
notification


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/243a7dca
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/243a7dca
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/243a7dca

Branch: refs/heads/0.8
Commit: 243a7dcad3e884a7502b48058bb84ebc001eed6d
Parents: 94b7556
Author: Pallavi Rao <[email protected]>
Authored: Fri Oct 30 17:54:08 2015 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Fri Oct 30 17:54:08 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../falcon/workflow/WorkflowExecutionArgs.java  |  1 +
 .../WorkflowJobEndNotificationService.java      |  3 +-
 docs/src/site/twiki/Configuration.twiki         |  6 +++-
 .../falcon/oozie/OozieCoordinatorBuilder.java   |  7 +++-
 .../falcon/workflow/FalconPostProcessing.java   | 15 +++++++++
 .../workflow/engine/OozieWorkflowEngine.java    |  2 +-
 .../src/main/resources/action/post-process.xml  |  2 ++
 .../workflow/FalconPostProcessingTest.java      | 34 ++++++++++++++++++++
 src/conf/runtime.properties                     |  4 +++
 10 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d57a8ec..898f8e0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,8 @@ Release version: 0.8
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1564 Provide an option for users to disable system post-processing 
JMS notification (Pallavi Rao)
+
     FALCON-1519 Suspend And Resume API's in Falcon Unit(Narayan Periwal via 
Pallavi Rao)
 
     FALCON-1524 Improve Lifecycle Retention validation checks(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index ac7140c..4318620 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -77,6 +77,7 @@ public enum WorkflowExecutionArgs {
     USER_BRKR_URL("userBrokerUrl", "user broker url", false),
     BRKR_TTL("brokerTTL", "time to live for broker message in sec", false),
     USER_JMS_NOTIFICATION_ENABLED("userJMSNotificationEnabled", "Is User 
notification via JMS enabled?", false),
+    SYSTEM_JMS_NOTIFICATION_ENABLED("systemJMSNotificationEnabled", "Is system 
notification via JMS enabled?", false),
 
     // state maintained
     LOG_FILE("logFile", "log file path where feeds to be deleted are 
recorded", false),

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index 5c75f5c..9d96fa3 100644
--- 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -214,7 +214,8 @@ public class WorkflowJobEndNotificationService implements 
FalconService {
                 engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine()
                         .isNotificationEnabled(context.getClusterName(), 
context.getWorkflowId());
             } catch (FalconException e) {
-                LOG.debug("Unable to determine if the notification is enabled 
on the wf engine. Assuming not.", e);
+                LOG.debug("Received error while checking if notification is 
enabled. "
+                        + "Hence, assuming notification is not enabled.");
             }
             // Ignore the message from post processing as there will be one 
more from Oozie.
             if (engineNotifEnabled) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki 
b/docs/src/site/twiki/Configuration.twiki
index 37b5717..743ce40 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -86,7 +86,6 @@ and change it to look as below
 export FALCON_SERVER_OPTS="-Djava.awt.headless=true 
-Djava.security.krb5.realm= -Djava.security.krb5.kdc="
 </verbatim>
 
-
 ---+++Activemq
 
 * falcon server starts embedded active mq. To control this behaviour, set the 
following system properties using -D
@@ -95,6 +94,11 @@ option in environment variable FALCON_OPTS:
    * falcon.embeddedmq.port=<port> - Port for embedded active mq, default 61616
    * falcon.embeddedmq.data=<path> - Data path for embedded active mq, default 
{package dir}/logs/data
 
+---+++Falcon System Notifications
+Some Falcon features, such as late data handling, retries, and the metadata 
service, depend on JMS notifications sent when the Oozie workflow completes. 
These system notifications are sent as part of the Falcon post-processing 
action. Because the post-processing action is itself a job, it is prone to 
failure, and when it fails, Falcon is blind to the status of the workflow. To 
alleviate this problem and make the notifications more reliable, you can enable 
Oozie's JMS notification feature and disable Falcon post-processing 
notification by making the following changes:
+   * In Falcon runtime.properties, set *.falcon.jms.notification.enabled to 
false. This will turn off JMS notification in post-processing.
+   * Copy the notification-related properties from oozie/conf/oozie-site.xml 
into the oozie-site.xml of the Oozie installation, then restart Oozie for the 
changes to take effect.
+
 ---+++Adding Extension Libraries
 
 Library extensions allows users to add custom libraries to entity lifecycles 
such as feed retention, feed replication

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java 
b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index a04ae95..acb49ef 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -33,6 +33,7 @@ import 
org.apache.falcon.oozie.feed.FeedReplicationCoordinatorBuilder;
 import org.apache.falcon.oozie.feed.FeedRetentionCoordinatorBuilder;
 import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder;
 import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.OozieClient;
@@ -49,7 +50,9 @@ public abstract class OozieCoordinatorBuilder<T extends 
Entity> extends OozieEnt
     protected static final String NOMINAL_TIME_EL = 
"${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
     protected static final String ACTUAL_TIME_EL = 
"${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
 
-    private static final Object USER_JMS_NOTIFICATION_ENABLED = 
"userJMSNotificationEnabled";
+    private static final String USER_JMS_NOTIFICATION_ENABLED = 
"userJMSNotificationEnabled";
+    private static final String SYSTEM_JMS_NOTIFICATION_ENABLED = 
"systemJMSNotificationEnabled";
+
     protected final LifeCycle lifecycle;
 
     public OozieCoordinatorBuilder(T entity, LifeCycle lifecycle) {
@@ -111,6 +114,8 @@ public abstract class OozieCoordinatorBuilder<T extends 
Entity> extends OozieEnt
             new ExternalId(entity.getName(), 
EntityUtil.getWorkflowNameTag(coordName, entity),
                 "${coord:nominalTime()}").getId());
         props.put(USER_JMS_NOTIFICATION_ENABLED, "true");
+        props.put(SYSTEM_JMS_NOTIFICATION_ENABLED,
+                
RuntimeProperties.get().getProperty("falcon.jms.notification.enabled", "true"));
 
         return props;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java 
b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 4961896..3bdfe73 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -48,6 +48,14 @@ public class FalconPostProcessing extends Configured 
implements Tool {
         // serialize the context to HDFS under logs dir before sending the 
message
         context.serialize();
 
+        boolean systemNotificationEnabled = Boolean.parseBoolean(context.
+                
getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true"));
+
+        if (systemNotificationEnabled) {
+            LOG.info("Sending Falcon message {} ", context);
+            invokeFalconMessageProducer(context);
+        }
+
         String userBrokerUrl = 
context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
         boolean userNotificationEnabled = Boolean.parseBoolean(context.
                 getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, 
"true"));
@@ -72,6 +80,13 @@ public class FalconPostProcessing extends Configured 
implements Tool {
         
jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }
 
+    private void invokeFalconMessageProducer(WorkflowExecutionContext context) 
throws Exception {
+        JMSMessageProducer jmsMessageProducer = 
JMSMessageProducer.builder(context)
+                .type(JMSMessageProducer.MessageType.FALCON)
+                .build();
+        jmsMessageProducer.sendMessage();
+    }
+
     private void invokeLogProducer(WorkflowExecutionContext context) {
         // todo: need to move this out to Falcon in-process
         if (UserGroupInformation.isSecurityEnabled()) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 09c29ab..7262964 100644
--- 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -604,7 +604,7 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
                 }
             }
         } catch (OozieClientException e) {
-            LOG.error("Error while retrieving JMS connection info", e);
+            LOG.debug("Error while retrieving JMS connection info", e);
         }
 
         return false;

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml 
b/oozie/src/main/resources/action/post-process.xml
index df0d286..0f51df7 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -62,6 +62,8 @@
         <arg>${userBrokerUrl}</arg>
         <arg>-userJMSNotificationEnabled</arg>
         <arg>${userJMSNotificationEnabled}</arg>
+        <arg>-systemJMSNotificationEnabled</arg>
+        <arg>${systemJMSNotificationEnabled}</arg>
         <arg>-brokerTTL</arg>
         <arg>${brokerTTL}</arg>
         <arg>-feedNames</arg>

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
 
b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 4b74368..171068a 100644
--- 
a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ 
b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -55,6 +55,7 @@ public class FalconPostProcessingTest {
     private String[] outputFeedNames = {"out-click-logs", "out-raw-logs"};
     private String[] outputFeedPaths = {"/out-click-logs/10/05/05/00/20", 
"/out-raw-logs/10/05/05/00/20"};
     private String userNotification = "true";
+    private String systemNotification = "true";
 
     @BeforeClass
     public void setup() throws Exception {
@@ -71,6 +72,7 @@ public class FalconPostProcessingTest {
             "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), 
BROKER_IMPL_CLASS,
             "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), userBrokerUrl,
             "-" + WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, 
userNotification,
+            "-" + WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, 
systemNotification,
             "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), 
BROKER_IMPL_CLASS,
             "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
             "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
@@ -163,6 +165,38 @@ public class FalconPostProcessingTest {
         }
     }
 
+    @Test
+    public void testSystemMessage() throws Exception {
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    // falcon message [FALCON_TOPIC_NAME]
+                    consumer(BROKER_URL, "FALCON.>", false);
+                } catch (AssertionError e) {
+                    error = e;
+                } catch (JMSException ignore) {
+                    error = null;
+                }
+            }
+        };
+        t.start();
+
+        systemNotification = "false";
+        latch.await();
+        new FalconPostProcessing().run(this.args);
+        t.join();
+
+        systemNotification = "true";
+        latch.await();
+        new FalconPostProcessing().run(this.args);
+        t.join();
+
+        if (error != null) {
+            throw error;
+        }
+    }
+
     private void consumer(String brokerUrl, String topic, boolean 
checkUserMessage) throws JMSException {
         ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(brokerUrl);
         Connection connection = connectionFactory.createConnection();

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/src/conf/runtime.properties
----------------------------------------------------------------------
diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties
index 1260f55..25333fe 100644
--- a/src/conf/runtime.properties
+++ b/src/conf/runtime.properties
@@ -44,6 +44,10 @@ falcon.current.colo=local
 # If true, Falcon skips oozie dryrun while scheduling entities.
 *.falcon.skip.dryrun=false
 
+# If set to false, the post processing action of Oozie workflows will not 
generate JMS notifications for Falcon.
+# If you are setting this to false, ensure you have enabled Oozie JMS 
notifications in oozie-site.xml
+*.falcon.jms.notification.enabled=true
+
 ######### Proxyuser Configuration Start #########
 
 #List of hosts the '#USER#' user is allowed to perform 'doAs 'operations from. 
The '#USER#' must be replaced with the

Reply via email to