Repository: oozie Updated Branches: refs/heads/master 2521e89ac -> a4732f9e9
OOZIE-3134 Potential inconsistency between the in-memory SLA map and the Oozie database (kmarton via andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a4732f9e Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a4732f9e Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a4732f9e Branch: refs/heads/master Commit: a4732f9e9ec19d4d5cddc9bdebd804486166e0a4 Parents: 2521e89 Author: Andras Piros <andras.pi...@cloudera.com> Authored: Thu Apr 5 10:09:11 2018 +0200 Committer: Andras Piros <andras.pi...@cloudera.com> Committed: Thu Apr 5 10:09:11 2018 +0200 ---------------------------------------------------------------------- .../apache/oozie/sla/SLACalculatorMemory.java | 4 +- .../util/db/AlwaysFailingHSQLDriverMapper.java | 25 ++++ .../oozie/util/db/FailingConnectionWrapper.java | 19 +-- .../oozie/util/db/FailingDBHelperForTest.java | 50 ++++++++ .../util/db/FailingHSQLDBDriverWrapper.java | 14 ++- .../util/db/FailingMySQLDriverWrapper.java | 4 +- .../oozie/sla/TestSLACalculatorMemory.java | 121 +++++++++++++++++++ release-log.txt | 6 +- 8 files changed, 229 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java index ef019e7..a6ed0ff 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java @@ -369,7 +369,6 @@ public class SLACalculatorMemory implements SLACalculator { SLACalcStatus slaCalc = new SLACalcStatus(reg); slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); slaCalc.setJobStatus(getJobStatus(reg.getAppType())); - putAndIncrement(jobId, slaCalc); List<JsonBean> insertList = new ArrayList<JsonBean>(); final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc); final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date()); @@ -378,6 +377,7 @@ public class SLACalculatorMemory implements SLACalculator { insertList.add(reg); insertList.add(summaryBean); BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); + putAndIncrement(jobId, slaCalc); LOG.trace("SLA Registration Event - Job:" + jobId); return true; } @@ -423,7 +423,6 @@ public class SLACalculatorMemory implements SLACalculator { SLACalcStatus slaCalc = new SLACalcStatus(reg); slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); slaCalc.setJobStatus(getJobStatus(reg.getAppType())); - putAndIncrement(jobId, slaCalc); @SuppressWarnings("rawtypes") List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); @@ -431,6 +430,7 @@ public class SLACalculatorMemory implements SLACalculator { updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, new SLASummaryBean(slaCalc))); BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); + putAndIncrement(jobId, slaCalc); LOG.trace("SLA Registration Event - Job:" + jobId); return true; } http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/AlwaysFailingHSQLDriverMapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/AlwaysFailingHSQLDriverMapper.java b/core/src/main/java/org/apache/oozie/util/db/AlwaysFailingHSQLDriverMapper.java new file mode 100644 index 0000000..a55088e --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/AlwaysFailingHSQLDriverMapper.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.db; + +public class AlwaysFailingHSQLDriverMapper extends FailingHSQLDBDriverWrapper{ + public AlwaysFailingHSQLDriverMapper() { + super(100); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java b/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java index 0e31025..3b87cc9 100644 --- a/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java +++ b/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java @@ -50,13 +50,18 @@ public class FailingConnectionWrapper implements Connection { private static final XLog LOG = XLog.getLog(FailingConnectionWrapper.class); private final Connection delegate; - private static final RuntimeExceptionInjector<PersistenceException> injector = - new RuntimeExceptionInjector<>(PersistenceException.class, 5); - private static final OozieDmlStatementPredicate oozieDmlStatementPredicate = - new OozieDmlStatementPredicate(); + private RuntimeExceptionInjector<PersistenceException> injector; + private Predicate<String> predicate; - public FailingConnectionWrapper(final Connection delegate) throws SQLException { + public FailingConnectionWrapper(final Connection delegate, final int failurePercent, + @Nullable final Predicate<String> predicate) { this.delegate = delegate; + injector = new RuntimeExceptionInjector<>(PersistenceException.class, failurePercent); + if (predicate == null) { + this.predicate = new OozieDmlStatementPredicate(); + } else { + this.predicate = predicate; + } } @Override @@ -162,8 +167,8 @@ public class FailingConnectionWrapper implements Connection { @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException { - if (oozieDmlStatementPredicate.apply(sql)) { - LOG.trace("Injecting random failure. It's a DML statement of an Oozie table, preparing this statement might fail."); + if (predicate.apply(sql)) { + LOG.trace("Injecting random failure. Preparing this statement might fail."); injector.inject(String.format("Deliberately failing to prepare statement. [sql=%s]", sql)); } http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/FailingDBHelperForTest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/FailingDBHelperForTest.java b/core/src/main/java/org/apache/oozie/util/db/FailingDBHelperForTest.java new file mode 100644 index 0000000..af55b06 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/FailingDBHelperForTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.db; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; + +/** + * Helper class for simulating DB failures. + */ +public class FailingDBHelperForTest { + static final Predicate<String> DEFAULT_DB_PREDICATE = new FailingConnectionWrapper.OozieDmlStatementPredicate(); + // predicate which defines when the failure should be injected. + // By default the DML statements for Oozie tables witt be the targeted statements + @VisibleForTesting + static Predicate<String> dbPredicate = DEFAULT_DB_PREDICATE; + + /** + * change the used predicate value + * @param predicate + */ + @VisibleForTesting + public static void setDbPredicate (final Predicate predicate) { + FailingDBHelperForTest.dbPredicate = predicate; + } + + /** + * reset the predicate, to the default Oozie DML predicate. + */ + @VisibleForTesting + public static void resetDbPredicate() { + FailingDBHelperForTest.dbPredicate = DEFAULT_DB_PREDICATE; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java b/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java index fe9f08b..5f5d56f 100644 --- a/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java +++ b/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java @@ -21,17 +21,27 @@ package org.apache.oozie.util.db; import java.sql.Connection; import java.sql.SQLException; import java.util.Properties; +import com.google.common.base.Predicate; public class FailingHSQLDBDriverWrapper extends org.hsqldb.jdbcDriver { public static final String USE_FAILING_DRIVER = "oozie.sql.use.failing.driver"; + public static final int DEFAULT_FAILURE_PERCENT = 5; + private int failurePercent; + + public FailingHSQLDBDriverWrapper () { + this(DEFAULT_FAILURE_PERCENT); + } + + public FailingHSQLDBDriverWrapper(int failurePercent) { + this.failurePercent = failurePercent; + } public Connection connect(final String url, final Properties info) throws SQLException { if (Boolean.parseBoolean(System.getProperty(USE_FAILING_DRIVER, "false"))) { - return new FailingConnectionWrapper(super.connect(url, info)); + return new FailingConnectionWrapper(super.connect(url, info), failurePercent, FailingDBHelperForTest.dbPredicate); } - return super.connect(url, info); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java b/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java index f0e2b18..abb21c9 100644 --- a/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java +++ b/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java @@ -46,6 +46,6 @@ public class FailingMySQLDriverWrapper extends Driver { public Connection connect(final String url, final Properties info) throws SQLException { - return new FailingConnectionWrapper(super.connect(url, info)); + return new FailingConnectionWrapper(super.connect(url, info), FailingHSQLDBDriverWrapper.DEFAULT_FAILURE_PERCENT, null); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java index ee906f4..f340137 100644 --- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java +++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java @@ -24,7 +24,11 @@ import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Set; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -47,6 +51,7 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; @@ -65,11 +70,17 @@ import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.Pair; +import org.apache.oozie.util.db.AlwaysFailingHSQLDriverMapper; +import org.apache.oozie.util.db.FailingHSQLDBDriverWrapper; +import org.apache.oozie.util.db.FailingDBHelperForTest; import org.apache.oozie.workflow.WorkflowInstance; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; + public class TestSLACalculatorMemory extends XDataTestCase { private Services services; private JPAService jpaService; @@ -1079,4 +1090,114 @@ public class TestSLACalculatorMemory extends XDataTestCase { get(SLACalculatorMemory.SLA_MAP).getValue(); assertEquals("SLA map size after remove all should be 0", 0, slaMapSize); } + + + public void testWhenSLARegistrationIsAddedBeanIsStoredCorrectly() throws Exception { + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + SLARegistrationBean slaRegBean = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB); + slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean); + Assert.assertNotNull(slaCalcMemory.get(slaRegBean.getId())); + Assert.assertEquals(slaRegBean, slaCalcMemory.get(slaRegBean.getId()).getSLARegistrationBean()); + } + + public void testWhenSLARegistrationIsAddedAndAllDBCallsAreDisruptedBeanIsNotStored() throws Exception { + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + SLARegistrationBean slaRegBean = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB); + + try { + FailingDBHelperForTest.setDbPredicate(new SLARegistrationDmlPredicate()); + prepareFailingDB(); + slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean); + fail("Expected JPAExecutorException not thrown"); + } catch (JPAExecutorException ex) { + Assert.assertNull(slaCalcMemory.get(slaRegBean.getId())); + } finally { + FailingDBHelperForTest.resetDbPredicate(); + System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER); + } + + } + + public void testWhenSLARegistrationIsUpdatedBeanIsStoredCorrectly() throws Exception { + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + final String jobId = "job-1-W"; + SLARegistrationBean slaRegBean = _createSLARegistration(jobId, AppType.WORKFLOW_JOB); + SLARegistrationBean slaRegBean2 = _createSLARegistration(jobId, AppType.WORKFLOW_JOB); + + addAndUpdateRegistration(slaCalcMemory, jobId, slaRegBean, slaRegBean2); + + Assert.assertNotNull(slaCalcMemory.get(jobId)); + Assert.assertEquals("The updated SLA registration bean should be in the cache", + slaRegBean2, slaCalcMemory.get(jobId).getSLARegistrationBean()); + } + + public void testWhenSLARegistrationIsUpdatedAndAllDBCallsAreDisruptedBeanIsNotStored() throws Exception { + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + final String jobId = "job-1-W"; + SLARegistrationBean slaRegBean = _createSLARegistration(jobId, AppType.WORKFLOW_JOB); + SLARegistrationBean slaRegBean2 = _createSLARegistration(jobId, AppType.WORKFLOW_JOB); + final int expectedDuration = 1000; + slaRegBean2.setExpectedDuration(expectedDuration); + + try { + addAndUpdateRegistrationWithDBCrushSimulation(slaCalcMemory, jobId, slaRegBean, slaRegBean2); + fail("Expected JPAExecutorException not thrown"); + } catch (JPAExecutorException ex) { + Assert.assertNotNull(slaCalcMemory.get(slaRegBean.getId())); + // the update failed + Assert.assertEquals(slaRegBean, slaCalcMemory.get(jobId).getSLARegistrationBean()); + } finally { + FailingDBHelperForTest.resetDbPredicate(); + System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER); + } + + } + + private void addAndUpdateRegistration(final SLACalculatorMemory slaCalcMemory, final String jobId, + final SLARegistrationBean slaRegBean, final SLARegistrationBean slaRegBean2) + throws JPAExecutorException { + slaCalcMemory.addRegistration(jobId, slaRegBean); + slaCalcMemory.updateRegistration(jobId, slaRegBean2); + } + + private void addAndUpdateRegistrationWithDBCrushSimulation(final SLACalculatorMemory slaCalcMemory, final String jobId, + final SLARegistrationBean slaRegBean, + final SLARegistrationBean slaRegBean2) throws Exception { + slaCalcMemory.addRegistration(jobId, slaRegBean); + FailingDBHelperForTest.setDbPredicate(new SLARegistrationDmlPredicate()); + prepareFailingDB(); + slaCalcMemory.updateRegistration(jobId, slaRegBean2); + } + + private void prepareFailingDB() throws Exception { + System.setProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER, Boolean.TRUE.toString()); + Configuration conf = services.get(ConfigurationService.class).getConf(); + conf.set(JPAService.CONF_DRIVER, AlwaysFailingHSQLDriverMapper.class.getCanonicalName()); + conf.setInt(JPAService.MAX_RETRY_COUNT, 2); + jpaService.destroy(); + jpaService.init(services); + } + + static class SLARegistrationDmlPredicate implements com.google.common.base.Predicate<String> { + private static final String TABLE_NAME = "SLA_REGISTRATION"; + private static final Set<String> OPERATIONS = Sets.newHashSet("INSERT INTO ", "UPDATE "); + + @Override + public boolean apply(@Nullable String input) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(input)); + boolean operationMatch = false; + for (String s: OPERATIONS) { + if (input.startsWith(s)) { + operationMatch = true; + break; + } + } + return operationMatch && input.toUpperCase().contains(TABLE_NAME); + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 9a4947c..0d79c35 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,7 +1,11 @@ --- Oozie 5.0.0 release (trunk - unreleased) +-- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3134 Potential inconsistency between the in-memory SLA map and the Oozie database (kmarton via andras.piros) OOZIE-3209 XML schema error when submitting pyspark example (kmarton via andras.piros) OOZIE-2937 Remove redundant groupId from the child POMs (Jan Hentschel via andras.piros) + +-- Oozie 5.0.0 release + OOZIE-3176 Oozie-core fails with checkstyle errors (alishap via andras.piros) OOZIE-2726 Flaky test due to daylight saving changes (sasishsaley, andras.piros via gezapeti) OOZIE-2645 Deprecate Instrumentation in favor of Metrics (andras.piros via gezapeti)