Repository: falcon Updated Branches: refs/heads/master 618f717d3 -> e5698fad3
FALCON-1031 Make post processing notifications to user topics optional. Contributed by Pallavi Rao Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/e5698fad Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/e5698fad Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/e5698fad Branch: refs/heads/master Commit: e5698fad33c3004fed7ab4d02e82b2a993d525e6 Parents: 618f717 Author: Ajay Yadava <[email protected]> Authored: Wed Jul 8 10:38:38 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Wed Jul 8 10:38:38 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/falcon/entity/ClusterHelper.java | 4 +- .../entity/parser/ClusterEntityParser.java | 12 ++++- .../falcon/workflow/WorkflowExecutionArgs.java | 1 + .../entity/parser/ClusterEntityParserTest.java | 13 ++++++ .../config/cluster/cluster-no-messaging.xml | 38 +++++++++++++++ docs/src/site/twiki/EntitySpecification.twiki | 4 +- .../falcon/oozie/OozieCoordinatorBuilder.java | 2 + .../falcon/workflow/FalconPostProcessing.java | 13 ++++-- .../src/main/resources/action/post-process.xml | 2 + .../workflow/FalconPostProcessingTest.java | 49 ++++++++++++++++++-- 11 files changed, 130 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d2d589e..132e064 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ Trunk (Unreleased) FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava) IMPROVEMENTS + FALCON-1031 Make post processing notifications to user topics optional (Pallavi Rao via Ajay Yadava) + FALCON-1186 Add filtering capability to result of instance summary (Suhas Vasu) FALCON-1293 Update CHANGES.txt to change 0.6.1 branch to release (Shaik Idris Ali via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java index 49d408f..87b0fba 100644 --- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java @@ -41,6 +41,7 @@ import java.util.Map; public final class ClusterHelper { public static final String DEFAULT_BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory"; public static final String WORKINGDIR = "working"; + public static final String NO_USER_BROKER_URL = "NA"; private ClusterHelper() { } @@ -90,7 +91,8 @@ public final class ClusterHelper { } public static String getMessageBrokerUrl(Cluster cluster) { - return getInterface(cluster, Interfacetype.MESSAGING).getEndpoint(); + final Interface messageInterface = getInterface(cluster, Interfacetype.MESSAGING); + return messageInterface == null ? NO_USER_BROKER_URL : messageInterface.getEndpoint(); } public static String getMessageBrokerImplClass(Cluster cluster) { http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java index 5c3ce4f..59b0910 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java @@ -64,7 +64,10 @@ public class ClusterEntityParser extends EntityParser<Cluster> { validateScheme(cluster, Interfacetype.READONLY); validateScheme(cluster, Interfacetype.WRITE); validateScheme(cluster, Interfacetype.WORKFLOW); - validateScheme(cluster, Interfacetype.MESSAGING); + // User may choose to disable job completion notifications + if (ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING) != null) { + validateScheme(cluster, Interfacetype.MESSAGING); + } if (CatalogServiceFactory.isEnabled() && ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY) != null) { validateScheme(cluster, Interfacetype.REGISTRY); @@ -154,6 +157,13 @@ public class ClusterEntityParser extends EntityParser<Cluster> { } protected void validateMessagingInterface(Cluster cluster) throws ValidationException { + // Validate only if user has specified this + final Interface messagingInterface = ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING); + if (messagingInterface == null) { + LOG.info("Messaging service is not enabled for cluster: {}", cluster.getName()); + return; + } + final String messagingUrl = ClusterHelper.getMessageBrokerUrl(cluster); final String implementation = StartupProperties.get().getProperty("broker.impl.class", "org.apache.activemq.ActiveMQConnectionFactory"); http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java index 0a8be64..9456fb9 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java @@ -73,6 +73,7 @@ public enum WorkflowExecutionArgs { USER_BRKR_IMPL_CLASS("userBrokerImplClass", "user broker Impl class", false), USER_BRKR_URL("userBrokerUrl", "user broker url", false), BRKR_TTL("brokerTTL", "time to live for broker message in sec", false), + USER_JMS_NOTIFICATION_ENABLED("userJMSNotificationEnabled", "Is User notification via JMS enabled?", false), // state maintained LOG_FILE("logFile", "log file path where feeds to be deleted are recorded", false), http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java index b7886bd..638cef9 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java @@ -124,6 +124,19 @@ public class ClusterEntityParserTest extends AbstractTestBase { } @Test + public void testParseClusterWithoutMessaging() throws FalconException { + InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-messaging.xml"); + + // Parse should be successful + Cluster cluster = parser.parse(stream); + + Interface messaging = ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING); + Assert.assertNull(messaging); + + Assert.assertEquals(ClusterHelper.getMessageBrokerUrl(cluster), ClusterHelper.NO_USER_BROKER_URL); + } + + @Test public void testParseClusterWithBadRegistry() throws Exception { // disable catalog service StartupProperties.get().remove(CatalogServiceFactory.CATALOG_SERVICE); http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/test/resources/config/cluster/cluster-no-messaging.xml ---------------------------------------------------------------------- diff --git a/common/src/test/resources/config/cluster/cluster-no-messaging.xml b/common/src/test/resources/config/cluster/cluster-no-messaging.xml new file mode 100644 index 0000000..93e94cb --- /dev/null +++ b/common/src/test/resources/config/cluster/cluster-no-messaging.xml @@ -0,0 +1,38 @@ +<?xml version="1.0"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +<cluster colo="default" description="" name="testCluster" xmlns="uri:falcon:cluster:0.1"> + <tags>[email protected], [email protected], department=forecasting</tags> + <interfaces> + <interface type="readonly" endpoint="hftp://localhost:50010" + version="0.20.2"/> + <interface type="write" endpoint="jail://testCluster:00" + version="0.20.2"/> + <interface type="execute" endpoint="localhost:8021" version="0.20.2"/> + <interface type="workflow" endpoint="http://localhost:11000/oozie/" + version="4.0"/> + <interface type="registry" endpoint="http://localhost:48080/templeton/v1" + version="0.11.0"/> + </interfaces> + <locations> + <location name="staging" path="/projects/falcon/staging"/> + <location name="temp" path="/tmp"/> + <location name="working" path="/projects/falcon/working"/> + </locations> +</cluster> http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/docs/src/site/twiki/EntitySpecification.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki index 1ed2cb5..0c1fae2 100644 --- a/docs/src/site/twiki/EntitySpecification.twiki +++ b/docs/src/site/twiki/EntitySpecification.twiki @@ -657,10 +657,12 @@ Syntax: </process> </verbatim> -queueName and jobPriority are special properties, which when present are used by the Falcon's launcher job, the same property is also available in workflow which can be used to propagate to pig or M/R job. +The following are some special properties, which when present are used by the Falcon's launcher job, the same property is also available in workflow which can be used to propagate to pig or M/R job. <verbatim> <property name="queueName" value="hadoopQueue"/> <property name="jobPriority" value="VERY_HIGH"/> + <!-- This property is used to turn off JMS notifications for this process. JMS notifications are enabled by default. --> + <property name="userJMSNotificationEnabled" value="false"/> </verbatim> ---+++ Workflow http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java index e5d75fb..92697b0 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java @@ -57,6 +57,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt protected static final String MR_JOB_PRIORITY = "jobPriority"; protected static final String IGNORE = "IGNORE"; + private static final Object USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled"; protected final LifeCycle lifecycle; public OozieCoordinatorBuilder(T entity, LifeCycle lifecycle) { @@ -132,6 +133,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt props.put(MR_QUEUE_NAME, "default"); props.put(MR_JOB_PRIORITY, "NORMAL"); + props.put(USER_JMS_NOTIFICATION_ENABLED, "true"); //props in entity override the set props. props.putAll(getEntityProperties(entity)); http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java index e5b3704..7557153 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.workflow; +import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.logging.JobLogMover; import org.apache.falcon.messaging.JMSMessageProducer; import org.apache.hadoop.conf.Configuration; @@ -39,15 +40,21 @@ public class FalconPostProcessing extends Configured implements Tool { @Override public int run(String[] args) throws Exception { - WorkflowExecutionContext context = WorkflowExecutionContext.create(args, WorkflowExecutionContext.Type.POST_PROCESSING); LOG.info("Post workflow execution context created {}", context); // serialize the context to HDFS under logs dir before sending the message context.serialize(); - LOG.info("Sending user message {} ", context); - invokeUserMessageProducer(context); + String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL); + boolean userNotificationEnabled = Boolean.parseBoolean(context. + getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true")); + + if (userBrokerUrl != null && !userBrokerUrl.equals(ClusterHelper.NO_USER_BROKER_URL) + && userNotificationEnabled) { + LOG.info("Sending user message {} ", context); + invokeUserMessageProducer(context); + } // JobLogMover doesn't throw exception, a failed log mover will not fail the user workflow LOG.info("Moving logs {}", context); http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/oozie/src/main/resources/action/post-process.xml ---------------------------------------------------------------------- diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml index f354351..df0d286 100644 --- a/oozie/src/main/resources/action/post-process.xml +++ b/oozie/src/main/resources/action/post-process.xml @@ -60,6 +60,8 @@ <arg>${userBrokerImplClass}</arg> <arg>-userBrokerUrl</arg> <arg>${userBrokerUrl}</arg> + <arg>-userJMSNotificationEnabled</arg> + <arg>${userJMSNotificationEnabled}</arg> <arg>-brokerTTL</arg> <arg>${brokerTTL}</arg> <arg>-feedNames</arg> http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java index d59c5e6..0a9aaa0 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java @@ -19,6 +19,7 @@ package org.apache.falcon.oozie.workflow; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.workflow.FalconPostProcessing; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.commons.lang3.StringUtils; @@ -45,12 +46,14 @@ public class FalconPostProcessingTest { private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true"; private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory"; private static final String ENTITY_NAME = "agg-coord"; + private String userBrokerUrl = BROKER_URL; private BrokerService broker; private volatile AssertionError error; private CountDownLatch latch = new CountDownLatch(1); private String[] outputFeedNames = {"out-click-logs", "out-raw-logs"}; private String[] outputFeedPaths = {"/out-click-logs/10/05/05/00/20", "/out-raw-logs/10/05/05/00/20"}; + private String userNotification = "true"; @BeforeClass public void setup() throws Exception { @@ -65,7 +68,8 @@ public class FalconPostProcessingTest { "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-01-01-01-00", "-" + WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL, "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS, - "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), BROKER_URL, + "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), userBrokerUrl, + "-" + WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, userNotification, "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS, "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process", "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE", @@ -105,7 +109,7 @@ public class FalconPostProcessingTest { public void run() { try { // falcon message [FALCON_TOPIC_NAME] and user message ["FALCON." + ENTITY_NAME] - consumer(BROKER_URL, "FALCON.>"); + consumer(BROKER_URL, "FALCON.>", true); } catch (AssertionError e) { error = e; } catch (JMSException ignore) { @@ -115,15 +119,50 @@ public class FalconPostProcessingTest { }; t.start(); + userBrokerUrl = BROKER_URL; + + latch.await(); + new FalconPostProcessing().run(args); + t.join(); + if (error != null) { + throw error; + } + } + + @Test + public void testNoUserMessage() throws Exception { + Thread t = new Thread() { + @Override + public void run() { + try { + // falcon message [FALCON_TOPIC_NAME] and user message ["FALCON." + ENTITY_NAME] + consumer(BROKER_URL, "FALCON.>", false); + } catch (AssertionError e) { + error = e; + } catch (JMSException ignore) { + error = null; + } + } + }; + t.start(); + + userNotification = "false"; latch.await(); new FalconPostProcessing().run(this.args); t.join(); + + userNotification = "true"; + userBrokerUrl = ClusterHelper.NO_USER_BROKER_URL; + latch.await(); + new FalconPostProcessing().run(this.args); + t.join(); + if (error != null) { throw error; } } - private void consumer(String brokerUrl, String topic) throws JMSException { + private void consumer(String brokerUrl, String topic, boolean checkUserMessage) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); Connection connection = connectionFactory.createConnection(); connection.start(); @@ -135,7 +174,9 @@ public class FalconPostProcessingTest { latch.countDown(); // Verify user message - verifyMesssage(consumer); + if (checkUserMessage) { + verifyMesssage(consumer); + } // Verify falcon message verifyMesssage(consumer);
