Repository: oozie Updated Branches: refs/heads/master 550850da7 -> 87b3d0fff
OOZIE-3260 [sla] Remove stale item above max retries on JPA related errors from in-memory SLA map (andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/87b3d0ff Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/87b3d0ff Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/87b3d0ff Branch: refs/heads/master Commit: 87b3d0fff47de3e25251a0b667050a8ffc36f615 Parents: 550850d Author: Andras Piros <[email protected]> Authored: Tue Jun 12 14:37:14 2018 +0200 Committer: Andras Piros <[email protected]> Committed: Tue Jun 12 14:37:14 2018 +0200 ---------------------------------------------------------------------- .../org/apache/oozie/sla/SLACalcStatus.java | 13 +- .../apache/oozie/sla/SLACalculatorMemory.java | 53 +++++-- .../apache/oozie/sla/service/SLAService.java | 1 + core/src/main/resources/oozie-default.xml | 11 ++ .../org/apache/oozie/sla/TestSLACalcStatus.java | 41 ++++++ .../oozie/sla/TestSLACalculatorMemory.java | 145 +++++++++++++++++-- docs/src/site/twiki/DG_SLAMonitoring.twiki | 11 ++ release-log.txt | 1 + 8 files changed, 257 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java index 7be16f0..3d7d6e8 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.oozie.sla; import java.util.Date; @@ -46,6 +45,7 @@ public class SLACalcStatus extends SLAEvent { private Date lastModifiedTime; private byte eventProcessed; private String jobId; + private int retryCount = 0; private XLog LOG; @@ -307,4 +307,15 @@ public class SLACalcStatus extends SLAEvent { setSLARegistrationBean(reg); } + int getRetryCount() { + return retryCount; + } + + void incrementRetryCount() { + this.retryCount++; + } + + void resetRetryCount() { + this.retryCount = 0; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java index a6ed0ff..c7ccfcc 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java @@ -74,7 +74,7 @@ public class SLACalculatorMemory implements SLACalculator { private static XLog LOG = XLog.getLog(SLACalculatorMemory.class); // TODO optimization priority based insertion/processing/bumping up-down - protected Map<String, SLACalcStatus> slaMap; + private Map<String, SLACalcStatus> slaMap; protected Set<String> historySet; private static int capacity; private static JPAService jpaService; @@ -84,11 +84,13 @@ public class SLACalculatorMemory implements SLACalculator { private Instrumentation instrumentation; public static final String INSTRUMENTATION_GROUP = "sla-calculator"; public static final String SLA_MAP = "sla-map"; + private int maxRetryCount; @Override public void init(Configuration conf) throws ServiceException { capacity = ConfigurationService.getInt(conf, SLAService.CONF_CAPACITY); jobEventLatency = ConfigurationService.getInt(conf, SLAService.CONF_JOB_EVENT_LATENCY); + maxRetryCount = ConfigurationService.getInt(conf, 
SLAService.CONF_MAXIMUM_RETRY_COUNT); slaMap = new ConcurrentHashMap<String, SLACalcStatus>(); historySet = Collections.synchronizedSet(new HashSet<String>()); jpaService = Services.get().get(JPAService.class); @@ -218,9 +220,14 @@ public class SLACalculatorMemory implements SLACalculator { } /** - * Invoked via periodic run, update the SLA for registered jobs + * Invoked via periodic run, update the SLA for registered jobs. + * <p> + * Track the number of times the {@link SLACalcStatus} entry has not been processed successfully, and when a preconfigured + * {@code oozie.sla.service.SLAService.maximum.retry.count} is reached, remove any {@link SLACalculatorMemory#slaMap} entries + * that are causing {@code JPAExecutorException}s of certain {@link ErrorCode}s. + * @param jobId the workflow or coordinator job or action ID the SLA is tracked against */ - protected void updateJobSla(String jobId) throws Exception { + void updateJobSla(String jobId) throws Exception { SLACalcStatus slaCalc = slaMap.get(jobId); if (slaCalc == null) { @@ -231,13 +238,16 @@ public class SLACalculatorMemory implements SLACalculator { // get eventProcessed on DB for validation in HA SLASummaryBean summaryBean = null; try { - summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()) + summaryBean = SLASummaryQueryExecutor.getInstance() .get(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); - } - catch (JPAExecutorException e) { - if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { - LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId); - removeAndDecrement(jobId); + resetRetryCount(jobId); + } + catch (final JPAExecutorException e) { + if (e.getErrorCode().equals(ErrorCode.E0603) + || e.getErrorCode().equals(ErrorCode.E0604) + || e.getErrorCode().equals(ErrorCode.E0605)) { + LOG.debug("job [{0}] is not in DB, removing from Memory", jobId); + incrementRetryCountAndRemove(jobId); return; } throw e; 
@@ -689,4 +699,29 @@ public class SLACalculatorMemory implements SLACalculator { LOG.trace("Tried to remove a non-existing item from SLA map. [jobId={0}]", jobId); return false; } + + private void resetRetryCount(final String jobId) { + if (slaMap.containsKey(jobId)) { + LOG.debug("Resetting retry count on [{0}]", jobId); + final SLACalcStatus existingStatus = slaMap.get(jobId); + existingStatus.resetRetryCount(); + putAndIncrement(jobId, existingStatus); + } + } + + private void incrementRetryCountAndRemove(final String jobId) { + LOG.debug("Checking SLA calculator status [{0}] for retry count", jobId); + if (slaMap.containsKey(jobId)) { + final SLACalcStatus existingStatus = slaMap.get(jobId); + if (existingStatus.getRetryCount() < maxRetryCount) { + existingStatus.incrementRetryCount(); + LOG.debug("Retrying with SLA calculator status [{0}] retry count [{1}]", jobId, existingStatus.getRetryCount()); + putAndIncrement(jobId, existingStatus); + } + else { + LOG.debug("Removing [{0}] from SLA map as maximum retry count reached", jobId); + removeAndDecrement(jobId); + } + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/main/java/org/apache/oozie/sla/service/SLAService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java index 2d23a22..646fe29 100644 --- a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java +++ b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java @@ -53,6 +53,7 @@ public class SLAService implements Service { public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay"; public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout"; public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval"; + public static final String 
CONF_MAXIMUM_RETRY_COUNT = CONF_PREFIX + "maximum.retry.count"; private static SLACalculator calcImpl; private static boolean slaEnabled = false; http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 3d627be..8d7465c 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2566,6 +2566,17 @@ will be the requeue interval for the actions which are waiting for a long time w </description> </property> + <property> + <name>oozie.sla.service.SLAService.maximum.retry.count</name> + <value>3</value> + <description> + Number of times the update of an SLA calculator status is retried when a database related error occurs. + It's possible that multiple WorkflowJobBean / CoordActionBean instances being inserted won't have SLACalcStatus entries + inside SLACalculatorMemory#slaMap by the time they are written to the database, and thus, no SLA will be tracked. + In those rare cases, the preconfigured maximum retry count can be increased. + </description> + </property> + <!-- ZooKeeper configuration --> <property> <name>oozie.zookeeper.connection.string</name> http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/test/java/org/apache/oozie/sla/TestSLACalcStatus.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalcStatus.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalcStatus.java new file mode 100644 index 0000000..85cf9b3 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalcStatus.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.sla; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestSLACalcStatus { + + @Test + public void testRetryCountOperations() { + final SLACalcStatus status = new SLACalcStatus(); + assertEquals("initial retryCount mismatch", 0, status.getRetryCount()); + + status.incrementRetryCount(); + status.incrementRetryCount(); + status.incrementRetryCount(); + assertEquals("retryCount mismatch after increments", 3, status.getRetryCount()); + + status.resetRetryCount(); + assertEquals("retryCount mismatch after reset", 0, status.getRetryCount()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java index f340137..4a8e8e6 100644 --- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java +++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java @@ -63,6 +63,7 @@ import org.apache.oozie.service.ConfigurationService; import 
org.apache.oozie.service.EventHandlerService; import org.apache.oozie.service.InstrumentationService; import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.ServiceException; import org.apache.oozie.service.Services; import org.apache.oozie.sla.service.SLAService; import org.apache.oozie.test.XDataTestCase; @@ -1107,7 +1108,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { SLARegistrationBean slaRegBean = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB); try { - FailingDBHelperForTest.setDbPredicate(new SLARegistrationDmlPredicate()); + FailingDBHelperForTest.setDbPredicate(new SLARegistrationInsertUpdatePredicate()); prepareFailingDB(); slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean); fail("Expected JPAExecutorException not thrown"); @@ -1154,7 +1155,99 @@ public class TestSLACalculatorMemory extends XDataTestCase { FailingDBHelperForTest.resetDbPredicate(); System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER); } + } + public void testWhenSLARegistrationExistsWithoutSLASummaryUpdateSLARetries() throws Exception { + final SLACalculatorMemory slaCalculatorMemory = new SLACalculatorMemory(); + slaCalculatorMemory.init(Services.get().get(ConfigurationService.class).getConf()); + final String jobId = "job-1-W"; + final SLARegistrationBean slaRegistration = _createSLARegistration(jobId, AppType.WORKFLOW_JOB); + slaCalculatorMemory.addRegistration(jobId, slaRegistration); + + updateJobSlaFailing(slaCalculatorMemory, jobId, + new Runnable() { + @Override + public void run() { + try { + Assert.assertNotNull("after first update, SLACalcStatus should still be present", + slaCalculatorMemory.get(slaRegistration.getId())); + Assert.assertEquals("updating SLA_REGISTRATION should have been failed", + slaRegistration, + slaCalculatorMemory.get(jobId).getSLARegistrationBean()); + Assert.assertEquals("SLACalcStatus.retryCount should have been increased", + 1, slaCalculatorMemory.get(jobId).getRetryCount()); + 
} catch (JPAExecutorException ignored) { + } + } + }); + + updateJobSlaFailing(slaCalculatorMemory, jobId, + new Runnable() { + @Override + public void run() { + try { + Assert.assertNotNull("after second update, SLACalcStatus should still be present", + slaCalculatorMemory.get(slaRegistration.getId())); + Assert.assertEquals("updating SLA_REGISTRATION should have been failed", + slaRegistration, + slaCalculatorMemory.get(jobId).getSLARegistrationBean()); + Assert.assertEquals("SLACalcStatus.retryCount should have been increased", + 2, slaCalculatorMemory.get(jobId).getRetryCount()); + } catch (JPAExecutorException ignored) { + } + } + } + ); + + updateJobSlaFailing(slaCalculatorMemory, jobId, + new Runnable() { + @Override + public void run() { + try { + Assert.assertNotNull("after third update, SLACalcStatus should still be present", + slaCalculatorMemory.get(slaRegistration.getId())); + Assert.assertEquals("updating SLA_REGISTRATION should have been failed", + slaRegistration, + slaCalculatorMemory.get(jobId).getSLARegistrationBean()); + Assert.assertEquals("SLACalcStatus.retryCount should have been increased", + 3, slaCalculatorMemory.get(jobId).getRetryCount()); + } catch (JPAExecutorException ignored) { + } + } + } + ); + + updateJobSlaFailing(slaCalculatorMemory, jobId, + new Runnable() { + @Override + public void run() { + try { + Assert.assertNull("after fourth update, SLACalcStatus should no more be present", + slaCalculatorMemory.get(slaRegistration.getId())); + } catch (JPAExecutorException ignored) { + } + } + } + ); + } + + private void updateJobSlaFailing(final SLACalculatorMemory slaCalculatorMemory, + final String jobId, + final Runnable assertsWhenFailing) + throws Exception { + try { + FailingDBHelperForTest.setDbPredicate(new SLASummarySelectPredicate(1)); + prepareFailingDB(); + + slaCalculatorMemory.updateJobSla(jobId); + } + catch (final JPAExecutorException e) { + assertsWhenFailing.run(); + } + finally { + 
FailingDBHelperForTest.resetDbPredicate(); + System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER); + } } private void addAndUpdateRegistration(final SLACalculatorMemory slaCalcMemory, final String jobId, @@ -1168,12 +1261,12 @@ public class TestSLACalculatorMemory extends XDataTestCase { final SLARegistrationBean slaRegBean, final SLARegistrationBean slaRegBean2) throws Exception { slaCalcMemory.addRegistration(jobId, slaRegBean); - FailingDBHelperForTest.setDbPredicate(new SLARegistrationDmlPredicate()); + FailingDBHelperForTest.setDbPredicate(new SLARegistrationInsertUpdatePredicate()); prepareFailingDB(); slaCalcMemory.updateRegistration(jobId, slaRegBean2); } - private void prepareFailingDB() throws Exception { + private void prepareFailingDB() throws ServiceException { System.setProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER, Boolean.TRUE.toString()); Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(JPAService.CONF_DRIVER, AlwaysFailingHSQLDriverMapper.class.getCanonicalName()); @@ -1182,22 +1275,56 @@ public class TestSLACalculatorMemory extends XDataTestCase { jpaService.init(services); } - static class SLARegistrationDmlPredicate implements com.google.common.base.Predicate<String> { - private static final String TABLE_NAME = "SLA_REGISTRATION"; - private static final Set<String> OPERATIONS = Sets.newHashSet("INSERT INTO ", "UPDATE "); + static class SLARegistrationInsertUpdatePredicate extends DmlPredicate { + SLARegistrationInsertUpdatePredicate() { + super("SLA_REGISTRATION", Sets.newHashSet("INSERT INTO ", "UPDATE ")); + } + } + + static class SLASummarySelectPredicate extends DmlPredicate { + private int remainingSuccessfulAttempts; + SLASummarySelectPredicate(final int remainingSuccessfulAttempts) { + super("SLA_SUMMARY", Sets.newHashSet("SELECT ")); + this.remainingSuccessfulAttempts = remainingSuccessfulAttempts; + } + + @Override + public boolean apply(@Nullable String input) { + if 
(super.apply(input)) { + if (remainingSuccessfulAttempts <= 0) { + return true; + } + else { + remainingSuccessfulAttempts--; + return false; + } + } + else { + return false; + } + } + } + + static class DmlPredicate implements com.google.common.base.Predicate<String> { + private final String tableName; + private final Set<String> operationPrefixes; + + DmlPredicate(final String tableName, final Set<String> operationPrefixes) { + this.tableName = tableName; + this.operationPrefixes = operationPrefixes; + } @Override public boolean apply(@Nullable String input) { Preconditions.checkArgument(!Strings.isNullOrEmpty(input)); boolean operationMatch = false; - for (String s: OPERATIONS) { + for (String s: operationPrefixes) { if (input.startsWith(s)) { operationMatch = true; break; } } - return operationMatch && input.toUpperCase().contains(TABLE_NAME); + return operationMatch && input.toUpperCase().contains(tableName); } } - } http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/docs/src/site/twiki/DG_SLAMonitoring.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_SLAMonitoring.twiki b/docs/src/site/twiki/DG_SLAMonitoring.twiki index 29dd395..c91c227 100644 --- a/docs/src/site/twiki/DG_SLAMonitoring.twiki +++ b/docs/src/site/twiki/DG_SLAMonitoring.twiki @@ -408,6 +408,17 @@ Refer [[DG_CommandLineTool#Changing_job_SLA_definition_and_alerting][Changing jo ---++++ 4. Change using REST API Refer the REST API [[WebServicesAPI#Changing_job_SLA_definition_and_alerting][Changing job SLA definition and alerting]]. +---++ In-memory SLA entries and database content + +There are special circumstances when the in-memory =SLACalcStatus= entries can exist without the workflow or coordinator job or +action instances in database. 
For example: + * SLA tracked database content may already have been deleted, and =SLA_SUMMARY= entry is not present anymore in database + * SLA tracked database content and =SLA_SUMMARY= entry aren't yet present in database + +By the time =SLAService= scheduled job will be running, SLA map contents are checked. When the =SLA_SUMMARY= entry for the in-memory +SLA entry is missing, a counter is increased. When this counter reaches the server-wide preconfigured value +=oozie.sla.service.SLAService.maximum.retry.count= (by default =3=), in-memory SLA entry will get purged. + ---++ Known issues There are two known issues when you define SLA for a workflow action. * If there are decision nodes and SLA is defined for a workflow action not in the execution path because of the decision node, you will still get an SLA_MISS notification. http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 3e7e98d..a93be9d 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3260 [sla] Remove stale item above max retries on JPA related errors from in-memory SLA map (andras.piros) OOZIE-3233 Remove DST shift from the coordinator job's end time (kmarton via andras.piros) OOZIE-1393 Allow sending emails via TLS (mbalakrishnan, dionusos via andras.piros) OOZIE-3156 Retry SSH action check when cannot connect to remote host (txsing via andras.piros)
