Repository: falcon
Updated Branches:
  refs/heads/master dc3e729a8 -> bc58b55a1

FALCON-1888 Falcon JMS notification details and example

Updates the operability document to enable Falcon user notifications and example code to read and Map Message from ActiveMQ.

Author: Venkatesan Ramachandran <[email protected]>

Reviewers: Sowmya<[email protected]>, Peeyush<[email protected]>

Closes #101 from vramachan/master and squashes the following commits:

3429177 [Venkatesan Ramachandran] FALCON-1888 : Falcon JMS system and user notification details and sample code
766c130 [Venkatesan Ramachandran] FALCON-1888 : Falcon JMS system and user notification details and sample code
a1fe26e [Venkatesan Ramachandran] git rebase from upstream master to ramachan fork master
3cfc4e0 [Venkatesan Ramachandran] FALCON-1888 : Falcon JMS details and usage
fac28b1 [Venkatesan Ramachandran] Merge
afa5173 [Venkatesan Ramachandran] FALCON-1838:Export instances are not added graph db for lineage tracking - remove call to findVertex(), refactor method name
a886958 [Venkatesan Ramachandran] FALCON-1838:Export instances are not added graph db for lineage tracking

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bc58b55a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bc58b55a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bc58b55a

Branch: refs/heads/master
Commit: bc58b55a195a52a7656e2cbd813043676662ace6
Parents: dc3e729
Author: Venkatesan Ramachandran <[email protected]>
Authored: Mon Apr 18 14:15:39 2016 +0530
Committer: peeyush b <[email protected]>
Committed: Mon Apr 18 14:15:39 2016 +0530

----------------------------------------------------------------------
 docs/src/site/twiki/Operability.twiki | 147 +++++++++++++++++++++++++----
 1 file changed, 131 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/falcon/blob/bc58b55a/docs/src/site/twiki/Operability.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Operability.twiki b/docs/src/site/twiki/Operability.twiki
index 05850c1..616af36 100644
--- a/docs/src/site/twiki/Operability.twiki
+++ b/docs/src/site/twiki/Operability.twiki
@@ -72,22 +72,137 @@ or send alerts according to their requirements.
 ---++ Notifications
-Falcon creates a JMS topic for every process/feed that is scheduled in Falcon.
-The implementation class and the broker url of the JMS engine are read from the dependent cluster's definition.
-Users may register consumers on the required topic to check the availability or status of feed instances.
-
-For a given process that is scheduled, the name of the topic is same as the process name.
-Falcon sends a Map message for every feed produced by the instance of a process to the JMS topic.
-The JMS !MapMessage sent to a topic has the following properties:
-entityName, feedNames, feedInstancePath, workflowId, runId, nominalTime, timeStamp, brokerUrl, brokerImplClass, entityType, operation, logFile, topicName, status, brokerTTL;
-
-For a given feed that is scheduled, the name of the topic is same as the feed name.
-Falcon sends a map message for every feed instance that is deleted/archived/replicated depending upon the retention policy set in the feed definition.
-The JMS !MapMessage sent to a topic has the following properties:
-entityName, feedNames, feedInstancePath, workflowId, runId, nominalTime, timeStamp, brokerUrl, brokerImplClass, entityType, operation, logFile, topicName, status, brokerTTL;
-
-The JMS messages are automatically purged after a certain period (default 3 days) by the Falcon JMS house-keeping service.TTL (Time-to-live) for JMS message
-can be configured in the Falcon's startup.properties file.
+Falcon has two types of notifications - System and User notifications.
+
+---+++ System notifications
+The System notifications are internally generated and used by Falcon to monitor the Falcon orchestrated workflow jobs.
+By default, Falcon starts an ActiveMQ embedded JMS server on Falcon machine on port 61616 as a daemon. Alternatively,
+users can make Falcon to use an existing JMS server instead of starting an embedded instance by doing the
+following 2 steps:
+
+   * Setting the property broker.url in the startup.properties as below
+<verbatim>
+   *.broker.url=tcp://jms-server-host:61616
+</verbatim>
+   * Set the system property falcon.embeddedmq to false as below
+<verbatim>
+   <FALCON-INSTALL-DIR>/bin/falcon-start -Dfalcon.embeddedmq=false
+</verbatim>
+
+Falcon uses FALCON.ENTITY.TOPIC to publish system notifications. This topic and the Map Message fields are internal
+and could change between releases.
+
+---+++ User notifications
+
+Falcon, in addition to the FALCON.ENTITY.TOPIC, also creates a JMS topic for every process/feed that is scheduled in
+Falcon as part of User notification. To enable User notifications, the broker url and implementation class of the JMS
+engine need to be specified in the cluster definition associated with the feed/process. Users may register consumers
+on the required topic to check the availability or status of feed instances. The User notification JMS broker instance
+can be same as the System notification or different.
+
+The name of the JMS topic is same as the process/feed name. Falcon sends a map message for every feed instance that is
+created/deleted/replicated/imported/exported to the JMS topic. The JMS Map Message sent to a topic has the following
+fields:
+
+   1. cluster - name of the current cluster the feed/process is dependent on.
+   1. entityType - type of the entity (feed or process).
+   1. entityName - name of the entity.
+   1. nominalTime - instance time (or data date).
+   1. operation - operation like generate, delete, replicate, import, export.
+   1. feedNames - name of the feeds which are generated/replicated/deleted/imported/exported.
+   1. feedInstancePaths - comma separated feed instance paths.
+   1. workflowId - current workflow-id of the instance.
+   1. workflowUser - user who owns the feed instance (i.e partition).
+   1. runId - current run-id of the instance.
+   1. status - status of the user workflow instance.
+   1. timeStamp - current timestamp.
+   1. logDir - log dir where lineage can be recorded.
+
+The JMS messages are automatically purged after a certain period (default 3 days) by the Falcon JMS house-keeping
+service. TTL (Time-to-live) for JMS message can be configured in the Falcon's startup.properties file.
+
+The following example shows how to enable and read user notification by connecting to the JMS broker.
+
+First, specify the JMS broker url in the cluster definition XML as shown below.
+
+<verbatim>
+
+<?xml version="1.0"?>
+<!-- filename : primaryCluster.xml -->
+<cluster colo="USWestOregon" description="oregonHadoopCluster" name="primaryCluster" xmlns="uri:falcon:cluster:0.1">
+    <interfaces>
+        ...
+        ...
+        <interface type="messaging" endpoint="tcp://user-jms-broker-host:61616?daemon=true" version="5.1.6" />
+        ...
+    </interfaces>
+</cluster>
+
+</verbatim>
+
+Next, use a JMS consumer (example below in Java) to read the message from the topic with the name
+FALCON.<feed-or-process-name>
+
+<verbatim>
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Topic;
+import javax.jms.Session;
+import javax.jms.TopicSession;
+
+public class FalconUserJMSClient {
+    public static void main(String[] args)throws Exception {
+        // Note: specify the JMS broker URL
+        String brokerUrl = "tcp://localhost:61616";
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+        Connection connection = connectionFactory.createConnection();
+        connection.setClientID("Falcon User JMS Consumer");
+        TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+
+            // Note: the topic name for the feed will be FALCON.<feed-name>
+            Topic falconTopic = session.createTopic("FALCON.feed-sample");
+            MessageConsumer consumer = session.createConsumer(falconTopic);
+            connection.start();
+            while (true) {
+                ActiveMQMapMessage msg = (ActiveMQMapMessage) consumer.receive();
+                System.out.println("cluster : " + msg.getString("cluster"));
+                System.out.println("entityType : " + msg.getString("entityType"));
+                System.out.println("entityName : " + msg.getString("entityName"));
+                System.out.println("nominalTime : " + msg.getString("nominalTime"));
+                System.out.println("operation : " + msg.getString("operation"));
+
+                System.out.println("feedNames : " + msg.getString("feedNames"));
+                System.out.println("feedInstancePaths : " + msg.getString("feedInstancePaths"));
+
+                System.out.println("workflowId : " + msg.getString("workflowId"));
+                System.out.println("workflowUser : " + msg.getString("workflowUser"));
+                System.out.println("runId : " + msg.getString("runId"));
+                System.out.println("status : " + msg.getString("status"));
+                System.out.println("timeStamp : " + msg.getString("timeStamp"));
+                System.out.println("logDir : " + msg.getString("logDir"));
+
+                System.out.println("brokerUrl : " + msg.getString("brokerUrl"));
+                System.out.println("brokerImplClass : " + msg.getString("brokerImplClass"));
+                System.out.println("logFile : " + msg.getString("logFile"));
+                System.out.println("topicName : " + msg.getString("topicName"));
+                System.out.println("brokerTTL : " + msg.getString("brokerTTL"));
+            }
+        } finally {
+            if (session != null) {
+                session.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+}
+</verbatim>
 ---++ Alerts
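
Note on consuming the fields documented in this change: the doc text says the user topic is named FALCON.<feed-or-process-name> and that feedInstancePaths is a comma separated list. A minimal sketch of how a consumer might handle those two details once it has read the strings from the Map Message is shown below. It uses only the JDK, so it runs without a broker. The class name, method names, and the sample paths are hypothetical and are not part of Falcon or of this commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Falcon: post-processes string values that a
// consumer has already read from a user notification Map Message.
class FalconNotificationFields {

    // Builds the user topic name following the FALCON.<feed-or-process-name>
    // pattern described in the documentation text of this diff.
    static String topicNameFor(String entityName) {
        return "FALCON." + entityName;
    }

    // Splits the comma separated feedInstancePaths value into individual paths.
    // Returns an empty list when the field is absent (null) or blank.
    static List<String> splitInstancePaths(String feedInstancePaths) {
        List<String> paths = new ArrayList<>();
        if (feedInstancePaths == null) {
            return paths;
        }
        for (String part : feedInstancePaths.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        // Made-up sample values standing in for msg.getString(...) results.
        String entityName = "feed-sample";
        String feedInstancePaths = "/data/feed-sample/2016-04-18-00, /data/feed-sample/2016-04-18-01";

        System.out.println("topic : " + topicNameFor(entityName));
        for (String path : splitInstancePaths(feedInstancePaths)) {
            System.out.println("instance path : " + path);
        }
    }
}
```

Treating a missing field as an empty list keeps the loop safe when a message carries no feedInstancePaths value. Whether Falcon ever sends such a message is not stated in the doc, so that guard is a defensive assumption.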
