Repository: oozie Updated Branches: refs/heads/master 5e515f814 -> 50f4b5984
OOZIE-3132 Instrument SLACalculatorMemory (andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/50f4b598 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/50f4b598 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/50f4b598 Branch: refs/heads/master Commit: 50f4b5984832941f1341586e43fd832c293b3275 Parents: 5e515f8 Author: Andras Piros <[email protected]> Authored: Wed Nov 22 15:17:44 2017 +0100 Committer: Andras Piros <[email protected]> Committed: Wed Nov 22 15:17:44 2017 +0100 ---------------------------------------------------------------------- .../apache/oozie/sla/SLACalculatorMemory.java | 56 ++++++++--- .../org/apache/oozie/util/Instrumentation.java | 12 +++ .../oozie/sla/TestSLACalculatorMemory.java | 98 +++++++++++++++++++- docs/src/site/twiki/DG_SLAMonitoring.twiki | 4 +- release-log.txt | 1 + 5 files changed, 155 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java index 3c6b50a..ef019e7 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java @@ -52,17 +52,19 @@ import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.EventHandlerService; +import org.apache.oozie.service.InstrumentationService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.SchedulerService; import 
org.apache.oozie.service.ServiceException; import org.apache.oozie.service.Services; import org.apache.oozie.sla.service.SLAService; + +import com.google.common.annotations.VisibleForTesting; import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.LogUtils; -import org.apache.oozie.util.XLog; import org.apache.oozie.util.Pair; - -import com.google.common.annotations.VisibleForTesting; +import org.apache.oozie.util.XLog; /** * Implementation class for SLACalculator that calculates SLA related to @@ -79,6 +81,9 @@ public class SLACalculatorMemory implements SLACalculator { protected EventHandlerService eventHandler; private static int modifiedAfter; private static long jobEventLatency; + private Instrumentation instrumentation; + public static final String INSTRUMENTATION_GROUP = "sla-calculator"; + public static final String SLA_MAP = "sla-map"; @Override public void init(Configuration conf) throws ServiceException { @@ -88,6 +93,7 @@ public class SLACalculatorMemory implements SLACalculator { historySet = Collections.synchronizedSet(new HashSet<String>()); jpaService = Services.get().get(JPAService.class); eventHandler = Services.get().get(EventHandlerService.class); + instrumentation = Services.get().get(InstrumentationService.class).get(); // load events modified after modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7); loadOnRestart(); @@ -139,7 +145,7 @@ public class SLACalculatorMemory implements SLACalculator { .execute(new SLASummaryGetRecordsOnRestartJPAExecutor(modifiedAfter)); for (SLASummaryBean summaryBean : summaryBeans) { String jobId = summaryBean.getId(); - slaMap.put(jobId, new SLACalcStatus(summaryBean)); + putAndIncrement(jobId, new SLACalcStatus(summaryBean)); } LOG.info("Loaded {0} SLASummary object after restart", slaMap.size()); } @@ -205,8 +211,10 @@ public class SLACalculatorMemory implements SLACalculator { @Override public void clear() { + final int 
originalSize = slaMap.size(); slaMap.clear(); historySet.clear(); + instrumentation.decr(INSTRUMENTATION_GROUP, SLA_MAP, originalSize); } /** @@ -229,7 +237,7 @@ public class SLACalculatorMemory implements SLACalculator { catch (JPAExecutorException e) { if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId); - slaMap.remove(jobId); + removeAndDecrement(jobId); return; } throw e; @@ -240,7 +248,7 @@ public class SLACalculatorMemory implements SLACalculator { if (eventProc == 7) { historySet.add(jobId); } - slaMap.remove(jobId); + removeAndDecrement(jobId); LOG.trace("Removed Job [{0}] from map as SLA processed", jobId); } else { @@ -260,7 +268,7 @@ public class SLACalculatorMemory implements SLACalculator { catch (XException e) { if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId()); - slaMap.remove(jobId); + removeAndDecrement(jobId); } else { if (firstCheckAfterRetstart) { @@ -361,7 +369,7 @@ public class SLACalculatorMemory implements SLACalculator { SLACalcStatus slaCalc = new SLACalcStatus(reg); slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); slaCalc.setJobStatus(getJobStatus(reg.getAppType())); - slaMap.put(jobId, slaCalc); + putAndIncrement(jobId, slaCalc); List<JsonBean> insertList = new ArrayList<JsonBean>(); final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc); final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date()); @@ -415,7 +423,7 @@ public class SLACalculatorMemory implements SLACalculator { SLACalcStatus slaCalc = new SLACalcStatus(reg); slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); slaCalc.setJobStatus(getJobStatus(reg.getAppType())); - slaMap.put(jobId, slaCalc); + putAndIncrement(jobId, slaCalc); @SuppressWarnings("rawtypes") List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); @@ -445,7 +453,7 @@ 
public class SLACalculatorMemory implements SLACalculator { */ @Override public void removeRegistration(String jobId) { - if (slaMap.remove(jobId) == null) { + if (!removeAndDecrement(jobId)) { historySet.remove(jobId); } } @@ -470,7 +478,7 @@ public class SLACalculatorMemory implements SLACalculator { SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get( SLASummaryQuery.GET_SLA_SUMMARY, jobId); slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean); - slaMap.put(jobId, slaCalc); + putAndIncrement(jobId, slaCalc); } } else { @@ -509,12 +517,12 @@ public class SLACalculatorMemory implements SLACalculator { private void checkEventProc(SLACalcStatus slaCalc){ byte eventProc = slaCalc.getEventProcessed(); if (slaCalc.getEventProcessed() >= 8) { - slaMap.remove(slaCalc.getId()); + removeAndDecrement(slaCalc.getId()); LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId()); } if (eventProc == 7) { historySet.add(slaCalc.getId()); - slaMap.remove(slaCalc.getId()); + removeAndDecrement(slaCalc.getId()); LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId()); } } @@ -659,4 +667,26 @@ public class SLACalculatorMemory implements SLACalculator { } return false; } + + private boolean putAndIncrement(final String jobId, final SLACalcStatus newStatus) { + if (slaMap.put(jobId, newStatus) == null) { + LOG.trace("Added a new item to SLA map. [jobId={0}]", jobId); + instrumentation.incr(INSTRUMENTATION_GROUP, SLA_MAP, 1); + return true; + } + + LOG.trace("Updated an existing item in SLA map. [jobId={0}]", jobId); + return false; + } + + private boolean removeAndDecrement(final String jobId) { + if (slaMap.remove(jobId) != null) { + LOG.trace("Removed an existing item from SLA map. [jobId={0}]", jobId); + instrumentation.decr(INSTRUMENTATION_GROUP, SLA_MAP, 1); + return true; + } + + LOG.trace("Tried to remove a non-existing item from SLA map. 
[jobId={0}]", jobId); + return false; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/core/src/main/java/org/apache/oozie/util/Instrumentation.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/Instrumentation.java b/core/src/main/java/org/apache/oozie/util/Instrumentation.java index 45219a9..a57b665 100644 --- a/core/src/main/java/org/apache/oozie/util/Instrumentation.java +++ b/core/src/main/java/org/apache/oozie/util/Instrumentation.java @@ -521,6 +521,18 @@ public class Instrumentation { } /** + * Decrement an instrumentation counter. The counter is created if it does not exist. <p> This method is thread + * safe. + * + * @param group counter group. + * @param name counter name. + * @param count amount to subtract from the counter. + */ + public void decr(final String group, final String name, final long count) { + incr(group, name, -count); + } + + /** * Interface for instrumentation variables. <p> For example a the database service could expose the number of * currently active connections.
*/ http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java index 3a7db10..ee906f4 100644 --- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java +++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java @@ -56,11 +56,13 @@ import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.EventHandlerService; +import org.apache.oozie.service.InstrumentationService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.sla.service.SLAService; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.Pair; import org.apache.oozie.workflow.WorkflowInstance; @@ -71,6 +73,7 @@ import org.junit.Test; public class TestSLACalculatorMemory extends XDataTestCase { private Services services; private JPAService jpaService; + private Instrumentation instrumentation; @Override @Before @@ -78,11 +81,13 @@ public class TestSLACalculatorMemory extends XDataTestCase { super.setUp(); services = new Services(); Configuration conf = services.get(ConfigurationService.class).getConf(); - conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService," - + "org.apache.oozie.sla.service.SLAService"); + conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService," + + "org.apache.oozie.sla.service.SLAService," + + 
"org.apache.oozie.service.InstrumentationService"); conf.setInt(SLAService.CONF_SLA_CHECK_INTERVAL, 600); services.init(); jpaService = services.get(JPAService.class); + instrumentation = services.get(InstrumentationService.class).get(); } @Override @@ -985,4 +990,93 @@ public class TestSLACalculatorMemory extends XDataTestCase { assertEquals(slaSummaryBean.getJobStatus(), WorkflowInstance.Status.SUCCEEDED.toString()); } + public void testSingleAddUpdateRemoveInstrumentedCorrectly() throws Exception { + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + + WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP); + SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB); + Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); + slaRegBean.setExpectedStart(startTime); // 1 hour back + slaRegBean.setExpectedDuration(1000); + slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); + String jobId = slaRegBean.getId(); + slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean); + + long slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP). + get(SLACalculatorMemory.SLA_MAP).getValue(); + + assertEquals("SLA map size after add should be 1", 1, slaMapSize); + + slaCalcMemory.updateJobSla(jobId); + + slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP). + get(SLACalculatorMemory.SLA_MAP).getValue(); + assertEquals("SLA map size after update should be 1", 1, slaMapSize); + + slaCalcMemory.removeRegistration(jobId); + + slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP). 
+ get(SLACalculatorMemory.SLA_MAP).getValue(); + assertEquals("SLA map size after remove should be 0", 0, slaMapSize); + } + + public void testAddMultipleRestartRemoveMultipleInstrumentedCorrectly() throws Exception { + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB); + String jobId1 = slaRegBean1.getId(); + SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2-W", AppType.WORKFLOW_JOB); + String jobId2 = slaRegBean2.getId(); + SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3-W", AppType.WORKFLOW_JOB); + String jobId3 = slaRegBean3.getId(); + List<String> idList = new ArrayList<String>(); + idList.add(slaRegBean1.getId()); + idList.add(slaRegBean2.getId()); + idList.add(slaRegBean3.getId()); + createWorkflow(idList); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd"); + slaRegBean1.setAppName("app-name"); + slaRegBean1.setExpectedDuration(123); + slaRegBean1.setExpectedEnd(sdf.parse("2012-02-07")); + slaRegBean1.setExpectedStart(sdf.parse("2011-02-07")); + slaRegBean1.setNominalTime(sdf.parse("2012-01-06")); + slaRegBean1.setUser("user"); + slaRegBean1.setParentId("parentId"); + slaRegBean1.setUpstreamApps("upstreamApps"); + slaRegBean1.setNotificationMsg("notificationMsg"); + slaRegBean1.setAlertContact("[email protected]"); + slaRegBean1.setAlertEvents("MISS"); + slaRegBean1.setJobData("jobData"); + + Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back + Date endTime = new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000); // 2 hours ahead + + slaRegBean3.setExpectedStart(startTime); + slaRegBean3.setExpectedEnd(endTime); + + slaCalcMemory.addRegistration(jobId1, slaRegBean1); + slaCalcMemory.addRegistration(jobId2, slaRegBean2); + slaCalcMemory.addRegistration(jobId3, slaRegBean3); + + long slaMapSize
= instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP). + get(SLACalculatorMemory.SLA_MAP).getValue(); + + assertEquals("SLA map size after add all should be 3", 3, slaMapSize); + + slaCalcMemory.updateAllSlaStatus(); + + slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP). + get(SLACalculatorMemory.SLA_MAP).getValue(); + assertEquals("SLA map size after update all should be 2. An instance of SLACalcStatus was removed", 2, slaMapSize); + + slaCalcMemory.removeRegistration(jobId1); + slaCalcMemory.removeRegistration(jobId2); + slaCalcMemory.removeRegistration(jobId3); + + slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP). + get(SLACalculatorMemory.SLA_MAP).getValue(); + assertEquals("SLA map size after remove all should be 0", 0, slaMapSize); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/docs/src/site/twiki/DG_SLAMonitoring.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_SLAMonitoring.twiki b/docs/src/site/twiki/DG_SLAMonitoring.twiki index 2d6b138..29dd395 100644 --- a/docs/src/site/twiki/DG_SLAMonitoring.twiki +++ b/docs/src/site/twiki/DG_SLAMonitoring.twiki @@ -151,10 +151,12 @@ Same schema can be applied to and embedded under Workflow-Action as well as Coor ---++ Accessing SLA Information -SLA information is accessible via the following two ways +SLA information is accessible via the following ways: * Through the SLA tab of the Oozie Web UI. * JMS messages sent to a configured JMS provider for instantaneous tracking. * RESTful API to query for SLA summary. + * As an =Instrumentation.Counter= entry that is accessible via RESTful API and reflects the number of all SLA-tracked external + entities. The name of this counter is =sla-calculator.sla-map=.
For JMS Notifications, you have to have a message broker in place, on which Oozie publishes messages and you can hook on a subscriber to receive those messages. For more info on setting up and consuming JMS messages, refer http://git-wip-us.apache.org/repos/asf/oozie/blob/50f4b598/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index abb4ab4..4620875 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-3132 Instrument SLACalculatorMemory (andras.piros) OOZIE-2945 Update SpotBugs to stable version after GA (dbist13 via gezapeti) OOZIE-3114 Fix javadoc for warning: no @return (dbist13 via gezapeti) OOZIE-3107 org.apache.oozie.action.hadoop.TestHiveMain#testMain is flaky (dbist13 via pbacsko)
