Repository: oozie
Updated Branches:
  refs/heads/master 2521e89ac -> a4732f9e9


OOZIE-3134 Potential inconsistency between the in-memory SLA map and the Oozie 
database (kmarton via andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a4732f9e
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a4732f9e
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a4732f9e

Branch: refs/heads/master
Commit: a4732f9e9ec19d4d5cddc9bdebd804486166e0a4
Parents: 2521e89
Author: Andras Piros <andras.pi...@cloudera.com>
Authored: Thu Apr 5 10:09:11 2018 +0200
Committer: Andras Piros <andras.pi...@cloudera.com>
Committed: Thu Apr 5 10:09:11 2018 +0200

----------------------------------------------------------------------
 .../apache/oozie/sla/SLACalculatorMemory.java   |   4 +-
 .../util/db/AlwaysFailingHSQLDriverMapper.java  |  25 ++++
 .../oozie/util/db/FailingConnectionWrapper.java |  19 +--
 .../oozie/util/db/FailingDBHelperForTest.java   |  50 ++++++++
 .../util/db/FailingHSQLDBDriverWrapper.java     |  14 ++-
 .../util/db/FailingMySQLDriverWrapper.java      |   4 +-
 .../oozie/sla/TestSLACalculatorMemory.java      | 121 +++++++++++++++++++
 release-log.txt                                 |   6 +-
 8 files changed, 229 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java 
b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index ef019e7..a6ed0ff 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -369,7 +369,6 @@ public class SLACalculatorMemory implements SLACalculator {
                 SLACalcStatus slaCalc = new SLACalcStatus(reg);
                 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
                 slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
-                putAndIncrement(jobId, slaCalc);
                 List<JsonBean> insertList = new ArrayList<JsonBean>();
                 final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc);
                 final Timestamp currentTime = 
DateUtils.convertDateToTimestamp(new Date());
@@ -378,6 +377,7 @@ public class SLACalculatorMemory implements SLACalculator {
                 insertList.add(reg);
                 insertList.add(summaryBean);
                 
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, 
null, null);
+                putAndIncrement(jobId, slaCalc);
                 LOG.trace("SLA Registration Event - Job:" + jobId);
                 return true;
             }
@@ -423,7 +423,6 @@ public class SLACalculatorMemory implements SLACalculator {
                 SLACalcStatus slaCalc = new SLACalcStatus(reg);
                 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
                 slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
-                putAndIncrement(jobId, slaCalc);
 
                 @SuppressWarnings("rawtypes")
                 List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
@@ -431,6 +430,7 @@ public class SLACalculatorMemory implements SLACalculator {
                 updateList.add(new 
UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
                         new SLASummaryBean(slaCalc)));
                 
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, 
updateList, null);
+                putAndIncrement(jobId, slaCalc);
                 LOG.trace("SLA Registration Event - Job:" + jobId);
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/AlwaysFailingHSQLDriverMapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/AlwaysFailingHSQLDriverMapper.java
 
b/core/src/main/java/org/apache/oozie/util/db/AlwaysFailingHSQLDriverMapper.java
new file mode 100644
index 0000000..a55088e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/util/db/AlwaysFailingHSQLDriverMapper.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+public class AlwaysFailingHSQLDriverMapper extends FailingHSQLDBDriverWrapper{
+    public AlwaysFailingHSQLDriverMapper() {
+        super(100);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java 
b/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java
index 0e31025..3b87cc9 100644
--- a/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java
+++ b/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java
@@ -50,13 +50,18 @@ public class FailingConnectionWrapper implements Connection 
{
     private static final XLog LOG = 
XLog.getLog(FailingConnectionWrapper.class);
 
     private final Connection delegate;
-    private static final RuntimeExceptionInjector<PersistenceException> 
injector =
-            new RuntimeExceptionInjector<>(PersistenceException.class, 5);
-    private static final OozieDmlStatementPredicate oozieDmlStatementPredicate 
=
-            new OozieDmlStatementPredicate();
+    private RuntimeExceptionInjector<PersistenceException> injector;
+    private Predicate<String> predicate;
 
-    public FailingConnectionWrapper(final Connection delegate) throws 
SQLException {
+    public FailingConnectionWrapper(final Connection delegate, final int 
failurePercent,
+                                    @Nullable final Predicate<String> 
predicate) {
         this.delegate = delegate;
+        injector = new RuntimeExceptionInjector<>(PersistenceException.class, 
failurePercent);
+        if (predicate == null) {
+            this.predicate = new OozieDmlStatementPredicate();
+        } else {
+            this.predicate = predicate;
+        }
     }
 
     @Override
@@ -162,8 +167,8 @@ public class FailingConnectionWrapper implements Connection 
{
     @Override
     public PreparedStatement prepareStatement(final String sql, final int 
resultSetType, final int resultSetConcurrency)
             throws SQLException {
-        if (oozieDmlStatementPredicate.apply(sql)) {
-            LOG.trace("Injecting random failure. It's a DML statement of an 
Oozie table, preparing this statement might fail.");
+        if (predicate.apply(sql)) {
+            LOG.trace("Injecting random failure. Preparing this statement 
might fail.");
             injector.inject(String.format("Deliberately failing to prepare 
statement. [sql=%s]", sql));
         }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/FailingDBHelperForTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/FailingDBHelperForTest.java 
b/core/src/main/java/org/apache/oozie/util/db/FailingDBHelperForTest.java
new file mode 100644
index 0000000..af55b06
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/db/FailingDBHelperForTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+
+/**
+ * Helper class for simulating DB failures.
+ */
+public class FailingDBHelperForTest {
+    static final Predicate<String> DEFAULT_DB_PREDICATE = new 
FailingConnectionWrapper.OozieDmlStatementPredicate();
+    // predicate which defines when the failure should be injected.
+    // By default the DML statements for Oozie tables witt be the targeted 
statements
+    @VisibleForTesting
+    static Predicate<String> dbPredicate = DEFAULT_DB_PREDICATE;
+
+    /**
+     * change the used predicate value
+     * @param predicate
+     */
+    @VisibleForTesting
+    public static void setDbPredicate (final Predicate predicate) {
+        FailingDBHelperForTest.dbPredicate = predicate;
+    }
+
+    /**
+     * reset the predicate, to the default Oozie DML predicate.
+     */
+    @VisibleForTesting
+    public static void resetDbPredicate() {
+        FailingDBHelperForTest.dbPredicate = DEFAULT_DB_PREDICATE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java 
b/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java
index fe9f08b..5f5d56f 100644
--- 
a/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java
+++ 
b/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java
@@ -21,17 +21,27 @@ package org.apache.oozie.util.db;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Properties;
+import com.google.common.base.Predicate;
 
 public class FailingHSQLDBDriverWrapper extends org.hsqldb.jdbcDriver {
 
     public static final String USE_FAILING_DRIVER = 
"oozie.sql.use.failing.driver";
+    public static final int DEFAULT_FAILURE_PERCENT = 5;
+    private int failurePercent;
+
+    public FailingHSQLDBDriverWrapper () {
+        this(DEFAULT_FAILURE_PERCENT);
+    }
+
+    public FailingHSQLDBDriverWrapper(int failurePercent) {
+        this.failurePercent = failurePercent;
+    }
 
     public Connection connect(final String url,
                               final Properties info) throws SQLException {
         if (Boolean.parseBoolean(System.getProperty(USE_FAILING_DRIVER, 
"false"))) {
-            return new FailingConnectionWrapper(super.connect(url, info));
+            return new FailingConnectionWrapper(super.connect(url, info), 
failurePercent, FailingDBHelperForTest.dbPredicate);
         }
-
         return super.connect(url, info);
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java 
b/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java
index f0e2b18..abb21c9 100644
--- a/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java
+++ b/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java
@@ -46,6 +46,6 @@ public class FailingMySQLDriverWrapper extends Driver {
 
     public Connection connect(final String url,
                               final Properties info) throws SQLException {
-        return new FailingConnectionWrapper(super.connect(url, info));
+        return new FailingConnectionWrapper(super.connect(url, info), 
FailingHSQLDBDriverWrapper.DEFAULT_FAILURE_PERCENT, null);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java 
b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
index ee906f4..f340137 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
@@ -24,7 +24,11 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -47,6 +51,7 @@ import 
org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
@@ -65,11 +70,17 @@ import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.JobUtils;
 import org.apache.oozie.util.Pair;
+import org.apache.oozie.util.db.AlwaysFailingHSQLDriverMapper;
+import org.apache.oozie.util.db.FailingHSQLDBDriverWrapper;
+import org.apache.oozie.util.db.FailingDBHelperForTest;
 import org.apache.oozie.workflow.WorkflowInstance;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 public class TestSLACalculatorMemory extends XDataTestCase {
     private Services services;
     private JPAService jpaService;
@@ -1079,4 +1090,114 @@ public class TestSLACalculatorMemory extends 
XDataTestCase {
                 get(SLACalculatorMemory.SLA_MAP).getValue();
         assertEquals("SLA map size after remove all should be 0", 0, 
slaMapSize);
     }
+
+
+    public void testWhenSLARegistrationIsAddedBeanIsStoredCorrectly() throws 
Exception {
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        SLARegistrationBean slaRegBean = _createSLARegistration("job-1-W", 
AppType.WORKFLOW_JOB);
+        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+        Assert.assertNotNull(slaCalcMemory.get(slaRegBean.getId()));
+        Assert.assertEquals(slaRegBean, 
slaCalcMemory.get(slaRegBean.getId()).getSLARegistrationBean());
+    }
+
+    public void 
testWhenSLARegistrationIsAddedAndAllDBCallsAreDisruptedBeanIsNotStored() throws 
Exception {
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        SLARegistrationBean slaRegBean = _createSLARegistration("job-1-W", 
AppType.WORKFLOW_JOB);
+
+        try {
+            FailingDBHelperForTest.setDbPredicate(new 
SLARegistrationDmlPredicate());
+            prepareFailingDB();
+            slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+            fail("Expected JPAExecutorException not thrown");
+        } catch (JPAExecutorException ex) {
+            Assert.assertNull(slaCalcMemory.get(slaRegBean.getId()));
+        } finally {
+            FailingDBHelperForTest.resetDbPredicate();
+            
System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER);
+        }
+
+    }
+
+    public void testWhenSLARegistrationIsUpdatedBeanIsStoredCorrectly() throws 
Exception {
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        final String jobId = "job-1-W";
+        SLARegistrationBean slaRegBean = _createSLARegistration(jobId, 
AppType.WORKFLOW_JOB);
+        SLARegistrationBean slaRegBean2 = _createSLARegistration(jobId, 
AppType.WORKFLOW_JOB);
+
+        addAndUpdateRegistration(slaCalcMemory, jobId, slaRegBean, 
slaRegBean2);
+
+        Assert.assertNotNull(slaCalcMemory.get(jobId));
+        Assert.assertEquals("The updated SLA registration bean should be in 
the cache",
+                slaRegBean2, 
slaCalcMemory.get(jobId).getSLARegistrationBean());
+    }
+
+    public void 
testWhenSLARegistrationIsUpdatedAndAllDBCallsAreDisruptedBeanIsNotStored() 
throws Exception {
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        final String jobId = "job-1-W";
+        SLARegistrationBean slaRegBean = _createSLARegistration(jobId, 
AppType.WORKFLOW_JOB);
+        SLARegistrationBean slaRegBean2 = _createSLARegistration(jobId, 
AppType.WORKFLOW_JOB);
+        final int expectedDuration = 1000;
+        slaRegBean2.setExpectedDuration(expectedDuration);
+
+        try {
+            addAndUpdateRegistrationWithDBCrushSimulation(slaCalcMemory, 
jobId, slaRegBean, slaRegBean2);
+            fail("Expected JPAExecutorException not thrown");
+        } catch (JPAExecutorException ex) {
+            Assert.assertNotNull(slaCalcMemory.get(slaRegBean.getId()));
+            // the update failed
+            Assert.assertEquals(slaRegBean, 
slaCalcMemory.get(jobId).getSLARegistrationBean());
+        } finally {
+            FailingDBHelperForTest.resetDbPredicate();
+            
System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER);
+        }
+
+    }
+
+    private void addAndUpdateRegistration(final SLACalculatorMemory 
slaCalcMemory, final String jobId,
+                                          final SLARegistrationBean 
slaRegBean, final SLARegistrationBean slaRegBean2)
+            throws JPAExecutorException {
+        slaCalcMemory.addRegistration(jobId, slaRegBean);
+        slaCalcMemory.updateRegistration(jobId, slaRegBean2);
+    }
+
+    private void addAndUpdateRegistrationWithDBCrushSimulation(final 
SLACalculatorMemory slaCalcMemory, final String jobId,
+                                                               final 
SLARegistrationBean slaRegBean,
+                                                               final 
SLARegistrationBean slaRegBean2) throws Exception {
+        slaCalcMemory.addRegistration(jobId, slaRegBean);
+        FailingDBHelperForTest.setDbPredicate(new 
SLARegistrationDmlPredicate());
+        prepareFailingDB();
+        slaCalcMemory.updateRegistration(jobId, slaRegBean2);
+    }
+
+    private void prepareFailingDB() throws Exception {
+        System.setProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER, 
Boolean.TRUE.toString());
+        Configuration conf = 
services.get(ConfigurationService.class).getConf();
+        conf.set(JPAService.CONF_DRIVER, 
AlwaysFailingHSQLDriverMapper.class.getCanonicalName());
+        conf.setInt(JPAService.MAX_RETRY_COUNT, 2);
+        jpaService.destroy();
+        jpaService.init(services);
+    }
+
+    static class SLARegistrationDmlPredicate implements 
com.google.common.base.Predicate<String> {
+        private static final String TABLE_NAME = "SLA_REGISTRATION";
+        private static final Set<String> OPERATIONS = Sets.newHashSet("INSERT 
INTO ", "UPDATE ");
+
+        @Override
+        public boolean apply(@Nullable String input) {
+            Preconditions.checkArgument(!Strings.isNullOrEmpty(input));
+            boolean operationMatch = false;
+            for (String s: OPERATIONS) {
+                if (input.startsWith(s)) {
+                    operationMatch = true;
+                    break;
+                }
+            }
+            return operationMatch && input.toUpperCase().contains(TABLE_NAME);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a4732f9e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 9a4947c..0d79c35 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,7 +1,11 @@
--- Oozie 5.0.0 release (trunk - unreleased)
+-- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3134 Potential inconsistency between the in-memory SLA map and the Oozie 
database (kmarton via andras.piros)
 OOZIE-3209 XML schema error when submitting pyspark example (kmarton via 
andras.piros)
 OOZIE-2937 Remove redundant groupId from the child POMs (Jan Hentschel via 
andras.piros)
+
+-- Oozie 5.0.0 release
+
 OOZIE-3176 Oozie-core fails with checkstyle errors (alishap via andras.piros)
 OOZIE-2726 Flaky test due to daylight saving changes (sasishsaley, 
andras.piros via gezapeti)
 OOZIE-2645 Deprecate Instrumentation in favor of Metrics (andras.piros via 
gezapeti)

Reply via email to