Repository: falcon Updated Branches: refs/heads/master ccdcd2e6f -> 5d8b36c16
FALCON-1425 Provide Email based plugin to send Notification once instance completed. Contributed by Peeyush Bishnoi. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5d8b36c1 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5d8b36c1 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5d8b36c1 Branch: refs/heads/master Commit: 5d8b36c16c72c0bb5581ad27b61da32ab652f3c4 Parents: ccdcd2e Author: Ajay Yadava <[email protected]> Authored: Thu Sep 17 18:13:16 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Thu Sep 17 18:14:04 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/entity/v0/EntityNotification.java | 35 ++++ client/src/main/resources/feed-0.1.xsd | 39 +++++ client/src/main/resources/jaxb-binding.xjb | 8 + client/src/main/resources/process-0.1.xsd | 40 +++++ .../org/apache/falcon/entity/EntityUtil.java | 13 ++ common/src/main/resources/startup.properties | 24 +++ .../entity/parser/FeedEntityParserTest.java | 9 + .../entity/parser/ProcessEntityParserTest.java | 8 + .../src/test/resources/config/feed/feed-0.1.xml | 1 + .../resources/config/process/process-0.1.xml | 2 + metrics/pom.xml | 11 ++ .../falcon/plugin/NotificationPlugin.java | 29 ++++ .../falcon/util/EmailNotificationProps.java | 58 +++++++ .../apache/falcon/util/NotificationType.java | 46 +++++ prism/pom.xml | 11 ++ .../apache/falcon/plugin/EmailNotification.java | 167 +++++++++++++++++++ .../falcon/plugin/EmailNotificationPlugin.java | 74 ++++++++ .../falcon/plugin/NotificationHandler.java | 39 +++++ .../apache/falcon/util/NotificationUtil.java | 77 +++++++++ .../falcon/plugin/EmailNotificationTest.java | 160 ++++++++++++++++++ src/conf/startup.properties | 24 +++ 22 files changed, 877 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d431546..013e0fd 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,8 @@ Trunk (Unreleased) FALCON-1027 Falcon proxy user support(Sowmya Ramesh) IMPROVEMENTS + FALCON-1425 Provide Email based plugin to send Notification once instance completed(Peeyush Bishnoi via Ajay Yadava) + FALCON-1205 SLAService to keep track of missing SLAs for feeds(Ajay Yadava) FALCON-1449 Move getEntityProperties method to EntityUtil.(Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/client/src/main/java/org/apache/falcon/entity/v0/EntityNotification.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/entity/v0/EntityNotification.java b/client/src/main/java/org/apache/falcon/entity/v0/EntityNotification.java new file mode 100644 index 0000000..bab70d4 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/entity/v0/EntityNotification.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.entity.v0; + +/** + * EntityNotification class to be extended by Feed/Process notification class. + */ +public abstract class EntityNotification { + public abstract String getType(); + public abstract String getLevel(); + public abstract String getTo(); + + public String toString() { + return "Notification{" + + "type=" + getType() + + ", to=" + getTo() + + "}"; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/client/src/main/resources/feed-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd index 4ff8baa..2af28d2 100644 --- a/client/src/main/resources/feed-0.1.xsd +++ b/client/src/main/resources/feed-0.1.xsd @@ -114,6 +114,14 @@ <xs:element type="locations" name="locations"/> <xs:element type="catalog-table" name="table"/> </xs:choice> + <xs:element type="notification" name="notification" minOccurs="0"> + <xs:annotation> + <xs:documentation>Notification will help to notify the users about the finished status of Falcon + Instance. Currently Email type notification is supported and users must specify the receiver's + email address. + </xs:documentation> + </xs:annotation> + </xs:element> <xs:element type="ACL" name="ACL"/> <xs:element type="schema" name="schema"/> <xs:element type="properties" name="properties" minOccurs="0"/> @@ -291,6 +299,37 @@ <xs:complexType name="partition"> <xs:attribute type="IDENTIFIER" name="name" use="required"/> </xs:complexType> + + <xs:complexType name="notification"> + <xs:annotation> + <xs:documentation> + Notification specifies the "type" of notification to be used to send notification. + Currently email based notification type is supported and user can specify the comma + separated email address with "to" property. + e.g: type="email" to="falcon@localhost,hive@localhost" + "limit" property in notification will help to set the frequency of email notification + in case of Falcon instance failure. Incase of feed entity limit="attempt" is only supported + as there is no retry element. + </xs:documentation> + </xs:annotation> + <xs:attribute name="type" use="required"> + <xs:simpleType> + <xs:restriction base="xs:string"> + <xs:enumeration value="email"/> + </xs:restriction> + </xs:simpleType> + </xs:attribute> + <xs:attribute name="level" use="optional"> + <xs:simpleType> + <xs:restriction base="xs:string"> + <xs:enumeration value="attempt"/> + <xs:enumeration value="instance"/> + </xs:restriction> + </xs:simpleType> + </xs:attribute> + <xs:attribute type="xs:string" name="to" use="required"/> + </xs:complexType> + <xs:complexType name="ACL"> <xs:annotation> <xs:documentation> http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/client/src/main/resources/jaxb-binding.xjb ---------------------------------------------------------------------- diff --git a/client/src/main/resources/jaxb-binding.xjb b/client/src/main/resources/jaxb-binding.xjb index f644f40..6f1d6c7 100644 --- a/client/src/main/resources/jaxb-binding.xjb +++ b/client/src/main/resources/jaxb-binding.xjb @@ -40,6 +40,10 @@ <inheritance:extends>org.apache.falcon.entity.v0.AccessControlList</inheritance:extends> </jaxb:bindings> + <jaxb:bindings schemaLocation="feed-0.1.xsd" node="//xs:complexType[@name='notification']"> + <inheritance:extends>org.apache.falcon.entity.v0.EntityNotification</inheritance:extends> + </jaxb:bindings> + <jaxb:bindings schemaLocation="process-0.1.xsd" node="//xs:complexType[@name='process']"> <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends> </jaxb:bindings> @@ -48,6 +52,10 @@ <inheritance:extends>org.apache.falcon.entity.v0.AccessControlList</inheritance:extends> </jaxb:bindings> + <jaxb:bindings schemaLocation="process-0.1.xsd" node="//xs:complexType[@name='notification']"> + <inheritance:extends>org.apache.falcon.entity.v0.EntityNotification</inheritance:extends> + </jaxb:bindings> + <jaxb:globalBindings> <xjc:simple/> </jaxb:globalBindings> http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/client/src/main/resources/process-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd index c81d6f7..256a29f 100644 --- a/client/src/main/resources/process-0.1.xsd +++ b/client/src/main/resources/process-0.1.xsd @@ -170,6 +170,14 @@ </xs:documentation> </xs:annotation> </xs:element> + <xs:element type="notification" name="notification" minOccurs="0"> + <xs:annotation> + <xs:documentation>Notification will help to notify the users about the finished status of Falcon + Instance. Currently Email type notification is supported and users must specify the receiver's + email address. + </xs:documentation> + </xs:annotation> + </xs:element> <xs:element type="ACL" name="ACL" minOccurs="0"/> </xs:sequence> <xs:attribute type="IDENTIFIER" name="name" use="required"/> @@ -399,4 +407,36 @@ <xs:attribute type="xs:string" name="group"/> <xs:attribute type="xs:string" name="permission" default="*"/> </xs:complexType> + + <xs:complexType name="notification"> + <xs:annotation> + <xs:documentation> + Notification specifies the "type" of notification to be used to send notification. + Currently email based notification type is supported and user can specify the comma + separated email address with "to" property. + e.g: type="email" to="falcon@localhost,hive@localhost" + "limit" property in notification will help to set the frequency of email notification + in case of Falcon instance failure. + If limit="attempt" is set, for every instance failure email will be sent. + If limit="final" is set, failure email will be sent only when all the attempts has been + tried defined with retry element. + </xs:documentation> + </xs:annotation> + <xs:attribute name="type" use="required"> + <xs:simpleType> + <xs:restriction base="xs:string"> + <xs:enumeration value="email"/> + </xs:restriction> + </xs:simpleType> + </xs:attribute> + <xs:attribute name="level" use="optional"> + <xs:simpleType> + <xs:restriction base="xs:string"> + <xs:enumeration value="attempt"/> + <xs:enumeration value="instance"/> + </xs:restriction> + </xs:simpleType> + </xs:attribute> + <xs:attribute type="xs:string" name="to" use="required"/> + </xs:complexType> </xs:schema> http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 2f05b1f..646afc3 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -29,6 +29,7 @@ import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityGraph; +import org.apache.falcon.entity.v0.EntityNotification; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; @@ -936,5 +937,17 @@ public final class EntityUtil { return result; } + public static EntityNotification getEntityNotification(Entity entity) { + switch (entity.getEntityType()) { + case FEED: + Feed feed = (Feed) entity; + return feed.getNotification(); + case PROCESS: + Process process = (Process) entity; + return process.getNotification(); + default: + throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType()); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 39a412d..9db460c 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -198,3 +198,27 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle *.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider ######### Authorization Properties ######### + +######### SMTP Properties ######## + +# Setting SMTP hostname +#*.falcon.email.smtp.host=localhost + +# Setting SMTP port number +#*.falcon.email.smtp.port=25 + +# Setting email from address +#*.falcon.email.from.address=falcon@localhost + +# Setting email Auth +#*.falcon.email.smtp.auth=false + +#Setting user name +#*.falcon.email.smtp.user="" + +#Setting password +#*.falcon.email.smtp.password="" + +# Setting monitoring plugin, if SMTP parameters is defined +#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\ +# org.apache.falcon.plugin.EmailNotificationPlugin http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java index d203b7c..1e9b72f 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java @@ -927,6 +927,15 @@ public class FeedEntityParserTest extends AbstractTestBase { } @Test + public void testValidateEmailNotification() throws Exception { + Feed feedNotification = (Feed) EntityType.FEED.getUnmarshaller().unmarshal( + (FeedEntityParserTest.class.getResourceAsStream(FEED_XML))); + Assert.assertNotNull(feedNotification.getNotification()); + Assert.assertEquals(feedNotification.getNotification().getTo(), "falcon@localhost"); + Assert.assertEquals(feedNotification.getNotification().getType(), "email"); + } + + @Test public void testValidateFeedProperties() throws Exception { FeedEntityParser feedEntityParser = Mockito .spy((FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED)); http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java index 77f6a77..a935db3 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java @@ -357,6 +357,14 @@ public class ProcessEntityParserTest extends AbstractTestBase { } @Test + public void testValidateEmailNotification() throws Exception { + Process process = parser.parseAndValidate(getClass().getResourceAsStream(PROCESS_XML)); + Assert.assertNotNull(process.getNotification()); + Assert.assertEquals(process.getNotification().getTo(), "falcon@localhost"); + Assert.assertEquals(process.getNotification().getType(), "email"); + } + + @Test public void testValidateACLWithNoACLAndAuthorizationDisabled() throws Exception { InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML); http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/common/src/test/resources/config/feed/feed-0.1.xml ---------------------------------------------------------------------- diff --git a/common/src/test/resources/config/feed/feed-0.1.xml b/common/src/test/resources/config/feed/feed-0.1.xml index ee2607a..d223d5d 100644 --- a/common/src/test/resources/config/feed/feed-0.1.xml +++ b/common/src/test/resources/config/feed/feed-0.1.xml @@ -58,6 +58,7 @@ <location type="meta" path="/projects/falcon/clicksMetaData"/> </locations> + <notification type="email" to="falcon@localhost"/> <ACL owner="testuser-ut-user" group="group" permission="0x755"/> <schema location="/schema/clicks" provider="protobuf"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/common/src/test/resources/config/process/process-0.1.xml ---------------------------------------------------------------------- diff --git a/common/src/test/resources/config/process/process-0.1.xml b/common/src/test/resources/config/process/process-0.1.xml index 2659903..039208c 100644 --- a/common/src/test/resources/config/process/process-0.1.xml +++ b/common/src/test/resources/config/process/process-0.1.xml @@ -54,4 +54,6 @@ <late-input input="impression" workflow-path="himpression/late/workflow"/> <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/> </late-process> + + <notification type="email" to="falcon@localhost"/> </process> http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/metrics/pom.xml ---------------------------------------------------------------------- diff --git a/metrics/pom.xml b/metrics/pom.xml index 748fb97..a0358db 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -35,6 +35,11 @@ <dependencies> <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-client</artifactId> + </dependency> + + <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> </dependency> @@ -54,5 +59,11 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + + <dependency> + <groupId>javax.mail</groupId> + <artifactId>mail</artifactId> + <version>1.4.7</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/metrics/src/main/java/org/apache/falcon/plugin/NotificationPlugin.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/plugin/NotificationPlugin.java b/metrics/src/main/java/org/apache/falcon/plugin/NotificationPlugin.java new file mode 100644 index 0000000..ff86d28 --- /dev/null +++ b/metrics/src/main/java/org/apache/falcon/plugin/NotificationPlugin.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.plugin; + +import org.apache.falcon.aspect.ResourceMessage; +import org.apache.falcon.entity.v0.EntityNotification; + +/** + * Interface to be implemented by notification class to send notification. + */ +public interface NotificationPlugin { + void sendNotification(ResourceMessage resourceMessage, EntityNotification entityNotification) throws Exception; +} http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/metrics/src/main/java/org/apache/falcon/util/EmailNotificationProps.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/util/EmailNotificationProps.java b/metrics/src/main/java/org/apache/falcon/util/EmailNotificationProps.java new file mode 100644 index 0000000..7de7b73 --- /dev/null +++ b/metrics/src/main/java/org/apache/falcon/util/EmailNotificationProps.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +/** + * Email Notification Argument list. + */ +public enum EmailNotificationProps { + SMTP_HOST("falcon.email.smtp.host", "SMTP host server name", true), + SMTP_PORT("falcon.email.smtp.port", "SMTP port number", true), + SMTP_FROM("falcon.email.from.address", "SMTP from address", true), + SMTP_AUTH("falcon.email.smtp.auth", "SMTP authorization details", false), + SMTP_USER("falcon.email.smtp.user", "SMTP username", false), + SMTP_PASSWORD("falcon.email.smtp.password", "SMTP password", false); + + private final String name; + private final String description; + private final boolean isRequired; + + EmailNotificationProps(String name, String description, boolean isRequired) { + this.name = name; + this.description = description; + this.isRequired = isRequired; + } + + public String getName() { + return this.name; + } + + public String getDescription() { + return this.description; + } + + public boolean isRequired() { + return this.isRequired; + } + + public String toString() { + return getName(); + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/metrics/src/main/java/org/apache/falcon/util/NotificationType.java ---------------------------------------------------------------------- diff --git a/metrics/src/main/java/org/apache/falcon/util/NotificationType.java b/metrics/src/main/java/org/apache/falcon/util/NotificationType.java new file mode 100644 index 0000000..e5f7258 --- /dev/null +++ b/metrics/src/main/java/org/apache/falcon/util/NotificationType.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +/** + * Types of Notification. + */ +public enum NotificationType { + EMAIL("email", "send email notification"); + + private final String name; + private final String description; + + NotificationType(String name, String description) { + this.name = name; + this.description = description; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + public String toString() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/prism/pom.xml ---------------------------------------------------------------------- diff --git a/prism/pom.xml b/prism/pom.xml index be04ac9..6eca0b9 100644 --- a/prism/pom.xml +++ b/prism/pom.xml @@ -76,6 +76,11 @@ <dependency> <groupId>org.apache.falcon</groupId> + <artifactId>falcon-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> <artifactId>falcon-oozie-adaptor</artifactId> </dependency> @@ -148,6 +153,12 @@ <groupId>commons-net</groupId> <artifactId>commons-net</artifactId> </dependency> + + <dependency> + <groupId>com.icegreen</groupId> + <artifactId>greenmail</artifactId> + <version>1.4.1</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/prism/src/main/java/org/apache/falcon/plugin/EmailNotification.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/plugin/EmailNotification.java b/prism/src/main/java/org/apache/falcon/plugin/EmailNotification.java new file mode 100644 index 0000000..8c42c70 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/plugin/EmailNotification.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.plugin; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.aspect.ResourceMessage; +import org.apache.falcon.entity.v0.EntityNotification; +import org.apache.falcon.util.EmailNotificationProps; +import org.apache.falcon.util.NotificationUtil; +import org.apache.falcon.util.StartupProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.mail.Authenticator; +import javax.mail.Message; +import javax.mail.MessagingException; +import javax.mail.PasswordAuthentication; +import javax.mail.Session; +import javax.mail.Transport; +import javax.mail.internet.InternetAddress; +import javax.mail.internet.MimeMessage; +import java.util.Map; +import java.util.Properties; + +/** + * Concrete class for email notification. + */ +public class EmailNotification implements NotificationPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(EmailNotification.class); + + private static final String SMTP_HOST = StartupProperties.get().getProperty( + EmailNotificationProps.SMTP_HOST.getName(), "localhost"); + private static final String SMTP_PORT = StartupProperties.get().getProperty( + EmailNotificationProps.SMTP_PORT.getName(), "3025"); + private static final String SMTP_FROM = StartupProperties.get().getProperty( + EmailNotificationProps.SMTP_FROM.getName(), "falcon@localhost"); + private static final Boolean SMTP_AUTH = Boolean.valueOf(StartupProperties.get().getProperty( + EmailNotificationProps.SMTP_AUTH.getName(), "false")); + private static final String SMTP_USER = StartupProperties.get().getProperty( + EmailNotificationProps.SMTP_USER.getName(), ""); + private static final String SMTP_PASSWORD = StartupProperties.get().getProperty( + EmailNotificationProps.SMTP_PASSWORD.getName(), ""); + + private static final String NEWLINE_DELIM = System.getProperty("line.separator"); + private static final String TAB_DELIM = "\t"; + + private Message message; + + public EmailNotification() throws FalconException { + initialize(); + } + + private void initialize() throws FalconException { + Properties emailProperties = new Properties(); + if (StringUtils.isEmpty(SMTP_HOST) && StringUtils.isEmpty(SMTP_PORT) + && StringUtils.isEmpty(SMTP_FROM)) { + LOG.error("SMTP properties is not defined in startup.properties"); + return; + } + + emailProperties.setProperty("mail.smtp.host", SMTP_HOST); + emailProperties.setProperty("mail.smtp.port", SMTP_PORT); + emailProperties.setProperty("mail.smtp.auth", SMTP_AUTH.toString()); + try { + Session session; + if (!SMTP_AUTH) { + session = Session.getInstance(emailProperties); + } else { + session = Session.getInstance(emailProperties, new FalconMailAuthenticator(SMTP_USER, SMTP_PASSWORD)); + } + message = new MimeMessage(session); + message.setFrom(new InternetAddress(SMTP_FROM)); + } catch (MessagingException e) { + throw new FalconException("Exception occurred in SMTP initialization:" +e); + } + } + + + public void sendNotification(ResourceMessage resourceMessage, EntityNotification entityNotification) + throws FalconException { + try { + message.addRecipients(Message.RecipientType.TO, + NotificationUtil.getToAddress(entityNotification.getTo())); + + if (resourceMessage.getAction().equals("wf-instance-succeeded")) { + sendSuccessNotification(resourceMessage); + } else if ((resourceMessage.getAction().equals("wf-instance-failed"))) { + sendFailureNotification(resourceMessage); + } + + // Send message + Transport.send(message); + } catch (MessagingException e) { + throw new FalconException("Error occurred while sending email message using SMTP:" +e); + } + } + + private void sendSuccessNotification(ResourceMessage resourceMessage) throws FalconException { + try { + String subjectMessage = "Falcon Instance Succeeded : " + getSubjectMessage(resourceMessage); + message.setSubject(subjectMessage); + message.setText(subjectMessage + NEWLINE_DELIM + getBodyMessage(resourceMessage)); + } catch (MessagingException e) { + throw new FalconException("Error in composing email notification:" +e); + } + } + + private void sendFailureNotification(ResourceMessage resourceMessage) throws FalconException { + try { + String subjectMessage = "Falcon Instance Failed : " + getSubjectMessage(resourceMessage); + message.setSubject(subjectMessage); + message.setText(subjectMessage + NEWLINE_DELIM + getBodyMessage(resourceMessage)); + } catch (MessagingException e) { + throw new FalconException("Error in composing email notification:" +e); + } + } + + private String getSubjectMessage(ResourceMessage resourceMessage) throws FalconException { + return "Workflow id:" + resourceMessage.getDimensions().get("wf-id") + + " Name:" + resourceMessage.getDimensions().get("entity-name") + + " Type:" + resourceMessage.getDimensions().get("entity-type"); + } + + private String getBodyMessage(ResourceMessage resourceMessage) throws FalconException { + StringBuilder msg = new StringBuilder(); + Map<String, String> instanceMap = resourceMessage.getDimensions(); + for (Map.Entry<String, String> entry : instanceMap.entrySet()) { + msg.append(entry.getKey() + TAB_DELIM + entry.getValue()); + msg.append(NEWLINE_DELIM); + } + msg.append("---"); + return msg.toString(); + } + + private static class FalconMailAuthenticator extends Authenticator { + private String user; + private String password; + + public FalconMailAuthenticator(String user, String password) { + this.user = user; + this.password = password; + } + + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(user, password); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/prism/src/main/java/org/apache/falcon/plugin/EmailNotificationPlugin.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/plugin/EmailNotificationPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/EmailNotificationPlugin.java new file mode 100644 index 0000000..bcb8c0a --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/plugin/EmailNotificationPlugin.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.plugin; + +import org.apache.falcon.aspect.ResourceMessage; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityNotification; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.util.NotificationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * EmailNotification plugin implementation for sending email notification. + */ +public class EmailNotificationPlugin implements MonitoringPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(EmailNotificationPlugin.class); + + @Override + public void monitor(ResourceMessage message) { + try { + String entityType = message.getDimensions().get("entity-type"); + String entityName = message.getDimensions().get("entity-name"); + Entity entity = null; + if (entityType.equals("PROCESS")) { + entity = ConfigurationStore.get().get(EntityType.PROCESS, entityName); + } else if (entityType.equals("FEED")) { + entity = ConfigurationStore.get().get(EntityType.FEED, entityName); + } + + EntityNotification entityNotification = EntityUtil.getEntityNotification(entity); + if (entityNotification == null) { + LOG.info("Notification tag is not defined for entity: {}", entityName); + return; + } + + if (!(NotificationUtil.isEmailAddressValid(entityNotification))) { + LOG.error("Notification email address is not valid: {}", entityNotification.getTo()); + return; + } + + if ((message.getAction().equals("wf-instance-succeeded") + || message.getAction().equals("wf-instance-failed"))) { + NotificationPlugin pluginType = NotificationHandler.getNotificationType(entityNotification.getType()); + if (pluginType != null) { + pluginType.sendNotification(message, entityNotification); + } else { + LOG.error("Notification type is not supported: {}", entityNotification.getType()); + } + } + } catch (Exception e) { + LOG.error("Exception in sending Notification from EmailNotificationPlugin:" +e); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/prism/src/main/java/org/apache/falcon/plugin/NotificationHandler.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/plugin/NotificationHandler.java b/prism/src/main/java/org/apache/falcon/plugin/NotificationHandler.java new file mode 100644 index 0000000..582d3eb --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/plugin/NotificationHandler.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.plugin; + +import org.apache.falcon.FalconException; +import org.apache.falcon.util.NotificationType; + +/** + * Notification Handler to initialize the Notification concrete class. + */ +public final class NotificationHandler { + + private NotificationHandler() { + } + + public static NotificationPlugin getNotificationType(final String notificationType) throws FalconException { + if (notificationType.equals(NotificationType.EMAIL.getName())) { + return new EmailNotification(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/prism/src/main/java/org/apache/falcon/util/NotificationUtil.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/util/NotificationUtil.java b/prism/src/main/java/org/apache/falcon/util/NotificationUtil.java new file mode 100644 index 0000000..d20422b --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/util/NotificationUtil.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.EntityNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.mail.Address; +import javax.mail.internet.AddressException; +import javax.mail.internet.InternetAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Utils class for notification. + */ +public final class NotificationUtil { + public static final Logger LOG = LoggerFactory.getLogger(NotificationUtil.class); + + private NotificationUtil() { + } + + private static final String EMAIL_PATTERN = "^[_A-Za-z0-9-\\+]+(\\.[_A-Za-z0-9-]+)*@" + + "[A-Za-z0-9-]+(\\.[A-Za-z0-9]+)*$"; + private static final String COMMA = ","; + + public static boolean isEmailAddressValid(EntityNotification entityNotification) throws FalconException { + Pattern pattern = Pattern.compile(EMAIL_PATTERN); + if (StringUtils.isEmpty(entityNotification.getTo())) { + return false; + } + + for (String address : entityNotification.getTo().split(COMMA)) { + Matcher matcher = pattern.matcher(address.trim()); + if (!(matcher.matches())) { + return false; + } + } + + return true; + } + + public static Address[] getToAddress(String toAddress) throws FalconException { + List<InternetAddress> toAddrs = new ArrayList<InternetAddress>(); + String []tos = toAddress.split(COMMA); + try { + for (String toStr : tos) { + toAddrs.add(new InternetAddress(toStr.trim())); + } + } catch (AddressException e) { + throw new FalconException("Exception in to address:"+e); + } + + return toAddrs.toArray(new InternetAddress[0]); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/prism/src/test/java/org/apache/falcon/plugin/EmailNotificationTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/plugin/EmailNotificationTest.java b/prism/src/test/java/org/apache/falcon/plugin/EmailNotificationTest.java new file mode 100644 index 0000000..a6671fa --- /dev/null +++ b/prism/src/test/java/org/apache/falcon/plugin/EmailNotificationTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.plugin; + +import com.icegreen.greenmail.util.GreenMail; +import com.icegreen.greenmail.util.ServerSetup; +import org.apache.falcon.aspect.ResourceMessage; +import org.apache.falcon.entity.v0.process.Notification; +import org.apache.falcon.util.NotificationUtil; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import javax.mail.Message; +import java.util.HashMap; +import java.util.Map; + +/** + * Unit test to test Email Notification. + */ + +public class EmailNotificationTest { + private static final String EMAIL_FROM = "falcon@localhost"; + + private GreenMail mailServer; + private ResourceMessage resourceMessage; + + @BeforeClass + protected void setUp() throws Exception { + ServerSetup setup = new ServerSetup(3025, "localhost", "smtp"); + mailServer = new GreenMail(setup); + mailServer.setUser(EMAIL_FROM, "", ""); + mailServer.start(); + resourceMessage = buildResourceMessage(); + } + + private ResourceMessage buildResourceMessage() { + String action = "wf-instance-succeeded"; + ResourceMessage.Status status = ResourceMessage.Status.SUCCEEDED; + long executionTime = 78819000000L; //Time in nano seconds. + Map<String, String> dimensions = new HashMap<String, String>(); + dimensions.put("entity-type", "process"); + dimensions.put("entity-name", "pig-process"); + dimensions.put("wf-id", "001-oozie-wf"); + dimensions.put("wf-user", "falcon"); + dimensions.put("run-id", "1"); + dimensions.put("operation", "GENERATE"); + + return new ResourceMessage(action, dimensions, status, executionTime); + } + + + @Test + public void testSendNotification() throws Exception { + String notificationType = "email"; + String emailTo = "falcon_to@localhost"; + + Notification notification = new Notification(); + notification.setType(notificationType); + notification.setTo(emailTo); + + NotificationPlugin pluginType = NotificationHandler.getNotificationType(notification.getType()); + Assert.assertNotNull(pluginType); + + pluginType.sendNotification(resourceMessage, notification); + mailServer.waitForIncomingEmail(5000, 1); + Message[] messages = mailServer.getReceivedMessages(); + Assert.assertNotNull(messages); + Assert.assertEquals(messages[0].getFrom()[0].toString(), EMAIL_FROM); + Assert.assertEquals(messages[0].getAllRecipients()[0].toString(), emailTo); + Assert.assertEquals(messages[0].getSubject(), "Falcon Instance Succeeded : Workflow id:001-oozie-wf " + + "Name:pig-process Type:process"); + } + + @Test + public void testNotificationType() throws Exception { + String notificationType = "email"; + String emailTo = "falcon@localhost"; + + Notification notification = new Notification(); + notification.setType(notificationType); + notification.setTo(emailTo); + + NotificationPlugin pluginType = NotificationHandler.getNotificationType(notification.getType()); + Assert.assertNotNull(pluginType); + + notificationType = "eml"; + notification.setType(notificationType); + pluginType = NotificationHandler.getNotificationType(notification.getType()); + Assert.assertNull(pluginType); + + notificationType = ""; + notification.setType(notificationType); + pluginType = NotificationHandler.getNotificationType(notification.getType()); + Assert.assertNull(pluginType); + } + + @Test + public void testNotificationEmailAddress() throws Exception { + String notificationType = "email"; + String emailAddress = "falcon@locahost"; + + Notification notification = new Notification(); + notification.setType(notificationType); + notification.setTo(emailAddress); + + Assert.assertTrue(NotificationUtil.isEmailAddressValid(notification)); + + emailAddress = "falcon_123@localhost"; + notification.setTo(emailAddress); + Assert.assertTrue(NotificationUtil.isEmailAddressValid(notification)); + + emailAddress = "[email protected]"; + notification.setTo(emailAddress); + Assert.assertTrue(NotificationUtil.isEmailAddressValid(notification)); + + emailAddress = "falcon@locahost,hive@localhost,[email protected]"; + notification.setTo(emailAddress); + Assert.assertTrue(NotificationUtil.isEmailAddressValid(notification)); + + emailAddress = "falcon@locahost,,,"; + notification.setTo(emailAddress); + Assert.assertTrue(NotificationUtil.isEmailAddressValid(notification)); + + emailAddress = "falcon"; + notification.setTo(emailAddress); + Assert.assertFalse(NotificationUtil.isEmailAddressValid(notification)); + + emailAddress = "falcon#localhost"; + notification.setTo(emailAddress); + Assert.assertFalse(NotificationUtil.isEmailAddressValid(notification)); + + emailAddress = ""; + notification.setTo(emailAddress); + Assert.assertFalse(NotificationUtil.isEmailAddressValid(notification)); + } + + + @AfterClass + public void tearDown() throws Exception { + mailServer.stop(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/5d8b36c1/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 305ac36..8f3bc35 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -202,3 +202,27 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider ######### Authorization Properties ######### + +######### SMTP Properties ######## + +# Setting SMTP hostname +#*.falcon.email.smtp.host=localhost + +# Setting SMTP port number +#*.falcon.email.smtp.port=25 + +# Setting email from address +#*.falcon.email.from.address=falcon@localhost + +# Setting email Auth +#*.falcon.email.smtp.auth=false + +#Setting user name, if Auth is true +#*.falcon.email.smtp.user="" + +#Setting password, if Auth is true +#*.falcon.email.smtp.password="" + +# Setting monitoring plugin, if SMTP parameters is defined +#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\ +# org.apache.falcon.plugin.EmailNotificationPlugin
