OOZIE-2854 Oozie should handle transient database problems (andras.piros via gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a54f7c20 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a54f7c20 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a54f7c20 Branch: refs/heads/master Commit: a54f7c20d7c5d1d72f195d6d92001edd147bc304 Parents: a436d98 Author: Gezapeti Cseh <[email protected]> Authored: Wed Jul 12 09:16:10 2017 +0200 Committer: Gezapeti Cseh <[email protected]> Committed: Wed Jul 12 09:16:25 2017 +0200 ---------------------------------------------------------------------- core/pom.xml | 6 + .../oozie/command/SkipCommitFaultInjection.java | 41 ++ .../executor/jpa/JsonBeanPersisterExecutor.java | 42 +++ .../oozie/executor/jpa/QueryExecutor.java | 25 +- .../org/apache/oozie/service/JPAService.java | 371 +++++++++++++------ .../org/apache/oozie/sla/SLASummaryBean.java | 5 +- .../org/apache/oozie/store/WorkflowStore.java | 1 - .../oozie/util/db/BasicDataSourceWrapper.java | 123 ++++++ .../oozie/util/db/DatabaseRetryPredicate.java | 51 +++ .../oozie/util/db/FailingConnectionWrapper.java | 371 +++++++++++++++++++ .../util/db/FailingHSQLDBDriverWrapper.java | 37 ++ .../util/db/FailingMySQLDriverWrapper.java | 51 +++ .../oozie/util/db/OperationRetryHandler.java | 130 +++++++ ...ceExceptionSubclassFilterRetryPredicate.java | 73 ++++ .../apache/oozie/util/db/RetryAttemptState.java | 84 +++++ .../oozie/util/db/RuntimeExceptionInjector.java | 77 ++++ .../src/main/resources/META-INF/persistence.xml | 12 +- core/src/main/resources/oozie-default.xml | 33 +- core/src/main/resources/oozie-log4j.properties | 1 + .../oozie/command/SkipCommitFaultInjection.java | 41 -- .../java/org/apache/oozie/test/XTestCase.java | 106 +++--- .../util/db/TestOozieDmlStatementPredicate.java | 63 ++++ .../util/db/TestOperationRetryHandler.java | 218 +++++++++++ ...ceExceptionSubclassFilterRetryPredicate.java | 98 +++++ .../oozie/util/db/TestRetryAttemptState.java | 132 +++++++ 
docs/src/site/twiki/AG_Install.twiki | 31 ++ minitest/pom.xml | 4 +- .../test/TestParallelJPAOperationRetries.java | 144 +++++++ .../org/apache/oozie/test/TestWorkflow.java | 215 +++++++++++ .../apache/oozie/test/TestWorkflowRetries.java | 34 ++ .../org/apache/oozie/test/WorkflowTest.java | 188 ---------- minitest/src/test/resources/fs-decision.xml | 51 +++ .../src/test/resources/hsqldb-oozie-site.xml | 6 +- .../src/test/resources/oozie-log4j.properties | 1 + .../test/resources/parallel-fs-and-shell.xml | 73 ++++ minitest/src/test/resources/wf-test.xml | 51 --- pom.xml | 2 +- release-log.txt | 1 + 38 files changed, 2492 insertions(+), 501 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index acddf34..7275775 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -280,6 +280,12 @@ </dependency> <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <scope>compile</scope> http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/command/SkipCommitFaultInjection.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/SkipCommitFaultInjection.java b/core/src/main/java/org/apache/oozie/command/SkipCommitFaultInjection.java new file mode 100644 index 0000000..158fbfa --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/SkipCommitFaultInjection.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command; + +import org.apache.oozie.FaultInjection; + +public class SkipCommitFaultInjection extends FaultInjection { + + public static final String ACTION_FAILOVER_FAULT_INJECTION = "oozie.fault.injection.action.failover"; + + private static boolean ACTIVE = false; + + public boolean activate() { + ACTIVE = Boolean.parseBoolean(System.getProperty(ACTION_FAILOVER_FAULT_INJECTION, "false")); + return ACTIVE; + } + + public void deactivate() { + ACTIVE = false; + } + + public boolean isActive() { + return ACTIVE; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/executor/jpa/JsonBeanPersisterExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/JsonBeanPersisterExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/JsonBeanPersisterExecutor.java new file mode 100644 index 0000000..217e442 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/executor/jpa/JsonBeanPersisterExecutor.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.executor.jpa; + +import javax.persistence.EntityManager; + +import org.apache.oozie.client.rest.JsonBean; + +public class JsonBeanPersisterExecutor implements JPAExecutor<Void> { + private final JsonBean bean; + + public JsonBeanPersisterExecutor(JsonBean bean) { + this.bean = bean; + } + + @Override + public String getName() { + return "JsonBeanPersisterExecutor"; + } + + @Override + public Void execute(EntityManager em) throws JPAExecutorException { + em.persist(bean); + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java index 8d94c23..dfafea0 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java @@ -21,47 +21,26 @@ package org.apache.oozie.executor.jpa; import java.util.List; import javax.persistence.EntityManager; -import javax.persistence.PersistenceException; import javax.persistence.Query; -import 
org.apache.oozie.ErrorCode; import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; -import org.apache.oozie.util.XLog; /** * Base Class of Query Executor */ public abstract class QueryExecutor<T, E extends Enum<E>> { - private static XLog LOG; protected QueryExecutor() { } public abstract int executeUpdate(E namedQuery, T jobBean) throws JPAExecutorException; - public void insert(JsonBean bean) throws JPAExecutorException { + public void insert(final JsonBean bean) throws JPAExecutorException { if (bean != null) { JPAService jpaService = Services.get().get(JPAService.class); - EntityManager em = jpaService.getEntityManager(); - try { - em.getTransaction().begin(); - em.persist(bean); - em.getTransaction().commit(); - } - catch (PersistenceException e) { - throw new JPAExecutorException(ErrorCode.E0603, e); - } - finally { - if (em.getTransaction().isActive()) { - LOG.warn("insert ended with an active transaction, rolling back"); - em.getTransaction().rollback(); - } - if (em.isOpen()) { - em.close(); - } - } + jpaService.execute(new JsonBeanPersisterExecutor(bean)); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/service/JPAService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/JPAService.java b/core/src/main/java/org/apache/oozie/service/JPAService.java index 028381d..93fe9da 100644 --- a/core/src/main/java/org/apache/oozie/service/JPAService.java +++ b/core/src/main/java/org/apache/oozie/service/JPAService.java @@ -23,14 +23,16 @@ import java.text.MessageFormat; import java.util.Collection; import java.util.List; import java.util.Properties; +import java.util.concurrent.Callable; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; +import javax.persistence.EntityTransaction; import javax.persistence.NoResultException; 
import javax.persistence.Persistence; -import javax.persistence.PersistenceException; import javax.persistence.Query; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -45,6 +47,7 @@ import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.client.rest.JsonSLAEvent; +import org.apache.oozie.command.SkipCommitFaultInjection; import org.apache.oozie.compression.CodecFactory; import org.apache.oozie.executor.jpa.JPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; @@ -54,7 +57,10 @@ import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.XLog; +import org.apache.oozie.util.db.OperationRetryHandler; +import org.apache.oozie.util.db.PersistenceExceptionSubclassFilterRetryPredicate; import org.apache.openjpa.lib.jdbc.DecoratingDataSource; +import org.apache.openjpa.persistence.InvalidStateException; import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; /** @@ -64,6 +70,10 @@ import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; public class JPAService implements Service, Instrumentable { private static final String INSTRUMENTATION_GROUP_JPA = "jpa"; + public static final long DEFAULT_INITIAL_WAIT_TIME = 100; + public static final long DEFAULT_MAX_WAIT_TIME = 30_000; + public static final int DEFAULT_MAX_RETRY_COUNT = 1; + public static final String CONF_DB_SCHEMA = "oozie.db.schema.name"; public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService."; @@ -79,13 +89,16 @@ public class JPAService implements Service, Instrumentable { public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval"; public static 
final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num"; public static final String CONF_OPENJPA_BROKER_IMPL = CONF_PREFIX + "openjpa.BrokerImpl"; - - + public static final String INITIAL_WAIT_TIME = CONF_PREFIX + "retry.initial-wait-time.ms"; + public static final String MAX_WAIT_TIME = CONF_PREFIX + "maximum-wait-time.ms"; + public static final String MAX_RETRY_COUNT = CONF_PREFIX + "retry.max-retries"; + public static final String SKIP_COMMIT_FAULT_INJECTION_CLASS = SkipCommitFaultInjection.class.getName(); private EntityManagerFactory factory; private Instrumentation instr; private static XLog LOG; + private OperationRetryHandler retryHandler; /** * Return the public interface of the service. @@ -97,7 +110,7 @@ public class JPAService implements Service, Instrumentable { } @Override - public void instrument(Instrumentation instr) { + public void instrument(final Instrumentation instr) { this.instr = instr; final BasicDataSource dataSource = getBasicDataSource(); @@ -121,10 +134,10 @@ public class JPAService implements Service, Instrumentable { // Get the BasicDataSource object; it could be wrapped in a DecoratingDataSource // It might also not be a BasicDataSource if the user configured something different BasicDataSource basicDataSource = null; - OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; - Object connectionFactory = spi.getConfiguration().getConnectionFactory(); + final OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; + final Object connectionFactory = spi.getConfiguration().getConnectionFactory(); if (connectionFactory instanceof DecoratingDataSource) { - DecoratingDataSource decoratingDataSource = (DecoratingDataSource) connectionFactory; + final DecoratingDataSource decoratingDataSource = (DecoratingDataSource) connectionFactory; basicDataSource = (BasicDataSource) decoratingDataSource.getInnermostDelegate(); } else if (connectionFactory 
instanceof BasicDataSource) { basicDataSource = (BasicDataSource) connectionFactory; @@ -137,22 +150,22 @@ public class JPAService implements Service, Instrumentable { * * @param services services instance. */ - public void init(Services services) throws ServiceException { + public void init(final Services services) throws ServiceException { LOG = XLog.getLog(JPAService.class); - Configuration conf = services.getConf(); - String dbSchema = ConfigurationService.get(conf, CONF_DB_SCHEMA); + final Configuration conf = services.getConf(); + final String dbSchema = ConfigurationService.get(conf, CONF_DB_SCHEMA); String url = ConfigurationService.get(conf, CONF_URL); - String driver = ConfigurationService.get(conf, CONF_DRIVER); - String user = ConfigurationService.get(conf, CONF_USERNAME); - String password = ConfigurationService.getPassword(conf, CONF_PASSWORD).trim(); - String maxConn = ConfigurationService.get(conf, CONF_MAX_ACTIVE_CONN).trim(); - String dataSource = ConfigurationService.get(conf, CONF_CONN_DATA_SOURCE); - String connPropsConfig = ConfigurationService.get(conf, CONF_CONN_PROPERTIES); - String brokerImplConfig = ConfigurationService.get(conf, CONF_OPENJPA_BROKER_IMPL); - boolean autoSchemaCreation = ConfigurationService.getBoolean(conf, CONF_CREATE_DB_SCHEMA); - boolean validateDbConn = ConfigurationService.getBoolean(conf, CONF_VALIDATE_DB_CONN); - String evictionInterval = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL).trim(); - String evictionNum = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_NUM).trim(); + final String driver = ConfigurationService.get(conf, CONF_DRIVER); + final String user = ConfigurationService.get(conf, CONF_USERNAME); + final String password = ConfigurationService.getPassword(conf, CONF_PASSWORD).trim(); + final String maxConn = ConfigurationService.get(conf, CONF_MAX_ACTIVE_CONN).trim(); + final String dataSource = ConfigurationService.get(conf, CONF_CONN_DATA_SOURCE); + final String 
connPropsConfig = ConfigurationService.get(conf, CONF_CONN_PROPERTIES); + final String brokerImplConfig = ConfigurationService.get(conf, CONF_OPENJPA_BROKER_IMPL); + final boolean autoSchemaCreation = ConfigurationService.getBoolean(conf, CONF_CREATE_DB_SCHEMA); + final boolean validateDbConn = ConfigurationService.getBoolean(conf, CONF_VALIDATE_DB_CONN); + final String evictionInterval = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL).trim(); + final String evictionNum = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_NUM).trim(); if (!url.startsWith("jdbc:")) { throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'"); @@ -163,14 +176,14 @@ public class JPAService implements Service, Instrumentable { } dbType = dbType.substring(0, dbType.indexOf(":")); - String persistentUnit = "oozie-" + dbType; + final String persistentUnit = "oozie-" + dbType; // Checking existince of ORM file for DB type - String ormFile = "META-INF/" + persistentUnit + "-orm.xml"; + final String ormFile = "META-INF/" + persistentUnit + "-orm.xml"; try { IOUtils.getResourceAsStream(ormFile, -1); } - catch (IOException ex) { + catch (final IOException ex) { throw new ServiceException(ErrorCode.E0609, dbType, ormFile); } @@ -182,7 +195,7 @@ public class JPAService implements Service, Instrumentable { String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}"; connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn); - Properties props = new Properties(); + final Properties props = new Properties(); if (autoSchemaCreation) { connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)"); @@ -190,8 +203,8 @@ public class JPAService implements Service, Instrumentable { else if (validateDbConn) { // validation can be done only if the schema already exist, else a // 
connection cannot be obtained to create the schema. - String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval; - String num = "numTestsPerEvictionRun=" + evictionNum; + final String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval; + final String num = "numTestsPerEvictionRun=" + evictionNum; connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num; connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN"; connProps = MessageFormat.format(connProps, dbSchema); @@ -210,36 +223,76 @@ public class JPAService implements Service, Instrumentable { LOG.info("Setting openjpa.BrokerImpl to {0}", brokerImplConfig); } + initRetryHandler(); + factory = Persistence.createEntityManagerFactory(persistentUnit, props); - EntityManager entityManager = getEntityManager(); - entityManager.find(WorkflowActionBean.class, 1); - entityManager.find(WorkflowJobBean.class, 1); - entityManager.find(CoordinatorActionBean.class, 1); - entityManager.find(CoordinatorJobBean.class, 1); - entityManager.find(SLAEventBean.class, 1); - entityManager.find(JsonSLAEvent.class, 1); - entityManager.find(BundleJobBean.class, 1); - entityManager.find(BundleActionBean.class, 1); - entityManager.find(SLARegistrationBean.class, 1); - entityManager.find(SLASummaryBean.class, 1); + final EntityManager entityManager = getEntityManager(); + findRetrying(entityManager, WorkflowActionBean.class, 1); + findRetrying(entityManager, WorkflowJobBean.class, 1); + findRetrying(entityManager, CoordinatorActionBean.class, 1); + findRetrying(entityManager, CoordinatorJobBean.class, 1); + findRetrying(entityManager, SLAEventBean.class, 1); + findRetrying(entityManager, JsonSLAEvent.class, 1); + findRetrying(entityManager, BundleActionBean.class, 1); + findRetrying(entityManager, BundleJobBean.class, 1); + findRetrying(entityManager, SLARegistrationBean.class, 1); + findRetrying(entityManager, SLASummaryBean.class, 1); LOG.info(XLog.STD, "All entities 
initialized"); // need to use a pseudo no-op transaction so all entities, datasource // and connection pool are initialized one time only entityManager.getTransaction().begin(); - OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; + final OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; // Mask the password with '***' - String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,"); + final String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,"); LOG.info("JPA configuration: {0}", logMsg); entityManager.getTransaction().commit(); entityManager.close(); try { CodecFactory.initialize(conf); } - catch (Exception ex) { + catch (final Exception ex) { throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex); } + + } + + private void initRetryHandler() { + final long initialWaitTime = ConfigurationService.getInt(INITIAL_WAIT_TIME, (int) DEFAULT_INITIAL_WAIT_TIME); + final long maxWaitTime = ConfigurationService.getInt(MAX_WAIT_TIME, (int) DEFAULT_MAX_WAIT_TIME); + final int maxRetryCount = ConfigurationService.getInt(MAX_RETRY_COUNT, DEFAULT_MAX_RETRY_COUNT); + + LOG.info(XLog.STD, "Failing database operations will be retried {0} times, with an initial sleep time of {1} ms," + + "max sleep time {2} ms", maxRetryCount, initialWaitTime, maxWaitTime); + retryHandler = new OperationRetryHandler(maxRetryCount, + initialWaitTime, + maxWaitTime, + new PersistenceExceptionSubclassFilterRetryPredicate()); + } + + private void findRetrying(final EntityManager entityManager, final Class entityClass, final int primaryKey) + throws ServiceException { + try { + retryHandler.executeWithRetry(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (!entityManager.getTransaction().isActive()) { + entityManager.getTransaction().begin(); + } + + entityManager.find(entityClass, primaryKey); + + if 
(entityManager.getTransaction().isActive()) { + entityManager.getTransaction().commit(); + } + return null; + } + }); + } + catch (final Exception e) { + throw new ServiceException(ErrorCode.E0603, e); + } } /** @@ -247,7 +300,12 @@ public class JPAService implements Service, Instrumentable { */ public void destroy() { if (factory != null && factory.isOpen()) { - factory.close(); + try { + factory.close(); + } + catch (final InvalidStateException ise) { + LOG.warn("Cannot close EntityManagerFactory. [ise.message={0}]", ise.getMessage()); + } } } @@ -258,28 +316,33 @@ public class JPAService implements Service, Instrumentable { * @return return value of the JPAExecutor. * @throws JPAExecutorException thrown if an jpa executor failed */ - public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException { - EntityManager em = getEntityManager(); - Instrumentation.Cron cron = new Instrumentation.Cron(); + public <T> T execute(final JPAExecutor<T> executor) throws JPAExecutorException { + final EntityManager em = getEntityManager(); + final Instrumentation.Cron cron = new Instrumentation.Cron(); try { LOG.trace("Executing JPAExecutor [{0}]", executor.getName()); if (instr != null) { instr.incr(INSTRUMENTATION_GROUP_JPA, executor.getName(), 1); } cron.start(); - em.getTransaction().begin(); - T t = executor.execute(em); - if (em.getTransaction().isActive()) { - if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { - throw new RuntimeException("Skipping Commit for Failover Testing"); - } - em.getTransaction().commit(); - } - return t; + return retryHandler.executeWithRetry(new Callable<T>() { + @Override + public T call() throws Exception { + if (!em.getTransaction().isActive()) { + em.getTransaction().begin(); + } + + final T t = executor.execute(em); + + checkAndCommit(em.getTransaction()); + + return t; + } + }); } - catch (PersistenceException e) { - throw new JPAExecutorException(ErrorCode.E0603, e); + catch (final Exception e) 
{ + throw getTargetException(e); } finally { cron.stop(); @@ -292,7 +355,7 @@ public class JPAService implements Service, Instrumentable { em.getTransaction().rollback(); } } - catch (Exception ex) { + catch (final Exception ex) { LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex .getMessage(), ex); } @@ -304,13 +367,23 @@ public class JPAService implements Service, Instrumentable { LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName()); } } - catch (Exception ex) { + catch (final Exception ex) { LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex .getMessage(), ex); } } } + private void checkAndCommit(final EntityTransaction tx) { + if (tx.isActive()) { + if (FaultInjection.isActive(SKIP_COMMIT_FAULT_INJECTION_CLASS)) { + throw new RuntimeException("Skipping Commit for Failover Testing"); + } + + tx.commit(); + } + } + /** * Execute an UPDATE query * @param namedQueryName the name of query to be executed @@ -319,8 +392,8 @@ public class JPAService implements Service, Instrumentable { * @return Integer that query returns, which corresponds to the number of rows updated * @throws JPAExecutorException */ - public int executeUpdate(String namedQueryName, Query query, EntityManager em) throws JPAExecutorException { - Instrumentation.Cron cron = new Instrumentation.Cron(); + public int executeUpdate(final String namedQueryName, final Query query, final EntityManager em) throws JPAExecutorException { + final Instrumentation.Cron cron = new Instrumentation.Cron(); try { LOG.trace("Executing Update/Delete Query [{0}]", namedQueryName); @@ -328,18 +401,23 @@ public class JPAService implements Service, Instrumentable { instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); } cron.start(); - em.getTransaction().begin(); - int ret = query.executeUpdate(); - if (em.getTransaction().isActive()) { - if 
(FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { - throw new RuntimeException("Skipping Commit for Failover Testing"); + + return retryHandler.executeWithRetry(new Callable<Integer>() { + @Override + public Integer call() throws Exception { + if (!em.getTransaction().isActive()) { + em.getTransaction().begin(); + } + final int ret = query.executeUpdate(); + + checkAndCommit(em.getTransaction()); + + return ret; } - em.getTransaction().commit(); - } - return ret; + }); } - catch (PersistenceException e) { - throw new JPAExecutorException(ErrorCode.E0603, e); + catch (final Exception e) { + throw getTargetException(e); } finally { processFinally(em, cron, namedQueryName, true); @@ -350,7 +428,7 @@ public class JPAService implements Service, Instrumentable { E namedQuery; Query query; - public QueryEntry(E namedQuery, Query query) { + public QueryEntry(final E namedQuery, final Query query) { this.namedQuery = namedQuery; this.query = query; } @@ -364,7 +442,10 @@ public class JPAService implements Service, Instrumentable { } } - private void processFinally(EntityManager em, Instrumentation.Cron cron, String name, boolean checkActive) { + private void processFinally(final EntityManager em, + final Instrumentation.Cron cron, + final String name, + final boolean checkActive) { cron.stop(); if (instr != null) { instr.addCron(INSTRUMENTATION_GROUP_JPA, name, cron); @@ -376,7 +457,7 @@ public class JPAService implements Service, Instrumentable { em.getTransaction().rollback(); } } - catch (Exception ex) { + catch (final Exception ex) { LOG.warn("Could not check/rollback transaction after [{0}], {1}", name, ex.getMessage(), ex); } @@ -389,7 +470,7 @@ public class JPAService implements Service, Instrumentable { LOG.warn("[{0}] closed the EntityManager, it should not!", name); } } - catch (Exception ex) { + catch (final Exception ex) { LOG.warn("Could not close EntityManager after [{0}], {1}", name, ex.getMessage(), ex); } } @@ -402,41 +483,57 
@@ public class JPAService implements Service, Instrumentable { * @param em Entity Manager * @throws JPAExecutorException */ - public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertBeans, List<QueryEntry> updateQueryList, - Collection<JsonBean> deleteBeans, EntityManager em) throws JPAExecutorException { - Instrumentation.Cron cron = new Instrumentation.Cron(); + public void executeBatchInsertUpdateDelete(final Collection<JsonBean> insertBeans, final List<QueryEntry> updateQueryList, + final Collection<JsonBean> deleteBeans, final EntityManager em) throws JPAExecutorException { + final Instrumentation.Cron cron = new Instrumentation.Cron(); try { LOG.trace("Executing Queries in Batch"); cron.start(); - em.getTransaction().begin(); - if (updateQueryList != null && updateQueryList.size() > 0) { - for (QueryEntry q : updateQueryList) { - if (instr != null) { - instr.incr(INSTRUMENTATION_GROUP_JPA, q.getQueryName().name(), 1); + + retryHandler.executeWithRetry(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (em.getTransaction().isActive()) { + try { + em.getTransaction().rollback(); + } + catch (final Exception e) { + LOG.warn("Rollback failed - ignoring"); + } } - q.getQuery().executeUpdate(); - } - } - if (insertBeans != null && insertBeans.size() > 0) { - for (JsonBean bean : insertBeans) { - em.persist(bean); - } - } - if (deleteBeans != null && deleteBeans.size() > 0) { - for (JsonBean bean : deleteBeans) { - em.remove(em.merge(bean)); - } - } - if (em.getTransaction().isActive()) { - if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { - throw new RuntimeException("Skipping Commit for Failover Testing"); + + em.getTransaction().begin(); + + if (CollectionUtils.isNotEmpty(updateQueryList)) { + for (final QueryEntry q : updateQueryList) { + if (instr != null) { + instr.incr(INSTRUMENTATION_GROUP_JPA, q.getQueryName().name(), 1); + } + q.getQuery().executeUpdate(); + } + } + + if 
(CollectionUtils.isNotEmpty(insertBeans)) { + for (final JsonBean bean : insertBeans) { + em.persist(bean); + } + } + + if (CollectionUtils.isNotEmpty(deleteBeans)) { + for (final JsonBean bean : deleteBeans) { + em.remove(em.merge(bean)); + } + } + + checkAndCommit(em.getTransaction()); + + return null; } - em.getTransaction().commit(); - } + }); } - catch (PersistenceException e) { - throw new JPAExecutorException(ErrorCode.E0603, e); + catch (final Exception e) { + throw getTargetException(e); } finally { processFinally(em, cron, "batchqueryexecutor", true); @@ -450,24 +547,33 @@ public class JPAService implements Service, Instrumentable { * @param em Entity Manager * @return object that matches the query */ - public Object executeGet(String namedQueryName, Query query, EntityManager em) { - Instrumentation.Cron cron = new Instrumentation.Cron(); + public Object executeGet(final String namedQueryName, final Query query, final EntityManager em) throws JPAExecutorException { + final Instrumentation.Cron cron = new Instrumentation.Cron(); try { - LOG.trace("Executing Select Query to Get a Single row [{0}]", namedQueryName); if (instr != null) { instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); } cron.start(); - Object obj = null; - try { - obj = query.getSingleResult(); - } - catch (NoResultException e) { - // return null when no matched result - } - return obj; + + return retryHandler.executeWithRetry(new Callable<Object>() { + @Override + public Object call() throws Exception { + Object obj = null; + try { + obj = query.getSingleResult(); + } + catch (final NoResultException e) { + LOG.info("No results found"); + // return null when no matched result + } + return obj; + } + }); + } + catch (final Exception e) { + throw getTargetException(e); } finally { processFinally(em, cron, namedQueryName, false); @@ -481,8 +587,9 @@ public class JPAService implements Service, Instrumentable { * @param em Entity Manager * @return list containing results that match the 
query */ - public List<?> executeGetList(String namedQueryName, Query query, EntityManager em) { - Instrumentation.Cron cron = new Instrumentation.Cron(); + public List<?> executeGetList(final String namedQueryName, final Query query, final EntityManager em) + throws JPAExecutorException { + final Instrumentation.Cron cron = new Instrumentation.Cron(); try { LOG.trace("Executing Select Query to Get Multiple Rows [{0}]", namedQueryName); @@ -491,14 +598,24 @@ public class JPAService implements Service, Instrumentable { } cron.start(); - List<?> resultList = null; - try { - resultList = query.getResultList(); - } - catch (NoResultException e) { - // return null when no matched result - } - return resultList; + + return retryHandler.executeWithRetry(new Callable<List<?>>() { + @Override + public List<?> call() throws Exception { + List<?> resultList = null; + try { + resultList = query.getResultList(); + } + catch (final NoResultException e) { + LOG.info("No results found"); + // return null when no matched result + } + return resultList; + } + }); + } + catch (final Exception e) { + throw getTargetException(e); } finally { processFinally(em, cron, namedQueryName, false); @@ -514,4 +631,12 @@ public class JPAService implements Service, Instrumentable { return factory.createEntityManager(); } + private JPAExecutorException getTargetException(final Exception e) { + if (e instanceof JPAExecutorException) { + return (JPAExecutorException) e; + } + else { + return new JPAExecutorException(ErrorCode.E0603, e.getMessage()); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java index cfe1522..e9ea9ba 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java +++ 
b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java @@ -66,7 +66,10 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED", query = "select w.eventProcessed from SLASummaryBean w where w.jobId = :id"), - @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED", query = "select w.eventProcessed, w.lastModifiedTS from SLASummaryBean w where w.jobId = :id") + @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED", + query = "select w.eventProcessed, w.lastModifiedTS from SLASummaryBean w where w.jobId = :id"), + + @NamedQuery(name = "GET_SLA_SUMMARY_ALL", query = "select OBJECT(w) from SLASummaryBean w") }) http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/store/WorkflowStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/store/WorkflowStore.java b/core/src/main/java/org/apache/oozie/store/WorkflowStore.java index c565e74..821abc5 100644 --- a/core/src/main/java/org/apache/oozie/store/WorkflowStore.java +++ b/core/src/main/java/org/apache/oozie/store/WorkflowStore.java @@ -22,7 +22,6 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/BasicDataSourceWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/BasicDataSourceWrapper.java b/core/src/main/java/org/apache/oozie/util/db/BasicDataSourceWrapper.java new file mode 100644 index 0000000..c972ea2 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/BasicDataSourceWrapper.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.db; + +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.commons.dbcp.ConnectionFactory; +import org.apache.commons.dbcp.DriverConnectionFactory; +import org.apache.commons.dbcp.SQLNestedException; + +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class BasicDataSourceWrapper extends BasicDataSource { + + /** + * Fixes a bug within {@link BasicDataSource#createConnectionFactory()} so that {@code driverClassName} has real effect. + * <p/> + * Because we currently use Apache Commons DBCP 1.4.0, which has a bug that ignores {@code driverClassName}, we're unable + * to create a JDBC driver using a user-provided driver class name (we try to do that by explicitly setting a value for + * {@code openJpa.connectionProperties="DriverClassName=..."}), unless we perform the exact same fix that is applied by the DBCP + * patch. + * <p/> + * Note: once DBCP 1.4.1 is released and Oozie is updated to 1.4.1, we can remove this class.
+ * <p/> + * Please see <a href="https://issues.apache.org/jira/browse/DBCP-333">the DBCP bug</a> and + * <a href="https://github.com/apache/commons-dbcp/blob/DBCP_1_4_x_BRANCH/src/java/org/apache/commons/dbcp/BasicDataSource.java#L1588-L1660">the fixed method</a> + * for details. + * <p/> + * Please also see how OpenJPA + * <a href="http://openjpa.apache.org/builds/2.2.1/apache-openjpa/docs/ref_guide_integration_dbcp.html">is integrated</a> + * with DBCP. + */ + protected ConnectionFactory createConnectionFactory() throws SQLException { + // Load the JDBC driver class + Class driverFromCCL = null; + if (driverClassName != null) { + try { + try { + if (driverClassLoader == null) { + driverFromCCL = Class.forName(driverClassName); + } else { + driverFromCCL = Class.forName(driverClassName, true, driverClassLoader); + } + } catch (ClassNotFoundException cnfe) { + driverFromCCL = Thread.currentThread( + ).getContextClassLoader().loadClass( + driverClassName); + } + } catch (Throwable t) { + String message = "Cannot load JDBC driver class '" + + driverClassName + "'"; + logWriter.println(message); + t.printStackTrace(logWriter); + throw new SQLNestedException(message, t); + } + } + + // Create a JDBC driver instance + Driver driver = null; + try { + if (driverFromCCL == null) { + driver = DriverManager.getDriver(url); + } else { + // Usage of DriverManager is not possible, as it does not + // respect the ContextClassLoader + driver = (Driver) driverFromCCL.newInstance(); + if (!driver.acceptsURL(url)) { + throw new SQLException("No suitable driver", "08001"); + } + } + } catch (Throwable t) { + String message = "Cannot create JDBC driver of class '" + + (driverClassName != null ?
driverClassName : "") + + "' for connect URL '" + url + "'"; + logWriter.println(message); + t.printStackTrace(logWriter); + throw new SQLNestedException(message, t); + } + + // Can't test without a validationQuery + if (validationQuery == null) { + setTestOnBorrow(false); + setTestOnReturn(false); + setTestWhileIdle(false); + } + + // Set up the driver connection factory we will use + String user = username; + if (user != null) { + connectionProperties.put("user", user); + } else { + log("DBCP DataSource configured without a 'username'"); + } + + String pwd = password; + if (pwd != null) { + connectionProperties.put("password", pwd); + } else { + log("DBCP DataSource configured without a 'password'"); + } + + ConnectionFactory driverConnectionFactory = new DriverConnectionFactory(driver, url, connectionProperties); + return driverConnectionFactory; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/DatabaseRetryPredicate.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/DatabaseRetryPredicate.java b/core/src/main/java/org/apache/oozie/util/db/DatabaseRetryPredicate.java new file mode 100644 index 0000000..c89aabe --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/DatabaseRetryPredicate.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.db; + +import java.util.HashSet; +import java.util.Set; + +import com.google.common.base.Predicate; + +public abstract class DatabaseRetryPredicate implements Predicate<Throwable> { + + @Override + public abstract boolean apply(Throwable input); + + /* + * Helper method for subclasses to retrieve all exceptions in a set. "All exceptions" means the exception + * hierarchy that can be walked by calling getCause() repeatedly. + * + * Subclasses either check if a particular exception was raised or not, or need the SQLException to extract the + * error code. In both cases the exception has to be found. 
+ */ + protected Set<Class<?>> getAllExceptions(final Throwable t) { + final Set<Class<?>> exceptions = new HashSet<>(); + + exceptions.add(t.getClass()); + + Throwable ex = t; + while (ex.getCause() != null) { + exceptions.add(ex.getCause().getClass()); + ex = ex.getCause(); + } + + return exceptions; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java b/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java new file mode 100644 index 0000000..0e31025 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java @@ -0,0 +1,371 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util.db; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Sets; +import org.apache.directory.api.util.Strings; +import org.apache.oozie.util.XLog; + +import javax.annotation.Nullable; +import javax.persistence.PersistenceException; +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Executor; + +public class FailingConnectionWrapper implements Connection { + private static final XLog LOG = XLog.getLog(FailingConnectionWrapper.class); + + private final Connection delegate; + private static final RuntimeExceptionInjector<PersistenceException> injector = + new RuntimeExceptionInjector<>(PersistenceException.class, 5); + private static final OozieDmlStatementPredicate oozieDmlStatementPredicate = + new OozieDmlStatementPredicate(); + + public FailingConnectionWrapper(final Connection delegate) throws SQLException { + this.delegate = delegate; + } + + @Override + public Statement createStatement() throws SQLException { + return delegate.createStatement(); + } + + @Override + public PreparedStatement prepareStatement(final String sql) throws SQLException { + return delegate.prepareStatement(sql); + } + + @Override + public CallableStatement prepareCall(final String sql) throws SQLException { + return delegate.prepareCall(sql); + } + + @Override + public String nativeSQL(final String sql) throws SQLException { + return delegate.nativeSQL(sql); + } + + @Override + public void 
setAutoCommit(final boolean autoCommit) throws SQLException { + delegate.setAutoCommit(autoCommit); + } + + @Override + public boolean getAutoCommit() throws SQLException { + return delegate.getAutoCommit(); + } + + @Override + public void commit() throws SQLException { + delegate.commit(); + } + + @Override + public void rollback() throws SQLException { + delegate.rollback(); + } + + @Override + public void close() throws SQLException { + delegate.close(); + } + + @Override + public boolean isClosed() throws SQLException { + return delegate.isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return delegate.getMetaData(); + } + + @Override + public void setReadOnly(final boolean readOnly) throws SQLException { + delegate.setReadOnly(readOnly); + } + + @Override + public boolean isReadOnly() throws SQLException { + return delegate.isReadOnly(); + } + + @Override + public void setCatalog(final String catalog) throws SQLException { + delegate.setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return delegate.getCatalog(); + } + + @Override + public void setTransactionIsolation(final int level) throws SQLException { + delegate.setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return delegate.getTransactionIsolation(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return delegate.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + delegate.clearWarnings(); + } + + @Override + public Statement createStatement(final int resultSetType, final int resultSetConcurrency) throws SQLException { + return delegate.createStatement(resultSetType, resultSetConcurrency); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) + throws SQLException { + if (oozieDmlStatementPredicate.apply(sql)) { + 
LOG.trace("Injecting random failure. It's a DML statement of an Oozie table, preparing this statement might fail."); + injector.inject(String.format("Deliberately failing to prepare statement. [sql=%s]", sql)); + } + + LOG.trace("Preparing statement. [sql={0}]", sql); + return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency); + } + + @Override + public CallableStatement prepareCall(final String sql, final int resultSetType, final int resultSetConcurrency) + throws SQLException { + return delegate.prepareCall(sql, resultSetType, resultSetConcurrency); + } + + @Override + public Map<String, Class<?>> getTypeMap() throws SQLException { + return delegate.getTypeMap(); + } + + @Override + public void setTypeMap(final Map<String, Class<?>> map) throws SQLException { + delegate.setTypeMap(map); + } + + @Override + public void setHoldability(final int holdability) throws SQLException { + delegate.setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return delegate.getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return delegate.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(final String name) throws SQLException { + return delegate.setSavepoint(name); + } + + @Override + public void rollback(final Savepoint savepoint) throws SQLException { + delegate.rollback(); + } + + @Override + public void releaseSavepoint(final Savepoint savepoint) throws SQLException { + delegate.releaseSavepoint(savepoint); + } + + @Override + public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) + throws SQLException { + return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, + final int resultSetHoldability) throws SQLException { + 
return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public CallableStatement prepareCall(final String sql, final int resultSetType, final int resultSetConcurrency, + final int resultSetHoldability) throws SQLException { + return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException { + return delegate.prepareStatement(sql, autoGeneratedKeys); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException { + return delegate.prepareStatement(sql, columnIndexes); + } + + @Override + public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException { + return delegate.prepareStatement(sql, columnNames); + } + + @Override + public Clob createClob() throws SQLException { + return delegate.createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return delegate.createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return delegate.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return delegate.createSQLXML(); + } + + @Override + public boolean isValid(final int timeout) throws SQLException { + return delegate.isValid(timeout); + } + + @Override + public void setClientInfo(final String name, final String value) throws SQLClientInfoException { + delegate.setClientInfo(name, value); + } + + @Override + public void setClientInfo(final Properties properties) throws SQLClientInfoException { + delegate.setClientInfo(properties); + } + + @Override + public String getClientInfo(final String name) throws SQLException { + return delegate.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return 
delegate.getClientInfo(); + } + + @Override + public Array createArrayOf(final String typeName, final Object[] elements) throws SQLException { + return delegate.createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(final String typeName, final Object[] attributes) throws SQLException { + return delegate.createStruct(typeName, attributes); + } + + @Override + public void setSchema(final String schema) throws SQLException { + delegate.setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return delegate.getSchema(); + } + + @Override + public void abort(final Executor executor) throws SQLException { + delegate.abort(executor); + } + + @Override + public void setNetworkTimeout(final Executor executor, final int milliseconds) throws SQLException { + delegate.setNetworkTimeout(executor, milliseconds); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return delegate.getNetworkTimeout(); + } + + @Override + public <T> T unwrap(final Class<T> iface) throws SQLException { + return delegate.unwrap(iface); + } + + @Override + public boolean isWrapperFor(final Class<?> iface) throws SQLException { + return delegate.isWrapperFor(iface); + } + + static class OozieDmlStatementPredicate implements Predicate<String> { + private static final Set<String> DML_PREFIXES = Sets.newHashSet( + "SELECT ", "INSERT INTO ", "UPDATE ", "DELETE FROM "); + private static final Set<String> OOZIE_TABLE_NAMES = Sets.newHashSet( + "BUNDLE_ACTIONS", "BUNDLE_JOBS", "COORD_ACTIONS", "COORD_JOBS", "SLA_REGISTRATION", "SLA_SUMMARY", + "WF_ACTIONS", "WF_JOBS"); + + @Override + public boolean apply(@Nullable String input) { + Preconditions.checkArgument(Strings.isNotEmpty(input)); + + boolean isDmlStatement = false; + for (final String dmlPrefix : DML_PREFIXES) { + if (input.toUpperCase().startsWith(dmlPrefix)) { + isDmlStatement = true; + } + } + + boolean isOozieTable = false; + for (final String oozieTableName : 
OOZIE_TABLE_NAMES) { + if (input.toUpperCase().contains(oozieTableName)) { + isOozieTable = true; + } + } + + return isDmlStatement && isOozieTable; + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java b/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java new file mode 100644 index 0000000..fe9f08b --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util.db; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; + +public class FailingHSQLDBDriverWrapper extends org.hsqldb.jdbcDriver { + + public static final String USE_FAILING_DRIVER = "oozie.sql.use.failing.driver"; + + public Connection connect(final String url, + final Properties info) throws SQLException { + if (Boolean.parseBoolean(System.getProperty(USE_FAILING_DRIVER, "false"))) { + return new FailingConnectionWrapper(super.connect(url, info)); + } + + return super.connect(url, info); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java b/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java new file mode 100644 index 0000000..f7af57c --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util.db; + +import com.mysql.jdbc.Driver; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; + +/** + * Used for runtime database error injection on MySQL. + * <p/> + * Necessary steps: + * <ul> + * <li>set {@code oozie.service.JPAService.connection.data.source} to + * {@code org.apache.oozie.util.db.BasicDataSourceWrapper} within {@code oozie-site.xml}</li> + * <li>set {@code oozie.service.JPAService.jdbc.driver} to {@code org.apache.oozie.util.db.FailingMySQLDriverWrapper} + * within {@code oozie-site.xml}</li> + * <li>restart Oozie server</li> + * <li>submit / start some workflows, coordinators etc.</li> + * <li>see any of those {@code JPAException} instances with following message prefix: + * {@code Deliberately failing to prepare statement.}</li> + * </ul> + */ +public class FailingMySQLDriverWrapper extends Driver { + public FailingMySQLDriverWrapper() throws SQLException { + super(); + } + + public Connection connect(final String url, + final Properties info) throws SQLException { + return new FailingConnectionWrapper(super.connect(url, info)); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/OperationRetryHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/OperationRetryHandler.java b/core/src/main/java/org/apache/oozie/util/db/OperationRetryHandler.java new file mode 100644 index 0000000..16a0a82 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/OperationRetryHandler.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.db; + +import java.util.concurrent.Callable; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.oozie.util.XLog; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; + +public class OperationRetryHandler { + private static XLog LOG = XLog.getLog(OperationRetryHandler.class); + @VisibleForTesting + static final RetryAttemptState RETRY_ATTEMPT_STATE = new RetryAttemptState(); + + private final int maxRetryCount; + private final long initialWaitTime; + private final long maxWaitTime; + private final Predicate<Throwable> retryPredicate; + private final boolean shouldRetry; + + public OperationRetryHandler(final int maxRetryCount, final long initialWaitTime, final long maxWaitTime, + final Predicate<Throwable> retryPredicate) { + Preconditions.checkArgument(maxRetryCount >= 0, "Retry count must not be less than zero"); + Preconditions.checkArgument(initialWaitTime > 0, "Initial wait time must be greater than zero"); + Preconditions.checkArgument(maxWaitTime >= 0, "Maximum wait time must not be less than zero"); + + this.maxRetryCount = maxRetryCount; + this.initialWaitTime = initialWaitTime; + this.maxWaitTime = maxWaitTime; + this.retryPredicate = Preconditions.checkNotNull(retryPredicate, "Retry predicate must not be null"); + this.shouldRetry = !(maxRetryCount == 0 || maxWaitTime == 0); + + 
LOG.trace("Retry handler parameters are set." + + "[maxRetryCount={0};initialWaitTime={1};maxWaitTime={2};retryPredicate.class={3};shouldRetry={4}]", + this.maxRetryCount, this.initialWaitTime, this.maxWaitTime, this.retryPredicate.getClass().getName(), shouldRetry); + } + + public <V> V executeWithRetry(final Callable<V> operation) throws Exception { + int retries = 0; + long waitTime = initialWaitTime; + Exception lastException = null; + + if (!shouldRetry) { + try { + LOG.trace("Configured not to retry, calling operation once."); + + final V result = operation.call(); + + LOG.trace("Operation called once successfully."); + + return result; + } + catch (final Exception e) { + LOG.error("An error occurred while calling the operation once. [e.message={0}]", e.getMessage()); + throw e; + } + } + + try { + RETRY_ATTEMPT_STATE.signalStart(); + + while (retries < maxRetryCount) { + try { + LOG.trace("Calling operation. [retries={0}]", retries); + + retries++; + final V result = operation.call(); + + LOG.trace("Operation called successfully."); + + return result; + } catch (final Exception e) { + LOG.warn("Database error", e); + + // if retries have been done by an inner retry handler, + // then we won't make any effort to do it again + if (RETRY_ATTEMPT_STATE.isExhausted()) { + LOG.error("Retry attempts have been exhausted. [e.message={0}]", e.getMessage()); + throw e; + } + + if (retryPredicate.apply(e)) { + LOG.trace("Exception is not on blacklist, handling retry. [retries={0};e.class={1}]", + retries, e.getClass().getName()); + waitTime = handleRetry(waitTime, retries); + lastException = e; + } + else { + LOG.warn("Exception is on blacklist, not handling retry. 
[retries={0};e.class={1}]", + retries, e.getClass().getName()); + throw e; + } + } + } + + LOG.error("Number of maximum retry attempts exhausted"); + RETRY_ATTEMPT_STATE.signalExhausted(); // signal to possible outer retry handlers + throw lastException; + } finally { + RETRY_ATTEMPT_STATE.signalEnd(); + } + } + + private long handleRetry(long sleepBeforeRetryMs, final int retries) throws InterruptedException { + LOG.warn("Operation failed, sleeping {0} milliseconds before retry #{1}", sleepBeforeRetryMs, retries); + Thread.sleep(sleepBeforeRetryMs); + sleepBeforeRetryMs *= 2; + + return sleepBeforeRetryMs > maxWaitTime ? maxWaitTime : sleepBeforeRetryMs; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/PersistenceExceptionSubclassFilterRetryPredicate.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/PersistenceExceptionSubclassFilterRetryPredicate.java b/core/src/main/java/org/apache/oozie/util/db/PersistenceExceptionSubclassFilterRetryPredicate.java new file mode 100644 index 0000000..b742ca7 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/PersistenceExceptionSubclassFilterRetryPredicate.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.db; + +import com.google.common.collect.Sets; +import org.apache.oozie.util.XLog; + +import javax.persistence.EntityExistsException; +import javax.persistence.EntityNotFoundException; +import javax.persistence.LockTimeoutException; +import javax.persistence.NoResultException; +import javax.persistence.NonUniqueResultException; +import javax.persistence.OptimisticLockException; +import javax.persistence.PersistenceException; +import javax.persistence.PessimisticLockException; +import javax.persistence.QueryTimeoutException; +import javax.persistence.TransactionRequiredException; +import java.util.Set; + +/** + * A {@link DatabaseRetryPredicate} which applies when a given {@link Exception} (or its causes) are NOT blacklisted. + * <p/> + * Blacklisted exceptions in this class do not indicate a network failure, therefore no retry should take place. + */ +public class PersistenceExceptionSubclassFilterRetryPredicate extends DatabaseRetryPredicate { + private static final XLog LOG = XLog.getLog(PersistenceExceptionSubclassFilterRetryPredicate.class); + private static final Set<Class<? extends PersistenceException>> BLACKLIST = Sets.newHashSet( + EntityExistsException.class, + EntityNotFoundException.class, + LockTimeoutException.class, + NoResultException.class, + NonUniqueResultException.class, + OptimisticLockException.class, + PessimisticLockException.class, + QueryTimeoutException.class, + TransactionRequiredException.class + ); + + @Override + public boolean apply(final Throwable throwable) { + LOG.trace("Retry predicate investigation started. [throwable.class={0}]", throwable.getClass().getName()); + + boolean applies = true; + + for (final Class<?> classDownTheStackTrace : getAllExceptions(throwable)) { + for (final Class<? 
extends PersistenceException> blacklistElement : BLACKLIST) { + if (blacklistElement.isAssignableFrom(classDownTheStackTrace)) { + applies = false; + } + } + } + + LOG.trace("Retry predicate investigation finished. [applies={0}]", applies); + + return applies; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/RetryAttemptState.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/db/RetryAttemptState.java b/core/src/main/java/org/apache/oozie/util/db/RetryAttemptState.java new file mode 100644 index 0000000..0413ee0 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/RetryAttemptState.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.db; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * This class tracks nested {@link OperationRetryHandler} calls. 
Some {@code JPAExecutor} implementations call other
 * {@code JPAExecutor}s, which leads to two (or possibly more) nested
 * {@link OperationRetryHandler#executeWithRetry(Callable)} calls.
 * <p>
 * When the innermost retry handler has used up all of its attempts and rethrows the {@link Exception}, the outer
 * handler catches it and would start the whole JPA operation over again.
 * <p>
 * To prevent that, retry handlers running on the same thread share this state: each one raises the nesting level when
 * it starts and lowers it when it ends, and a handler that runs out of attempts flags that fact for the others.
 * <p>
 * The state lives in {@link ThreadLocal}s because retry handlers may run on several threads in parallel. Threads are
 * reused by the thread pool, so {@link #exhausted} has to be switched back to {@code false} as soon as the nesting
 * level drops to zero.
 */
final class RetryAttemptState {
    /** Number of retry handlers currently running on the calling thread. */
    private final ThreadLocal<Integer> inProgressCount = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return 0;
        }
    };

    /** Whether a retry handler on the calling thread has run out of attempts. */
    private final ThreadLocal<Boolean> exhausted = new ThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
            return Boolean.FALSE;
        }
    };

    /** @return {@code true} if a handler on this thread has signalled that its attempts are used up */
    boolean isExhausted() {
        return exhausted.get();
    }

    /** Marks the retry attempts on this thread as used up. */
    void signalExhausted() {
        exhausted.set(true);
    }

    /**
     * Registers one more running retry handler on this thread.
     *
     * @throws IllegalStateException if the attempts on this thread are already flagged as exhausted
     */
    void signalStart() {
        Preconditions.checkState(!exhausted.get(), "retry attempts exhausted");

        final int levelBefore = inProgressCount.get();
        inProgressCount.set(levelBefore + 1);
    }

    /** Unregisters a retry handler; once none is left, the exhausted flag is cleared for the next user of the thread. */
    void signalEnd() {
        final int remaining = inProgressCount.get() - 1;
        inProgressCount.set(remaining);

        if (remaining != 0) {
            return;
        }

        // state must be reset
        exhausted.set(false);
    }

    @VisibleForTesting
    int getInProgressCount() {
        return inProgressCount.get();
    }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/RuntimeExceptionInjector.java
----------------------------------------------------------------------
diff --git
a/core/src/main/java/org/apache/oozie/util/db/RuntimeExceptionInjector.java b/core/src/main/java/org/apache/oozie/util/db/RuntimeExceptionInjector.java new file mode 100644 index 0000000..2b55e85 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/db/RuntimeExceptionInjector.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util.db; + +import com.google.common.base.Preconditions; +import org.apache.oozie.util.XLog; + +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +public class RuntimeExceptionInjector<E extends RuntimeException> { + private static final XLog LOG = XLog.getLog(RuntimeExceptionInjector.class); + private static final AtomicLong failureCounter = new AtomicLong(0); + + private final Class<E> runtimeExceptionClass; + private final int failurePercent; + + public RuntimeExceptionInjector(final Class<E> runtimeExceptionClass, final int failurePercent) { + Preconditions.checkArgument(failurePercent <= 100 && failurePercent >= 0, + "illegal value for failure %: " + failurePercent); + + this.runtimeExceptionClass = runtimeExceptionClass; + this.failurePercent = failurePercent; + } + + public void inject(final String errorMessage) { + LOG.trace("Trying to inject random failure. [errorMessage={0}]", errorMessage); + + final ThreadLocalRandom random = ThreadLocalRandom.current(); + final int randomVal = random.nextInt(0, 100); // range: [0..99] + + if (randomVal < failurePercent) { + final long count = failureCounter.incrementAndGet(); + LOG.warn("Injecting random failure. [runtimeExceptionClass.name={0};count={1};errorMessage={2}]", + runtimeExceptionClass.getName(), count, errorMessage); + E injected; + + try { + injected = runtimeExceptionClass.getConstructor(String.class).newInstance( + "injected random failure #" + count + " ." + errorMessage); + } catch (final InstantiationException | IllegalAccessException | InvocationTargetException + | NoSuchMethodException outer) { + try { + LOG.warn("Instantiating without error message. 
[runtimeExceptionClass.name={0};outer.message={1}]", + runtimeExceptionClass.getName(), outer.getMessage()); + injected = runtimeExceptionClass.newInstance(); + } catch (final InstantiationException | IllegalAccessException inner) { + LOG.error("Could not instantiate. [runtimeExceptionClass.name={0};inner.message={1}]", + runtimeExceptionClass.getName(), inner.getMessage()); + throw new RuntimeException(inner); + } + + } + + throw injected; + } + + LOG.trace("Did not inject random failure."); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/META-INF/persistence.xml b/core/src/main/resources/META-INF/persistence.xml index bad9278..a74078a 100644 --- a/core/src/main/resources/META-INF/persistence.xml +++ b/core/src/main/resources/META-INF/persistence.xml @@ -43,7 +43,7 @@ <class>org.apache.oozie.util.db.ValidateConnectionBean</class> <properties> - <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + <property name="openjpa.ConnectionDriverName" value="org.apache.oozie.util.db.BasicDataSourceWrapper"/> <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> @@ -102,7 +102,7 @@ <class>org.apache.oozie.util.db.ValidateConnectionBean</class> <properties> - <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + <property name="openjpa.ConnectionDriverName" value="org.apache.oozie.util.db.BasicDataSourceWrapper"/> <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> @@ -163,7 +163,7 @@ <class>org.apache.oozie.util.db.ValidateConnectionBean</class> <properties> - <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + <property name="openjpa.ConnectionDriverName" 
value="org.apache.oozie.util.db.BasicDataSourceWrapper"/> <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> @@ -222,7 +222,7 @@ <class>org.apache.oozie.util.db.ValidateConnectionBean</class> <properties> - <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + <property name="openjpa.ConnectionDriverName" value="org.apache.oozie.util.db.BasicDataSourceWrapper"/> <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> @@ -282,7 +282,7 @@ <class>org.apache.oozie.util.db.ValidateConnectionBean</class> <properties> - <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + <property name="openjpa.ConnectionDriverName" value="org.apache.oozie.util.db.BasicDataSourceWrapper"/> <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> @@ -342,7 +342,7 @@ <class>org.apache.oozie.util.db.ValidateConnectionBean</class> <properties> - <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + <property name="openjpa.ConnectionDriverName" value="org.apache.oozie.util.db.BasicDataSourceWrapper"/> <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
