Repository: oozie
Updated Branches:
  refs/heads/master 5e515f814 -> 50f4b5984


OOZIE-3132 Instrument SLACalculatorMemory (andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/50f4b598
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/50f4b598
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/50f4b598

Branch: refs/heads/master
Commit: 50f4b5984832941f1341586e43fd832c293b3275
Parents: 5e515f8
Author: Andras Piros <[email protected]>
Authored: Wed Nov 22 15:17:44 2017 +0100
Committer: Andras Piros <[email protected]>
Committed: Wed Nov 22 15:17:44 2017 +0100

----------------------------------------------------------------------
 .../apache/oozie/sla/SLACalculatorMemory.java   | 56 ++++++++---
 .../org/apache/oozie/util/Instrumentation.java  | 12 +++
 .../oozie/sla/TestSLACalculatorMemory.java      | 98 +++++++++++++++++++-
 docs/src/site/twiki/DG_SLAMonitoring.twiki      |  4 +-
 release-log.txt                                 |  1 +
 5 files changed, 155 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java 
b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index 3c6b50a..ef019e7 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -52,17 +52,19 @@ import 
org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import 
org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchedulerService;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.service.SLAService;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.LogUtils;
-import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.Pair;
-
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.oozie.util.XLog;
 
 /**
  * Implementation class for SLACalculator that calculates SLA related to
@@ -79,6 +81,9 @@ public class SLACalculatorMemory implements SLACalculator {
     protected EventHandlerService eventHandler;
     private static int modifiedAfter;
     private static long jobEventLatency;
+    private Instrumentation instrumentation;
+    public static final String INSTRUMENTATION_GROUP = "sla-calculator";
+    public static final String SLA_MAP = "sla-map";
 
     @Override
     public void init(Configuration conf) throws ServiceException {
@@ -88,6 +93,7 @@ public class SLACalculatorMemory implements SLACalculator {
         historySet = Collections.synchronizedSet(new HashSet<String>());
         jpaService = Services.get().get(JPAService.class);
         eventHandler = Services.get().get(EventHandlerService.class);
+        instrumentation = 
Services.get().get(InstrumentationService.class).get();
         // load events modified after
         modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
         loadOnRestart();
@@ -139,7 +145,7 @@ public class SLACalculatorMemory implements SLACalculator {
                     .execute(new 
SLASummaryGetRecordsOnRestartJPAExecutor(modifiedAfter));
             for (SLASummaryBean summaryBean : summaryBeans) {
                 String jobId = summaryBean.getId();
-                slaMap.put(jobId, new SLACalcStatus(summaryBean));
+                putAndIncrement(jobId, new SLACalcStatus(summaryBean));
             }
             LOG.info("Loaded {0} SLASummary object after restart", 
slaMap.size());
         }
@@ -205,8 +211,10 @@ public class SLACalculatorMemory implements SLACalculator {
 
     @Override
     public void clear() {
+        final int originalSize = slaMap.size();
         slaMap.clear();
         historySet.clear();
+        instrumentation.decr(INSTRUMENTATION_GROUP, SLA_MAP, originalSize);
     }
 
     /**
@@ -229,7 +237,7 @@ public class SLACalculatorMemory implements SLACalculator {
         catch (JPAExecutorException e) {
             if (e.getErrorCode().equals(ErrorCode.E0604) || 
e.getErrorCode().equals(ErrorCode.E0605)) {
                 LOG.debug("job [{0}] is is not in DB, removing from Memory", 
jobId);
-                slaMap.remove(jobId);
+                removeAndDecrement(jobId);
                 return;
             }
             throw e;
@@ -240,7 +248,7 @@ public class SLACalculatorMemory implements SLACalculator {
             if (eventProc == 7) {
                 historySet.add(jobId);
             }
-            slaMap.remove(jobId);
+            removeAndDecrement(jobId);
             LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
         }
         else {
@@ -260,7 +268,7 @@ public class SLACalculatorMemory implements SLACalculator {
                 catch (XException e) {
                     if (e.getErrorCode().equals(ErrorCode.E0604) || 
e.getErrorCode().equals(ErrorCode.E0605)) {
                         LOG.debug("job [{0}] is is not in DB, removing from 
Memory", slaCalc.getId());
-                        slaMap.remove(jobId);
+                        removeAndDecrement(jobId);
                     }
                     else {
                         if (firstCheckAfterRetstart) {
@@ -361,7 +369,7 @@ public class SLACalculatorMemory implements SLACalculator {
                 SLACalcStatus slaCalc = new SLACalcStatus(reg);
                 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
                 slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
-                slaMap.put(jobId, slaCalc);
+                putAndIncrement(jobId, slaCalc);
                 List<JsonBean> insertList = new ArrayList<JsonBean>();
                 final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc);
                 final Timestamp currentTime = 
DateUtils.convertDateToTimestamp(new Date());
@@ -415,7 +423,7 @@ public class SLACalculatorMemory implements SLACalculator {
                 SLACalcStatus slaCalc = new SLACalcStatus(reg);
                 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
                 slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
-                slaMap.put(jobId, slaCalc);
+                putAndIncrement(jobId, slaCalc);
 
                 @SuppressWarnings("rawtypes")
                 List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
@@ -445,7 +453,7 @@ public class SLACalculatorMemory implements SLACalculator {
      */
     @Override
     public void removeRegistration(String jobId) {
-        if (slaMap.remove(jobId) == null) {
+        if (!removeAndDecrement(jobId)) {
             historySet.remove(jobId);
         }
     }
@@ -470,7 +478,7 @@ public class SLACalculatorMemory implements SLACalculator {
                 SLASummaryBean slaSummaryBean = 
SLASummaryQueryExecutor.getInstance().get(
                         SLASummaryQuery.GET_SLA_SUMMARY, jobId);
                 slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
-                slaMap.put(jobId, slaCalc);
+                putAndIncrement(jobId, slaCalc);
             }
         }
         else {
@@ -509,12 +517,12 @@ public class SLACalculatorMemory implements SLACalculator 
{
     private void checkEventProc(SLACalcStatus slaCalc){
         byte eventProc = slaCalc.getEventProcessed();
         if (slaCalc.getEventProcessed() >= 8) {
-            slaMap.remove(slaCalc.getId());
+            removeAndDecrement(slaCalc.getId());
             LOG.debug("Removed Job [{0}] from map after Event-processed=8", 
slaCalc.getId());
         }
         if (eventProc == 7) {
             historySet.add(slaCalc.getId());
-            slaMap.remove(slaCalc.getId());
+            removeAndDecrement(slaCalc.getId());
             LOG.debug("Removed Job [{0}] from map after Event-processed=7", 
slaCalc.getId());
         }
     }
@@ -659,4 +667,26 @@ public class SLACalculatorMemory implements SLACalculator {
         }
         return false;
     }
+
+    private boolean putAndIncrement(final String jobId, final SLACalcStatus 
newStatus) {
+        if (slaMap.put(jobId, newStatus) == null) {
+            LOG.trace("Added a new item to SLA map. [jobId={0}]", jobId);
+            instrumentation.incr(INSTRUMENTATION_GROUP, SLA_MAP, 1);
+            return true;
+        }
+
+        LOG.trace("Updated an existing item in SLA map. [jobId={0}]", jobId);
+        return false;
+    }
+
+    private boolean removeAndDecrement(final String jobId) {
+        if (slaMap.remove(jobId) != null) {
+            LOG.trace("Removed an existing item from SLA map. [jobId={0}]", 
jobId);
+            instrumentation.decr(INSTRUMENTATION_GROUP, SLA_MAP, 1);
+            return true;
+        }
+
+        LOG.trace("Tried to remove a non-existing item from SLA map. 
[jobId={0}]", jobId);
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/core/src/main/java/org/apache/oozie/util/Instrumentation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/Instrumentation.java 
b/core/src/main/java/org/apache/oozie/util/Instrumentation.java
index 45219a9..a57b665 100644
--- a/core/src/main/java/org/apache/oozie/util/Instrumentation.java
+++ b/core/src/main/java/org/apache/oozie/util/Instrumentation.java
@@ -521,6 +521,18 @@ public class Instrumentation {
     }
 
     /**
+     * Decrement an instrumentation counter. The counter is created if it does 
not exist. <p> This method is thread
+     * safe.
+     *
+     * @param group counter group.
+     * @param name counter name.
+     * @param count amount to subtract from the counter.
+     */
+    public void decr(final String group, final String name, final long count) {
+        incr(group, name, -count);
+    }
+
+    /**
      * Interface for instrumentation variables. <p> For example a the database 
service could expose the number of
      * currently active connections.
      */

http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java 
b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
index 3a7db10..ee906f4 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
@@ -56,11 +56,13 @@ import 
org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.service.SLAService;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.JobUtils;
 import org.apache.oozie.util.Pair;
 import org.apache.oozie.workflow.WorkflowInstance;
@@ -71,6 +73,7 @@ import org.junit.Test;
 public class TestSLACalculatorMemory extends XDataTestCase {
     private Services services;
     private JPAService jpaService;
+    private Instrumentation instrumentation;
 
     @Override
     @Before
@@ -78,11 +81,13 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         super.setUp();
         services = new Services();
         Configuration conf = 
services.get(ConfigurationService.class).getConf();
-        conf.set(Services.CONF_SERVICE_EXT_CLASSES, 
"org.apache.oozie.service.EventHandlerService,"
-                + "org.apache.oozie.sla.service.SLAService");
+        conf.set(Services.CONF_SERVICE_EXT_CLASSES, 
"org.apache.oozie.service.EventHandlerService," +
+                "org.apache.oozie.sla.service.SLAService," +
+                "org.apache.oozie.service.InstrumentationService");
         conf.setInt(SLAService.CONF_SLA_CHECK_INTERVAL, 600);
         services.init();
         jpaService = services.get(JPAService.class);
+        instrumentation = services.get(InstrumentationService.class).get();
     }
 
     @Override
@@ -985,4 +990,93 @@ public class TestSLACalculatorMemory extends XDataTestCase 
{
         assertEquals(slaSummaryBean.getJobStatus(), 
WorkflowInstance.Status.SUCCEEDED.toString());
     }
 
+    public void testSingleAddUpdateRemoveInstrumentedCorrectly() throws 
Exception {
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+
+        WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, 
WorkflowInstance.Status.PREP);
+        SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), 
AppType.WORKFLOW_JOB);
+        Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 
1000);
+        slaRegBean.setExpectedStart(startTime); // 1 hour back
+        slaRegBean.setExpectedDuration(1000);
+        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 
* 3600 * 1000));
+        String jobId = slaRegBean.getId();
+        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+
+        long slaMapSize = 
instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
+                get(SLACalculatorMemory.SLA_MAP).getValue();
+
+        assertEquals("SLA map size after add should be 1", 1, slaMapSize);
+
+        slaCalcMemory.updateJobSla(jobId);
+
+        slaMapSize = 
instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
+                get(SLACalculatorMemory.SLA_MAP).getValue();
+        assertEquals("SLA map size after update should be 1", 1, slaMapSize);
+
+        slaCalcMemory.removeRegistration(jobId);
+
+        slaMapSize = 
instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
+                get(SLACalculatorMemory.SLA_MAP).getValue();
+        assertEquals("SLA map size after remove should be 0", 0, slaMapSize);
+    }
+
+    public void testAddMultipleRestartRemoveMultipleInstrumentedCorrectly() 
throws Exception {
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", 
AppType.WORKFLOW_JOB);
+        String jobId1 = slaRegBean1.getId();
+        SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2-W", 
AppType.WORKFLOW_JOB);
+        String jobId2 = slaRegBean2.getId();
+        SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3-W", 
AppType.WORKFLOW_JOB);
+        String jobId3 = slaRegBean3.getId();
+        List<String> idList = new ArrayList<String>();
+        idList.add(slaRegBean1.getId());
+        idList.add(slaRegBean2.getId());
+        idList.add(slaRegBean3.getId());
+        createWorkflow(idList);
+
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
+        slaRegBean1.setAppName("app-name");
+        slaRegBean1.setExpectedDuration(123);
+        slaRegBean1.setExpectedEnd(sdf.parse("2012-02-07"));
+        slaRegBean1.setExpectedStart(sdf.parse("2011-02-07"));
+        slaRegBean1.setNominalTime(sdf.parse("2012-01-06"));
+        slaRegBean1.setUser("user");
+        slaRegBean1.setParentId("parentId");
+        slaRegBean1.setUpstreamApps("upstreamApps");
+        slaRegBean1.setNotificationMsg("notificationMsg");
+        slaRegBean1.setAlertContact("[email protected]");
+        slaRegBean1.setAlertEvents("MISS");
+        slaRegBean1.setJobData("jobData");
+
+        Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 
1000); // 1 hour back
+        Date endTime = new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 
1000); // 2 hours ahead
+
+        slaRegBean3.setExpectedStart(startTime);
+        slaRegBean3.setExpectedEnd(endTime);
+
+        slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+        slaCalcMemory.addRegistration(jobId2, slaRegBean2);
+        slaCalcMemory.addRegistration(jobId3, slaRegBean3);
+
+        long slaMapSize = 
instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
+                get(SLACalculatorMemory.SLA_MAP).getValue();
+
+        assertEquals("SLA map size after add all should be 3", 3, slaMapSize);
+
+        slaCalcMemory.updateAllSlaStatus();
+
+        slaMapSize = 
instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
+                get(SLACalculatorMemory.SLA_MAP).getValue();
+        assertEquals("SLA map size after update all should be 2. An instance 
of SLACalcStatus was removed", 2, slaMapSize);
+
+        slaCalcMemory.removeRegistration(jobId1);
+        slaCalcMemory.removeRegistration(jobId2);
+        slaCalcMemory.removeRegistration(jobId3);
+
+        slaMapSize = 
instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
+                get(SLACalculatorMemory.SLA_MAP).getValue();
+        assertEquals("SLA map size after remove all should be 0", 0, 
slaMapSize);
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/docs/src/site/twiki/DG_SLAMonitoring.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SLAMonitoring.twiki 
b/docs/src/site/twiki/DG_SLAMonitoring.twiki
index 2d6b138..29dd395 100644
--- a/docs/src/site/twiki/DG_SLAMonitoring.twiki
+++ b/docs/src/site/twiki/DG_SLAMonitoring.twiki
@@ -151,10 +151,12 @@ Same schema can be applied to and embedded under 
Workflow-Action as well as Coor
 
 ---++ Accessing SLA Information
 
-SLA information is accessible via the following two ways
+SLA information is accessible via the following ways:
    * Through the SLA tab of the Oozie Web UI.
    * JMS messages sent to a configured JMS provider for instantaneous tracking.
    * RESTful API to query for SLA summary.
+   * As an =Instrumentation.Counter= entry that is accessible via the RESTful API 
and reflects the number of all SLA-tracked external
+   entities. The name of this counter is =sla-calculator.sla-map=.
 
 For JMS Notifications, you have to have a message broker in place, on which 
Oozie publishes messages and you can
 hook on a subscriber to receive those messages. For more info on setting up 
and consuming JMS messages, refer

http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index abb4ab4..4620875 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-3132 Instrument SLACalculatorMemory (andras.piros)
 OOZIE-2945 Update SpotBugs to stable version after GA (dbist13 via gezapeti)
 OOZIE-3114 Fix javadoc for warning: no @return (dbist13 via gezapeti)
 OOZIE-3107 org.apache.oozie.action.hadoop.TestHiveMain#testMain is flaky 
(dbist13 via pbacsko)

Reply via email to