FALCON-722 Add SLA for processes. Contributed by Ajay Yadav

Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/4d783216
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/4d783216
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/4d783216

Branch: refs/heads/master
Commit: 4d783216d2182f4319bf680a18274e5083f058fb
Parents: f5fe0ea
Author: srikanth.sundarrajan <srik...@apache.org>
Authored: Mon Nov 10 09:07:51 2014 +0530
Committer: srikanth.sundarrajan <srik...@apache.org>
Committed: Mon Nov 10 09:07:51 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  6 ++--
 client/src/main/resources/process-0.1.xsd       | 19 +++++++++++
 .../entity/parser/ProcessEntityParser.java      | 34 ++++++++++++++++++++
 .../entity/parser/ProcessEntityParserTest.java  | 25 ++++++++++++++
 .../resources/config/process/process-0.1.xml    |  1 +
 docs/src/site/twiki/EntitySpecification.twiki   |  9 ++++++
 6 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4d783216/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8117c04..9dbf7ae 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,10 @@ Trunk (Unreleased)
    (Venkatesh Seetharam)
 
   NEW FEATURES
+   FALCON-722 Add SLA for processes (Ajay Yadav via Srikanth Sundarrajan)
+
+   FALCON-721 Add SLA for Feeds (Ajay Yadav via Suhas Vasu)
+
    FALCON-687 Add hooks for extensions in Audit (Venkatesh Seetharam)
 
    FALCON-636 Add a sample recipe for disaster recovery of hdfs dirs/files
@@ -40,8 +44,6 @@ Trunk (Unreleased)
    FALCON-145 Feed eviction be implemented in appropriate Storage 
    implementation. (Ajay Yadav via Srikanth Sundarrajan)
 
-   FALCON-721 Add SLA for Feeds (Ajay Yadav via Suhas Vasu)
-
    FALCON-813 Expose job id for running jobs in Falcon (Suhas Vasu)
 
    FALCON-834 Propagate request id in the response to help trace and debug

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4d783216/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd 
b/client/src/main/resources/process-0.1.xsd
index 06a2fe4..2d64716 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -109,6 +109,13 @@
                     </xs:documentation>
                 </xs:annotation>
             </xs:element>
+            <xs:element type="sla" name="sla" minOccurs="0">
+                <xs:annotation>
+                    <xs:documentation>
+                        Defines the optional SLA (Service Level Agreement) for the process.
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
             <xs:element name="timezone" minOccurs="0" default="UTC">
                 <xs:simpleType>
                     <xs:annotation>
@@ -244,6 +251,18 @@
         </xs:restriction>
     </xs:simpleType>
 
+    <xs:complexType name="sla">
+        <xs:annotation>
+            <xs:documentation>
+                sla has 2 optional attributes - shouldStartIn and shouldEndIn. Both are written
+                using frequency expressions such as hours(2). shouldStartIn is the time by which
+                the process should have started; shouldEndIn is the time by which the process
+                should have finished. shouldStartIn must not be greater than shouldEndIn.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="frequency-type" name="shouldStartIn"/>
+        <xs:attribute type="frequency-type" name="shouldEndIn"/>
+    </xs:complexType>
+
     <xs:complexType name="inputs">
         <xs:sequence>
             <xs:element type="input" name="input" maxOccurs="unbounded" 
minOccurs="1">

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4d783216/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java 
b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 55887ac..3119b36 100644
--- 
a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ 
b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.ACL;
@@ -35,6 +36,7 @@ import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -100,8 +102,40 @@ public class ProcessEntityParser extends 
EntityParser<Process> {
         }
         validateDatasetName(process.getInputs(), process.getOutputs());
         validateLateInputs(process);
+        validateProcessSLA(process);
     }
 
+
+    /**
+     * Validates the optional SLA of a process.
+     *
+     * <p>The SLA expressions and the process timeout are all evaluated against one shared
+     * reference date (now) so that the resulting instants can be compared. Validation fails when
+     * shouldStartIn is later than shouldEndIn, or when the timeout is earlier than shouldStartIn.
+     * Both comparisons are anchored on shouldStartIn, so nothing is checked when it is absent.
+     *
+     * @param process the process entity being validated
+     * @throws FalconException if the SLA is inconsistent with itself or with the process timeout
+     */
+    private void validateProcessSLA(Process process) throws FalconException {
+        if (process.getSla() == null) {
+            return;
+        }
+        ExpressionHelper evaluator = ExpressionHelper.get();
+        // A single reference date keeps every evaluated expression comparable.
+        ExpressionHelper.setReferenceDate(new Date());
+
+        Frequency shouldStartExpression = process.getSla().getShouldStartIn();
+        if (shouldStartExpression == null) {
+            return;
+        }
+        Frequency shouldEndExpression = process.getSla().getShouldEndIn();
+        Frequency timeoutExpression = process.getTimeout();
+
+        Date shouldStart = new Date(evaluator.evaluate(shouldStartExpression.toString(), Long.class));
+
+        if (shouldEndExpression != null) {
+            Date shouldEnd = new Date(evaluator.evaluate(shouldEndExpression.toString(), Long.class));
+            if (shouldStart.after(shouldEnd)) {
+                throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression
+                        + " is greater than shouldEndIn: " + shouldEndExpression);
+            }
+        }
+
+        if (timeoutExpression != null) {
+            Date timeout = new Date(evaluator.evaluate(timeoutExpression.toString(), Long.class));
+            if (timeout.before(shouldStart)) {
+                throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression
+                        + " is greater than timeout: " + timeoutExpression);
+            }
+        }
+    }
+
     /**
      * Validate if the user submitting this entity has access to the specific 
dirs on HDFS.
      *

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4d783216/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git 
a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
 
b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index e3a2cd5..432e92a 100644
--- 
a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ 
b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -118,6 +118,9 @@ public class ProcessEntityParserTest extends 
AbstractTestBase {
         
Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()),
 "2091-12-30T00:00Z");
         Assert.assertEquals(process.getTimezone().getID(), "UTC");
 
+        Assert.assertEquals(process.getSla().getShouldStartIn().toString(), 
"hours(2)");
+        Assert.assertEquals(process.getSla().getShouldEndIn().toString(), 
"hours(4)");
+
         
Assert.assertEquals(process.getWorkflow().getEngine().name().toLowerCase(), 
"oozie");
         Assert.assertEquals(process.getWorkflow().getPath(), 
"/falcon/test/workflow");
 
@@ -159,6 +162,28 @@ public class ProcessEntityParserTest extends 
AbstractTestBase {
         }
     }
 
+    @Test(expectedExceptions = FalconException.class, 
expectedExceptionsMessageRegExp = "shouldStartIn of Process:.*")
+    public void testInvalidShouldStart() throws FalconException {
+        Process process = 
parser.parseAndValidate((ProcessEntityParserTest.class
+                .getResourceAsStream(PROCESS_XML)));
+        process.getSla().setShouldStartIn(new Frequency("hours(4)"));
+        process.getSla().setShouldEndIn(new Frequency("hours(2)"));
+        parser.validate(process);
+    }
+
+
+    /**
+     * An SLA whose shouldStartIn exceeds the process timeout must be rejected by validation.
+     */
+    @Test(expectedExceptions = FalconException.class,
+            expectedExceptionsMessageRegExp = ".* greater than timeout.*")
+    public void testShouldStartGreaterThanTimeout() throws FalconException {
+        Process process = parser.parseAndValidate(
+                ProcessEntityParserTest.class.getResourceAsStream(PROCESS_XML));
+        // Set shouldEndIn explicitly (after shouldStartIn) so only the timeout check can fail,
+        // independent of whatever the fixture declares.
+        process.getSla().setShouldStartIn(new Frequency("hours(2)"));
+        process.getSla().setShouldEndIn(new Frequency("hours(4)"));
+        process.setTimeout(new Frequency("hours(1)"));
+        parser.validate(process);
+    }
+
+
+
     @Test(expectedExceptions = FalconException.class)
     public void doParseInvalidXML() throws IOException, FalconException {
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4d783216/common/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-0.1.xml 
b/common/src/test/resources/config/process/process-0.1.xml
index 99a0376..a4cfb46 100644
--- a/common/src/test/resources/config/process/process-0.1.xml
+++ b/common/src/test/resources/config/process/process-0.1.xml
@@ -27,6 +27,7 @@
     <parallel>1</parallel>
     <order>LIFO</order>
     <frequency>hours(1)</frequency>
+    <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/>
 
     <!-- what -->
     <inputs>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4d783216/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki 
b/docs/src/site/twiki/EntitySpecification.twiki
index a81a626..66fcd2f 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -382,6 +382,15 @@ Syntax:
 </process>
 </verbatim>
 
+---+++ SLA
+<verbatim>
+    <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/>
+</verbatim>
+A process can have an SLA, defined by 2 optional attributes - shouldStartIn and shouldEndIn. Both attributes
+are written using expressions like frequency. shouldStartIn is the time by which the process should have started,
+and shouldEndIn is the time by which the process should have finished. shouldStartIn must not be greater than
+shouldEndIn, nor greater than the process timeout; such definitions are rejected at submission.
+
+
 ---+++ Validity
 Validity defines how long the workflow should run. It has 3 components - start 
time, end time and timezone. Start time and end time are timestamps defined in 
yyyy-MM-dd'T'HH:mm'Z' format and should always be in UTC. Timezone is used to 
compute the next instances starting from start time. The workflow will start at 
start time and end before end time specified on a given cluster. So, there will 
not be a workflow instance at end time.
 Syntax:

Reply via email to