Repository: oozie Updated Branches: refs/heads/master a436d9829 -> a54f7c20d
http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 5629a89..832bbe1 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1490,9 +1490,13 @@ will be the requeue interval for the actions which are waiting for a long time w <property> <name>oozie.service.JPAService.connection.data.source</name> - <value>org.apache.commons.dbcp.BasicDataSource</value> + <value>org.apache.oozie.util.db.BasicDataSourceWrapper</value> <description> - DataSource to be used for connection pooling. + DataSource to be used for connection pooling. If you want the property + openJpa.connectionProperties="DriverClassName=..." to have a real effect, set this to + org.apache.oozie.util.db.BasicDataSourceWrapper. + Otherwise, a DBCP bug (https://issues.apache.org/jira/browse/DBCP-333) prevents the JDBC driver + setting from taking effect when a custom class loader is used. </description> </property> @@ -1563,6 +1567,31 @@ will be the requeue interval for the actions which are waiting for a long time w </description> </property> + <property> + <name>oozie.service.JPAService.retry.initial-wait-time.ms</name> + <value>100</value> + <description> + Initial wait time in milliseconds between the first failed database operation and the re-attempted operation. The wait + time is doubled at each retry. + </description> + </property> + + <property> + <name>oozie.service.JPAService.retry.maximum-wait-time.ms</name> + <value>30000</value> + <description> + Maximum wait time in milliseconds between database retry attempts. + </description> + </property> + + <property> + <name>oozie.service.JPAService.retry.max-retries</name> + <value>10</value> + <description> + Maximum number of retries for a failed database operation. 
+ </description> + </property> + <!-- SchemaService --> <property> http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/resources/oozie-log4j.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-log4j.properties b/core/src/main/resources/oozie-log4j.properties index c065f3c..ba8d7b9 100644 --- a/core/src/main/resources/oozie-log4j.properties +++ b/core/src/main/resources/oozie-log4j.properties @@ -59,3 +59,4 @@ log4j.logger.org.apache.oozie.sla=DEBUG, test log4j.logger.org.apache.hadoop=INFO, test log4j.logger.org.mortbay=INFO, test log4j.logger.org.hsqldb=INFO, test +log4j.logger.org.apache.oozie.util.db=INFO, test http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/test/java/org/apache/oozie/command/SkipCommitFaultInjection.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/SkipCommitFaultInjection.java b/core/src/test/java/org/apache/oozie/command/SkipCommitFaultInjection.java deleted file mode 100644 index 158fbfa..0000000 --- a/core/src/test/java/org/apache/oozie/command/SkipCommitFaultInjection.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.command; - -import org.apache.oozie.FaultInjection; - -public class SkipCommitFaultInjection extends FaultInjection { - - public static final String ACTION_FAILOVER_FAULT_INJECTION = "oozie.fault.injection.action.failover"; - - private static boolean ACTIVE = false; - - public boolean activate() { - ACTIVE = Boolean.parseBoolean(System.getProperty(ACTION_FAILOVER_FAULT_INJECTION, "false")); - return ACTIVE; - } - - public void deactivate() { - ACTIVE = false; - } - - public boolean isActive() { - return ACTIVE; - } -} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/test/java/org/apache/oozie/test/XTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java index 2816e5d..0ef4a4b 100644 --- a/core/src/test/java/org/apache/oozie/test/XTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java @@ -41,7 +41,8 @@ import java.util.concurrent.atomic.AtomicLong; import javax.persistence.EntityManager; import javax.persistence.FlushModeType; -import javax.persistence.Query; +import javax.persistence.PersistenceException; +import javax.persistence.TypedQuery; import junit.framework.TestCase; @@ -96,6 +97,8 @@ import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; +import org.apache.openjpa.persistence.ArgumentException; +import org.apache.openjpa.persistence.RollbackException; /** * Base JUnit <code>TestCase</code> subclass used by all Oozie testcases. 
@@ -832,71 +835,29 @@ public abstract class XTestCase extends TestCase { entityManager.setFlushMode(FlushModeType.COMMIT); entityManager.getTransaction().begin(); - Query q = entityManager.createNamedQuery("GET_WORKFLOWS"); - List<WorkflowJobBean> wfjBeans = q.getResultList(); - int wfjSize = wfjBeans.size(); - for (WorkflowJobBean w : wfjBeans) { - entityManager.remove(w); - } - - q = entityManager.createNamedQuery("GET_ACTIONS"); - List<WorkflowActionBean> wfaBeans = q.getResultList(); - int wfaSize = wfaBeans.size(); - for (WorkflowActionBean w : wfaBeans) { - entityManager.remove(w); - } - - q = entityManager.createNamedQuery("GET_COORD_JOBS"); - List<CoordinatorJobBean> cojBeans = q.getResultList(); - int cojSize = cojBeans.size(); - for (CoordinatorJobBean w : cojBeans) { - entityManager.remove(w); - } - - q = entityManager.createNamedQuery("GET_COORD_ACTIONS"); - List<CoordinatorActionBean> coaBeans = q.getResultList(); - int coaSize = coaBeans.size(); - for (CoordinatorActionBean w : coaBeans) { - entityManager.remove(w); - } - - q = entityManager.createNamedQuery("GET_BUNDLE_JOBS"); - List<BundleJobBean> bjBeans = q.getResultList(); - int bjSize = bjBeans.size(); - for (BundleJobBean w : bjBeans) { - entityManager.remove(w); - } - - q = entityManager.createNamedQuery("GET_BUNDLE_ACTIONS"); - List<BundleActionBean> baBeans = q.getResultList(); - int baSize = baBeans.size(); - for (BundleActionBean w : baBeans) { - entityManager.remove(w); - } + final int wfjSize = getCountAndRemoveAll(entityManager, "GET_WORKFLOWS", WorkflowJobBean.class); + final int wfaSize = getCountAndRemoveAll(entityManager, "GET_ACTIONS", WorkflowActionBean.class); + final int cojSize = getCountAndRemoveAll(entityManager, "GET_COORD_JOBS", CoordinatorJobBean.class); + final int coaSize = getCountAndRemoveAll(entityManager, "GET_COORD_ACTIONS", CoordinatorActionBean.class); + final int bjSize = getCountAndRemoveAll(entityManager, "GET_BUNDLE_JOBS", BundleJobBean.class); + final int 
baSize = getCountAndRemoveAll(entityManager, "GET_BUNDLE_ACTIONS", BundleActionBean.class); + final int slaSize = getCountAndRemoveAll(entityManager, "GET_SLA_EVENTS", SLAEventBean.class); + final int slaRegSize = getCountAndRemoveAll(entityManager, "GET_ACTIONS", SLARegistrationBean.class); + final int ssSize = getCountAndRemoveAll(entityManager, "GET_SLA_SUMMARY_ALL", SLASummaryBean.class); - q = entityManager.createNamedQuery("GET_SLA_EVENTS"); - List<SLAEventBean> slaBeans = q.getResultList(); - int slaSize = slaBeans.size(); - for (SLAEventBean w : slaBeans) { - entityManager.remove(w); - } + try { + if (entityManager.getTransaction().isActive()) { + entityManager.getTransaction().commit(); + } - q = entityManager.createQuery("select OBJECT(w) from SLARegistrationBean w"); - List<SLARegistrationBean> slaRegBeans = q.getResultList(); - int slaRegSize = slaRegBeans.size(); - for (SLARegistrationBean w : slaRegBeans) { - entityManager.remove(w); + if (entityManager.isOpen()) { + entityManager.close(); + } } - - q = entityManager.createQuery("select OBJECT(w) from SLASummaryBean w"); - List<SLASummaryBean> sdBeans = q.getResultList(); - int ssSize = sdBeans.size(); - for (SLASummaryBean w : sdBeans) { - entityManager.remove(w); + catch (final RollbackException e) { + log.warn("Cannot commit current transaction. 
[e.message={0}]", e.getMessage()); } - entityManager.getTransaction().commit(); - entityManager.close(); log.info(wfjSize + " entries in WF_JOBS removed from DB!"); log.info(wfaSize + " entries in WF_ACTIONS removed from DB!"); log.info(cojSize + " entries in COORD_JOBS removed from DB!"); @@ -906,7 +867,30 @@ public abstract class XTestCase extends TestCase { log.info(slaSize + " entries in SLA_EVENTS removed from DB!"); log.info(slaRegSize + " entries in SLA_REGISTRATION removed from DB!"); log.info(ssSize + " entries in SLA_SUMMARY removed from DB!"); + } + private <E> int getCountAndRemoveAll(final EntityManager entityManager, + final String queryName, + final Class<E> entityClass) { + try { + final TypedQuery<E> getAllQuery = entityManager.createNamedQuery(queryName, entityClass); + final List<E> allEntities = getAllQuery.getResultList(); + final int entitiesCount = allEntities.size(); + + for (final E w : allEntities) { + entityManager.remove(w); + } + + return entitiesCount; + } catch (final RollbackException e) { + log.warn("Cannot get count or remove all entities. [queryName={0};entityClass.name={1}]", + queryName, entityClass.getName()); + return 0; + } catch (final PersistenceException | ArgumentException e) { + log.warn("Cannot get count or remove all entities. 
[queryName={0};entityClass.name={1}]", + queryName, entityClass.getName()); + return 0; + } } private static MiniDFSCluster dfsCluster = null; http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/test/java/org/apache/oozie/util/db/TestOozieDmlStatementPredicate.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/db/TestOozieDmlStatementPredicate.java b/core/src/test/java/org/apache/oozie/util/db/TestOozieDmlStatementPredicate.java new file mode 100644 index 0000000..c2d4b15 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/db/TestOozieDmlStatementPredicate.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util.db; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestOozieDmlStatementPredicate { + + private final FailingConnectionWrapper.OozieDmlStatementPredicate statementPredicate = + new FailingConnectionWrapper.OozieDmlStatementPredicate(); + + @Test + public void testDmlNotOozieTableDoesNotApply() { + assertFalse("DML statement but not of an Oozie table", + statementPredicate.apply("SELECT * FROM wellhello")); + } + + @Test + public void testNotDmlOozieTableDoesNotApply() { + assertFalse("not a DML statement but of an Oozie table", + statementPredicate.apply("CREATE TABLE WF_JOBS")); + } + + @Test + public void testNotDmlNotOozieTableDoesNotApply() { + assertFalse("not a DML statement and not of an Oozie table", + statementPredicate.apply("CREATE TABLE wellhello")); + } + + @Test + public void testDmlAndOozieTableAppliesIgnoreCase() { + assertTrue("a DML statement and of an Oozie table", + statementPredicate.apply("SELECT * FROM wf_jobs")); + + assertTrue("a DML statement and of an Oozie table", + statementPredicate.apply("insert into WF_JOBS")); + + assertTrue("a DML statement and of an Oozie table", + statementPredicate.apply("update wf_jobs")); + + assertTrue("a DML statement and of an Oozie table", + statementPredicate.apply("DELETE FROM WF_JOBS")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/test/java/org/apache/oozie/util/db/TestOperationRetryHandler.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/db/TestOperationRetryHandler.java b/core/src/test/java/org/apache/oozie/util/db/TestOperationRetryHandler.java new file mode 100644 index 0000000..ae94199 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/db/TestOperationRetryHandler.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.db; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.mock; + +import java.util.concurrent.Callable; + +import org.apache.commons.lang.mutable.MutableInt; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.base.Predicate; + +/** + * Conventions used in the tests: + * <ul> + * <li>{@code RuntimeException} indicates an SQL error (network failure) where we should retry</li> + * <li>{@code IllegalStateException} is a different SQL error where we should not retry</li> + * </ul> + */ +public class TestOperationRetryHandler { + + private final Predicate<Throwable> testRetryPredicate = new Predicate<Throwable>() { + @Override + public boolean apply(Throwable input) { + return input.getClass() == RuntimeException.class; + } + }; + + private final OperationRetryHandler retryHandler = new OperationRetryHandler(3, 1, 1, testRetryPredicate); + + @Test + public void testWhenConfiguredNotToRetryThenTriesOnce() throws Exception { + 
tryOnceAndAssert(new OperationRetryHandler(0, 1, 100, testRetryPredicate)); + tryOnceAndAssert(new OperationRetryHandler(10, 1, 0, testRetryPredicate)); + tryOnceAndAssert(new OperationRetryHandler(0, 1, 0, testRetryPredicate)); + } + + private void tryOnceAndAssert(final OperationRetryHandler tryingOnceRetryHandler) throws Exception { + @SuppressWarnings("unchecked") + final Callable<String> operation = mock(Callable.class); + final MutableInt callCount = new MutableInt(0); + + willAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + callCount.increment(); + return null; + } + }).given(operation).call(); + + tryingOnceRetryHandler.executeWithRetry(operation); + + assertCallCount(1, callCount); + assertNoRetryAttemptsAreInProgressOrExhausted(); + } + + @Test + public void testNoRetry() throws Exception { + @SuppressWarnings("unchecked") + Callable<String> operation = mock(Callable.class); + given(operation.call()).willReturn("dummy"); + + retryHandler.executeWithRetry(operation); + } + + @Test + public void testRetriesOnFailure() throws Exception { + @SuppressWarnings("unchecked") + Callable<String> operation = mock(Callable.class); + final MutableInt callCount = new MutableInt(0); + + willAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + callCount.increment(); + throw new RuntimeException(); + } + }).given(operation).call(); + + boolean exceptionThrown = false; + try { + retryHandler.executeWithRetry(operation); + } catch (RuntimeException e) { + exceptionThrown = true; + } + + assertTrue("Exception was not thrown", exceptionThrown); + assertCallCount(3, callCount); + assertNoRetryAttemptsAreInProgressOrExhausted(); + } + + @Test + public void testRetriesOnFailureRecovery() throws Exception { + @SuppressWarnings("unchecked") + Callable<String> operation = mock(Callable.class); + final MutableInt callCount = new MutableInt(0); + + willAnswer(new 
Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + callCount.increment(); + if (callCount.intValue() < 2) { + throw new RuntimeException(); + } + return null; + } + }).given(operation).call(); + + retryHandler.executeWithRetry(operation); + + assertCallCount(2, callCount); + assertNoRetryAttemptsAreInProgressOrExhausted(); + } + + @Test + public void testNoRetriesOnNonSQLError() throws Exception { + @SuppressWarnings("unchecked") + Callable<String> operation = mock(Callable.class); + final MutableInt callCount = new MutableInt(0); + + willAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + callCount.increment(); + throw new IllegalStateException(); + } + }).given(operation).call(); + + boolean exceptionThrown = false; + try { + retryHandler.executeWithRetry(operation); + } catch (IllegalStateException e) { + exceptionThrown = true; + } + + assertTrue("Exception was not thrown", exceptionThrown); + assertCallCount(1, callCount); + assertNoRetryAttemptsAreInProgressOrExhausted(); + } + + @Test + public void testEmbeddedRetryHandlersWhenInnerHandlerThrowsSQLError() throws Exception { + @SuppressWarnings("unchecked") + final Callable<String> innerOperation = mock(Callable.class); + final MutableInt callCount = new MutableInt(0); + + // internal operation which will be retried + willAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + callCount.increment(); + throw new RuntimeException(); + } + }).given(innerOperation).call(); + + @SuppressWarnings("unchecked") + Callable<String> outerOperation = mock(Callable.class); + + // the outer operation which calls the inner one and this is not + // supposed to be retried + willAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + try { + retryHandler.executeWithRetry(innerOperation); + } catch (Exception e) { 
+ throw new RuntimeException(); + } + return null; + } + }).given(outerOperation).call(); + + boolean exceptionThrown = false; + try { + retryHandler.executeWithRetry(outerOperation); + } catch (Exception e) { + exceptionThrown = true; + } + + assertTrue("Exception was not thrown", exceptionThrown); + assertCallCount(3, callCount); + assertNoRetryAttemptsAreInProgressOrExhausted(); + } + + private void assertCallCount(int expected, MutableInt callCount) { + assertEquals("Number of retries", expected, callCount.intValue()); + } + + private void assertNoRetryAttemptsAreInProgressOrExhausted() { + assertEquals("Nesting level", 0, + OperationRetryHandler.RETRY_ATTEMPT_STATE.getInProgressCount()); + assertEquals("Retries performed", false, + OperationRetryHandler.RETRY_ATTEMPT_STATE.isExhausted()); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/test/java/org/apache/oozie/util/db/TestPersistenceExceptionSubclassFilterRetryPredicate.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/db/TestPersistenceExceptionSubclassFilterRetryPredicate.java b/core/src/test/java/org/apache/oozie/util/db/TestPersistenceExceptionSubclassFilterRetryPredicate.java new file mode 100644 index 0000000..76d2edc --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/db/TestPersistenceExceptionSubclassFilterRetryPredicate.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util.db; + +import org.apache.oozie.ErrorCode; +import org.apache.oozie.XException; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.junit.Test; + +import javax.persistence.EntityExistsException; +import javax.persistence.EntityNotFoundException; +import javax.persistence.LockTimeoutException; +import javax.persistence.NoResultException; +import javax.persistence.NonUniqueResultException; +import javax.persistence.OptimisticLockException; +import javax.persistence.PersistenceException; +import javax.persistence.PessimisticLockException; +import javax.persistence.QueryTimeoutException; +import javax.persistence.RollbackException; +import javax.persistence.TransactionRequiredException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestPersistenceExceptionSubclassFilterRetryPredicate { + private final PersistenceExceptionSubclassFilterRetryPredicate predicate = + new PersistenceExceptionSubclassFilterRetryPredicate(); + + @Test + public void testFilteredJPAExceptions() { + assertFalse(predicate.apply(new EntityExistsException())); + assertFalse(predicate.apply(new EntityNotFoundException())); + assertFalse(predicate.apply(new LockTimeoutException())); + assertFalse(predicate.apply(new NoResultException())); + assertFalse(predicate.apply(new NonUniqueResultException())); + assertFalse(predicate.apply(new OptimisticLockException())); + assertFalse(predicate.apply(new PessimisticLockException())); + assertFalse(predicate.apply(new 
QueryTimeoutException())); + assertFalse(predicate.apply(new TransactionRequiredException())); + } + + @Test + public void testNotFilteredJPAExceptions() { + assertTrue(predicate.apply(new RollbackException())); + assertTrue(predicate.apply(new PersistenceException())); + } + + @Test + public void testNonJPAExceptions() { + assertTrue(predicate.apply(new RuntimeException())); + assertTrue(predicate.apply(new Exception())); + } + + @Test + public void testNestedFilteredJPAExceptions() { + assertFalse(predicate.apply(wrapCause(new EntityExistsException()))); + assertFalse(predicate.apply(wrapCause(new EntityNotFoundException()))); + assertFalse(predicate.apply(wrapCause(new LockTimeoutException()))); + assertFalse(predicate.apply(wrapCause(new NoResultException()))); + assertFalse(predicate.apply(wrapCause(new NonUniqueResultException()))); + assertFalse(predicate.apply(wrapCause(new OptimisticLockException()))); + assertFalse(predicate.apply(wrapCause(new PessimisticLockException()))); + assertFalse(predicate.apply(wrapCause(new QueryTimeoutException()))); + assertFalse(predicate.apply(wrapCause(new TransactionRequiredException()))); + } + + @Test + public void testNestedNotFilteredJPAExceptions() { + assertTrue(predicate.apply(wrapCause(new RollbackException()))); + assertTrue(predicate.apply(wrapCause(new PersistenceException()))); + } + + @Test + public void testNestedNonJPAExceptions() { + assertTrue(predicate.apply(wrapCause(new RuntimeException()))); + assertTrue(predicate.apply(wrapCause(new Exception()))); + } + + private JPAExecutorException wrapCause(final Throwable cause) { + return new JPAExecutorException(new XException(ErrorCode.E0603, new PersistenceException(cause))); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/test/java/org/apache/oozie/util/db/TestRetryAttemptState.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/oozie/util/db/TestRetryAttemptState.java b/core/src/test/java/org/apache/oozie/util/db/TestRetryAttemptState.java new file mode 100644 index 0000000..0158538 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/db/TestRetryAttemptState.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util.db; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestRetryAttemptState { + private final RetryAttemptState state = new RetryAttemptState(); + + @Test + public void testOneAttemptIsInProgressAndAttemptsAreNotExhausted() { + state.signalStart(); + + assertFalse("retry attempts exhausted", state.isExhausted()); + assertEquals("retry attempt count", 1, state.getInProgressCount()); + } + + @Test + public void testNoAttemptIsInProgressAndAttemptsAreNotExhausted() { + state.signalStart(); + state.signalEnd(); + + assertFalse("retry attempts exhausted", state.isExhausted()); + assertEquals("retry attempt count", 0, state.getInProgressCount()); + } + + @Test + public void testExhaustedFromOneThreadRemainsNotStartedFromAnotherThread() throws InterruptedException { + final Runnable exhaustingAttempt = new ExhaustingAttempt(); + + final Thread exhaustingThread = new Thread(exhaustingAttempt); + + exhaustingThread.start(); + + exhaustingThread.join(); + + assertFalse("retry attempts exhausted", state.isExhausted()); + assertEquals("retry attempt count", 0, state.getInProgressCount()); + } + + @Test + public void testNestedAttemptsFinishSuccessfullyWhenNotExhausted() throws InterruptedException { + final NestedAttempt inner = new NestedAttempt(false); + final NestedAttempt outer = new NestedAttempt(inner, false); + + outer.run(); + + assertFalse("retry attempts exhausted", state.isExhausted()); + assertEquals("retry attempt count", 0, state.getInProgressCount()); + } + + @Test + public void testOuterAttemptRemainsUnfinishedWhenInnerExhausted() throws InterruptedException { + final NestedAttempt inner = new NestedAttempt(true); + final NestedAttempt outer = new NestedAttempt(inner, false); + + outer.run(); + + assertTrue("retry attempts exhausted", state.isExhausted()); + assertEquals("retry attempt count", 1, 
state.getInProgressCount()); + } + + @Test + public void testInnerAttemptRemainsUnfinishedWhenOuterExhausted() throws InterruptedException { + final NestedAttempt inner = new NestedAttempt(false); + final NestedAttempt outer = new NestedAttempt(inner, true); + + outer.run(); + + assertTrue("retry attempts exhausted", state.isExhausted()); + assertEquals("retry attempt count", 1, state.getInProgressCount()); + } + + private class ExhaustingAttempt implements Runnable { + @Override + public void run() { + state.signalStart(); + + state.signalExhausted(); + } + } + + private class NestedAttempt implements Runnable { + private final NestedAttempt nestedAttempt; + private final boolean shouldExhaust; + + NestedAttempt(final boolean shouldExhaust) { + this(null, shouldExhaust); + } + + NestedAttempt(final NestedAttempt nestedAttempt, final boolean shouldExhaust) { + this.nestedAttempt = nestedAttempt; + this.shouldExhaust = shouldExhaust; + } + + @Override + public void run() { + state.signalStart(); + + if (nestedAttempt != null) { + nestedAttempt.run(); + } + + if (shouldExhaust) { + state.signalExhausted(); + } + else { + state.signalEnd(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/docs/src/site/twiki/AG_Install.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/AG_Install.twiki b/docs/src/site/twiki/AG_Install.twiki index 2125442..3509266 100644 --- a/docs/src/site/twiki/AG_Install.twiki +++ b/docs/src/site/twiki/AG_Install.twiki @@ -198,6 +198,37 @@ following configuration properties in the oozie-site.xml: oozie.service.JPAService.pool.max.active.conn=10 </verbatim> +If you are interested in fine tuning how Oozie can retry database operations on failing database connectivity or errors, you can +set following properties to other values. 
Here are the default ones: + +<verbatim> + oozie.service.JPAService.retry.initial-wait-time.ms=100 + oozie.service.JPAService.retry.maximum-wait-time.ms=30000 + oozie.service.JPAService.retry.max-retries=10 +</verbatim> + +If you set either =oozie.service.JPAService.retry.max-retries= or =oozie.service.JPAService.retry.maximum-wait-time.ms= to =0=, +no retry attempts will be made on any database connectivity issues. Exact settings for these properties also depend on how much load +is on Oozie regarding workflow and coordinator jobs. + +The database operation retry functionality kicks in when there is a =javax.persistence.PersistenceException= whose root cause is not +part of normal everyday operation - filtered against a blacklist consisting of descendants like =NoResultException=, +=NonUniqueResultException=, and the like. This way Oozie won't retry database operations on errors that are more related to the +current query, or otherwise part of everyday operation. This approach also keeps the blacklist database agnostic. + +It has been tested with a MySQL / failing every minute 10 seconds / an Oozie coordinator job of an Oozie workflow consisting of four +workflow actions (some of them are asynchronous). On this setup Oozie was recovering after each and every database outage. + +To set up such a failing MySQL scenario, the following steps have to be performed: + + * Set =oozie.service.JPAService.connection.data.source= to =org.apache.oozie.util.db.BasicDataSourceWrapper= + within =oozie-site.xml= + * Set =oozie.service.JPAService.jdbc.driver= to =org.apache.oozie.util.db.FailingMySQLDriverWrapper= within =oozie-site.xml= + * Restart Oozie server + * Submit / start some workflows, coordinators etc. 
+ * See how Oozie is retrying on injected database errors by looking at the Oozie server logs, grepping =JPAException= instances + with following message prefix: <verbatim>Deliberately failing to prepare statement.</verbatim> + ---++ Database Migration Oozie provides an easy way to switch between databases without losing any data. Oozie servers should be stopped during the http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/pom.xml ---------------------------------------------------------------------- diff --git a/minitest/pom.xml b/minitest/pom.xml index 9515284..10a89b0 100644 --- a/minitest/pom.xml +++ b/minitest/pom.xml @@ -48,13 +48,13 @@ <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> - <version>1.5.2</version> + <version>1.8.8</version> <scope>test</scope> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> - <version>1.5.2</version> + <version>1.8.8</version> <scope>test</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/src/test/java/org/apache/oozie/test/TestParallelJPAOperationRetries.java ---------------------------------------------------------------------- diff --git a/minitest/src/test/java/org/apache/oozie/test/TestParallelJPAOperationRetries.java b/minitest/src/test/java/org/apache/oozie/test/TestParallelJPAOperationRetries.java new file mode 100644 index 0000000..5442130 --- /dev/null +++ b/minitest/src/test/java/org/apache/oozie/test/TestParallelJPAOperationRetries.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.test; + +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.XLogService; +import org.apache.oozie.util.XLog; +import org.apache.oozie.util.db.FailingHSQLDBDriverWrapper; + +import javax.persistence.EntityManager; +import javax.persistence.TypedQuery; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * To test that when a JDBC driver fails, {@link JPAService} continues working: + * <ul> + * <li>use the {@link java.sql.Connection} wrapper that is used by the wrapper extending {@link org.hsqldb.jdbcDriver}, + * and set the system property {@link FailingHSQLDBDriverWrapper#USE_FAILING_DRIVER}</li> + * <li>initialize {@link JPAService} and a JPA {@link javax.persistence.EntityManagerFactory}</li> + * <li>issue a decent number of JPA queries w/ appropriate parallelism via an {@link java.util.concurrent.Executor}</li> + * <li>an {@link TypedQuery#getResultList()} resulting in a {@code SELECT ... FROM WF_JOBS WHERE ...} Oozie database read</li> + * <li>followed by a {@link TypedQuery#executeUpdate()} resulting in an {@code UPDATE WF_ACTIONS SET ... WHERE ...} Oozie database + * write</li> + * </ul> + */ +public class TestParallelJPAOperationRetries extends MiniOozieTestCase { + private static final XLog LOG = 
XLog.getLog(TestParallelJPAOperationRetries.class); + private static final String ORIGINAL_LOG4J_FILE = System.getProperty(XLogService.LOG4J_FILE); + + private volatile Exception executorException; + + @Override + protected void setUp() throws Exception { + executorException = null; + System.setProperty(XLogService.LOG4J_FILE, "oozie-log4j.properties"); + System.setProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER, Boolean.TRUE.toString()); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + + if (ORIGINAL_LOG4J_FILE != null) { + System.setProperty(XLogService.LOG4J_FILE, ORIGINAL_LOG4J_FILE); + } + } + + public void testParallelJPAOperationsOnWorkflowBeansRetryAndSucceed() throws Exception { + final ExecutorService taskExecutor = Executors.newFixedThreadPool(4); + final int totalTaskCount = 10; + final CountDownLatch countDownLatch = new CountDownLatch(totalTaskCount); + + for (int ixTask = 1; ixTask <= totalTaskCount; ixTask++) { + final int taskCount = ixTask; + taskExecutor.execute(new Runnable() { + @Override + public void run() { + try { + LOG.debug("Task #{0} started.", taskCount); + + getResultListAndExecuteUpdateOnWorkflowBeans(); + + LOG.debug("Task #{0} finished.", taskCount); + } + catch (final SQLException e) { + executorException = e; + } + finally { + countDownLatch.countDown(); + } + } + }); + } + + countDownLatch.await(); + + if (executorException != null) { + fail(String.format("Should not get an SQLException while executing SQL operations. 
[e.message=%s]", + executorException.getMessage())); + } + + taskExecutor.shutdown(); + + try { + taskExecutor.awaitTermination(1, TimeUnit.MINUTES); + } catch (final InterruptedException e) { + fail("Should not get an interrupt while shutting down ExecutorService."); + } + } + + private void getResultListAndExecuteUpdateOnWorkflowBeans() throws SQLException { + final JPAService jpaService = Services.get().get(JPAService.class); + + final int checkAgeSecs = 86_400; + final Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); + + final EntityManager em = jpaService.getEntityManager(); + final TypedQuery<WorkflowActionBean> q = em.createNamedQuery("GET_RUNNING_ACTIONS", WorkflowActionBean.class); + q.setParameter("lastCheckTime", ts); + final List<WorkflowActionBean> actions = q.getResultList(); + + assertEquals("no WorkflowActionBeans should be present", 0, actions.size()); + + if (!em.getTransaction().isActive()) { + em.getTransaction().begin(); + } + + final int rowsAffected = em.createNamedQuery("UPDATE_ACTION_FOR_LAST_CHECKED_TIME") + .setParameter("lastCheckTime", 0l) + .setParameter("id", "666") + .executeUpdate(); + if (em.getTransaction().isActive()) { + em.getTransaction().commit(); + } + + assertEquals("no rows should be affected when updating WorkflowActionBeans", 0, rowsAffected); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/src/test/java/org/apache/oozie/test/TestWorkflow.java ---------------------------------------------------------------------- diff --git a/minitest/src/test/java/org/apache/oozie/test/TestWorkflow.java b/minitest/src/test/java/org/apache/oozie/test/TestWorkflow.java new file mode 100644 index 0000000..4257b60 --- /dev/null +++ b/minitest/src/test/java/org/apache/oozie/test/TestWorkflow.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.test; + +import com.google.common.base.Strings; +import org.apache.oozie.action.hadoop.JavaActionExecutor; +import org.apache.oozie.client.OozieClientException; +import org.apache.oozie.service.XLogService; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.local.LocalOozie; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.Writer; +import java.io.OutputStreamWriter; +import java.util.Date; +import java.util.Properties; + +import static org.junit.Assume.assumeFalse; + +/** + * {@code MiniOozie} integration test for different workflow kinds. 
+ */ +public class TestWorkflow extends MiniOozieTestCase { + + @Override + protected void setUp() throws Exception { + System.setProperty(XLogService.LOG4J_FILE, "oozie-log4j.properties"); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + public void testWorkflowWithStartAndEndCompletesSuccessfully() throws Exception { + final String wfApp = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='test-wf'>" + " <start to='end'/>" + + " <end name='end'/>" + "</workflow-app>"; + + final FileSystem fs = getFileSystem(); + final Path appPath = new Path(getFsTestCaseDir(), "app"); + fs.mkdirs(appPath); + fs.mkdirs(new Path(appPath, "lib")); + + final Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, "workflow.xml"))); + writer.write(wfApp); + writer.close(); + + final OozieClient wc = LocalOozie.getClient(); + + final Properties conf = wc.createConfiguration(); + conf.setProperty(OozieClient.APP_PATH, new Path(appPath, "workflow.xml").toString()); + conf.setProperty(OozieClient.USER_NAME, getTestUser()); + + + final String jobId = wc.submit(conf); + assertNotNull(jobId); + + WorkflowJob wf = wc.getJobInfo(jobId); + assertNotNull(wf); + assertEquals(WorkflowJob.Status.PREP, wf.getStatus()); + + wc.start(jobId); + + waitFor(1000, new Predicate() { + public boolean evaluate() throws Exception { + final WorkflowJob wf = wc.getJobInfo(jobId); + return wf.getStatus() == WorkflowJob.Status.SUCCEEDED; + } + }); + + wf = wc.getJobInfo(jobId); + assertNotNull(wf); + assertEquals(WorkflowJob.Status.SUCCEEDED, wf.getStatus()); + } + + public void testFsDecisionWorkflowCompletesSuccessfully() throws Exception { + final String workflowFileName = "fs-decision.xml"; + final Properties additionalWorkflowProperties = new Properties(); + + runWorkflowFromFile(workflowFileName, additionalWorkflowProperties); + } + + public void testParallelFsAndShellWorkflowCompletesSuccessfully() throws Exception { + final String 
workflowFileName = "parallel-fs-and-shell.xml"; + final Properties additionalWorkflowProperties = new Properties(); + final boolean isEvenSecond = new Date().getTime() % 2 == 0; + additionalWorkflowProperties.setProperty("choosePath1", Boolean.toString(isEvenSecond)); + + final String envJavaHome = System.getenv("JAVA_HOME"); + assumeFalse("Environment variable JAVA_HOME has to be set", Strings.isNullOrEmpty(envJavaHome)); + + additionalWorkflowProperties.setProperty("oozie.launcher." + JavaActionExecutor.YARN_AM_ENV, envJavaHome); + + runWorkflowFromFile(workflowFileName, additionalWorkflowProperties); + } + + private void runWorkflowFromFile(final String workflowFileName, final Properties additionalWorkflowProperties) + throws IOException, OozieClientException { + final FileSystem fs = getFileSystem(); + final Path appPath = new Path(getFsTestCaseDir(), "app"); + fs.mkdirs(appPath); + fs.mkdirs(new Path(appPath, "lib")); + + final Reader reader = getResourceAsReader(workflowFileName, -1); + final Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, "workflow.xml"))); + copyCharStream(reader, writer); + writer.close(); + reader.close(); + + final Path path = getFsTestCaseDir(); + + final OozieClient oozieClient = LocalOozie.getClient(); + + final Properties conf = oozieClient.createConfiguration(); + conf.setProperty(OozieClient.APP_PATH, new Path(appPath, "workflow.xml").toString()); + conf.setProperty(OozieClient.USER_NAME, getTestUser()); + conf.setProperty("nameNodeBasePath", path.toString()); + conf.setProperty("base", path.toUri().getPath()); + conf.setProperty("nameNode", getNameNodeUri()); + conf.setProperty("jobTracker", getJobTrackerUri()); + + for (final String additionalKey : additionalWorkflowProperties.stringPropertyNames()) { + conf.setProperty(additionalKey, additionalWorkflowProperties.getProperty(additionalKey)); + } + + final String jobId = oozieClient.submit(conf); + assertNotNull(jobId); + + WorkflowJob wf = 
oozieClient.getJobInfo(jobId); + assertNotNull(wf); + assertEquals(WorkflowJob.Status.PREP, wf.getStatus()); + + oozieClient.start(jobId); + + waitFor(15 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + final WorkflowJob wf = oozieClient.getJobInfo(jobId); + return wf.getStatus() == WorkflowJob.Status.SUCCEEDED; + } + }); + + wf = oozieClient.getJobInfo(jobId); + assertNotNull(wf); + assertEquals(WorkflowJob.Status.SUCCEEDED, wf.getStatus()); + } + + /** + * Return a classpath resource as a stream. + * <p/> + * + * @param path classpath for the resource. + * @param maxLen max content length allowed. + * @return the stream for the resource. + * @throws IOException thrown if the resource could not be read. + */ + private InputStream getResourceAsStream(final String path, final int maxLen) throws IOException { + final InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(path); + if (is == null) { + throw new IllegalArgumentException("resource " + path + " not found"); + } + return is; + } + + /** + * Return a classpath resource as a reader. + * <p/> + * It is assumed that the resource is a text resource. + * + * @param path classpath for the resource. + * @param maxLen max content length allowed. + * @return the reader for the resource. + * @throws IOException thrown if the resource could not be read. + */ + private Reader getResourceAsReader(final String path, final int maxLen) throws IOException { + return new InputStreamReader(getResourceAsStream(path, maxLen)); + } + + /** + * Copies an char input stream into an char output stream. + * + * @param reader reader to copy from. + * @param writer writer to copy to. + * @throws IOException thrown if the copy failed. 
+ */ + private void copyCharStream(final Reader reader, final Writer writer) throws IOException { + final char[] buffer = new char[4096]; + int read; + while ((read = reader.read(buffer)) > -1) { + writer.write(buffer, 0, read); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/src/test/java/org/apache/oozie/test/TestWorkflowRetries.java ---------------------------------------------------------------------- diff --git a/minitest/src/test/java/org/apache/oozie/test/TestWorkflowRetries.java b/minitest/src/test/java/org/apache/oozie/test/TestWorkflowRetries.java new file mode 100644 index 0000000..5ae1bb4 --- /dev/null +++ b/minitest/src/test/java/org/apache/oozie/test/TestWorkflowRetries.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.test; + +import org.apache.oozie.util.db.FailingHSQLDBDriverWrapper; + +/** + * {@code MiniOozie} integration test for different workflow kinds, using a HSQLDB driver that fails sometimes. 
+ */ +public class TestWorkflowRetries extends TestWorkflow { + + @Override + protected void setUp() throws Exception { + System.setProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER, Boolean.TRUE.toString()); + + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/src/test/java/org/apache/oozie/test/WorkflowTest.java ---------------------------------------------------------------------- diff --git a/minitest/src/test/java/org/apache/oozie/test/WorkflowTest.java b/minitest/src/test/java/org/apache/oozie/test/WorkflowTest.java deleted file mode 100644 index 2845f0a..0000000 --- a/minitest/src/test/java/org/apache/oozie/test/WorkflowTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.oozie.test; - -import org.apache.oozie.service.XLogService; -import org.apache.oozie.test.MiniOozieTestCase; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.local.LocalOozie; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.io.Writer; -import java.io.OutputStreamWriter; -import java.util.Properties; - -/** - * MiniOozie unit test - */ -public class WorkflowTest extends MiniOozieTestCase { - - @Override - protected void setUp() throws Exception { - System.setProperty(XLogService.LOG4J_FILE, "oozie-log4j.properties"); - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - public void testWorkflowRun() throws Exception { - String wfApp = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='test-wf'>" + " <start to='end'/>" - + " <end name='end'/>" + "</workflow-app>"; - - FileSystem fs = getFileSystem(); - Path appPath = new Path(getFsTestCaseDir(), "app"); - fs.mkdirs(appPath); - fs.mkdirs(new Path(appPath, "lib")); - - Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, "workflow.xml"))); - writer.write(wfApp); - writer.close(); - - final OozieClient wc = LocalOozie.getClient(); - - Properties conf = wc.createConfiguration(); - conf.setProperty(OozieClient.APP_PATH, new Path(appPath, "workflow.xml").toString()); - conf.setProperty(OozieClient.USER_NAME, getTestUser()); - - - final String jobId = wc.submit(conf); - assertNotNull(jobId); - - WorkflowJob wf = wc.getJobInfo(jobId); - assertNotNull(wf); - assertEquals(WorkflowJob.Status.PREP, wf.getStatus()); - - wc.start(jobId); - - waitFor(1000, new Predicate() { - public boolean evaluate() throws Exception { - WorkflowJob wf = wc.getJobInfo(jobId); - return wf.getStatus() == 
WorkflowJob.Status.SUCCEEDED; - } - }); - - wf = wc.getJobInfo(jobId); - assertNotNull(wf); - assertEquals(WorkflowJob.Status.SUCCEEDED, wf.getStatus()); - - } - - public void testWorkflowRunFromFile() throws Exception { - FileSystem fs = getFileSystem(); - Path appPath = new Path(getFsTestCaseDir(), "app"); - fs.mkdirs(appPath); - fs.mkdirs(new Path(appPath, "lib")); - - Reader reader = getResourceAsReader("wf-test.xml", -1); - Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, "workflow.xml"))); - copyCharStream(reader, writer); - writer.close(); - reader.close(); - - Path path = getFsTestCaseDir(); - - final OozieClient wc = LocalOozie.getClient(); - - Properties conf = wc.createConfiguration(); - conf.setProperty(OozieClient.APP_PATH, new Path(appPath, "workflow.xml").toString()); - conf.setProperty(OozieClient.USER_NAME, getTestUser()); - conf.setProperty("nnbase", path.toString()); - conf.setProperty("base", path.toUri().getPath()); - - - - final String jobId = wc.submit(conf); - assertNotNull(jobId); - - WorkflowJob wf = wc.getJobInfo(jobId); - assertNotNull(wf); - assertEquals(WorkflowJob.Status.PREP, wf.getStatus()); - - wc.start(jobId); - - waitFor(15 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - WorkflowJob wf = wc.getJobInfo(jobId); - return wf.getStatus() == WorkflowJob.Status.SUCCEEDED; - } - }); - - wf = wc.getJobInfo(jobId); - assertNotNull(wf); - assertEquals(WorkflowJob.Status.SUCCEEDED, wf.getStatus()); - - } - - /** - * Return a classpath resource as a stream. - * <p/> - * - * @param path classpath for the resource. - * @param maxLen max content length allowed. - * @return the stream for the resource. - * @throws IOException thrown if the resource could not be read. 
- */ - public InputStream getResourceAsStream(String path, int maxLen) throws IOException { - InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(path); - if (is == null) { - throw new IllegalArgumentException("resource " + path + " not found"); - } - return is; - } - - /** - * Return a classpath resource as a reader. - * <p/> - * It is assumed that the resource is a text resource. - * - * @param path classpath for the resource. - * @param maxLen max content length allowed. - * @return the reader for the resource. - * @throws IOException thrown if the resource could not be read. - */ - public Reader getResourceAsReader(String path, int maxLen) throws IOException { - return new InputStreamReader(getResourceAsStream(path, maxLen)); - } - - /** - * Copies an char input stream into an char output stream. - * - * @param reader reader to copy from. - * @param writer writer to copy to. - * @throws IOException thrown if the copy failed. - */ - public void copyCharStream(Reader reader, Writer writer) throws IOException { - char[] buffer = new char[4096]; - int read; - while ((read = reader.read(buffer)) > -1) { - writer.write(buffer, 0, read); - } - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/src/test/resources/fs-decision.xml ---------------------------------------------------------------------- diff --git a/minitest/src/test/resources/fs-decision.xml b/minitest/src/test/resources/fs-decision.xml new file mode 100644 index 0000000..72ac07a --- /dev/null +++ b/minitest/src/test/resources/fs-decision.xml @@ -0,0 +1,51 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at +* +http://www.apache.org/licenses/LICENSE-2.0 +* +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<workflow-app xmlns="uri:oozie:workflow:0.1" name="fs-decision"> + <start to="fs1"/> + <action name="fs1"> + <fs> + <mkdir path="${nameNodeBasePath}/p1"/> + </fs> + <ok to="fs2"/> + <error to="k"/> + </action> + <action name="fs2"> + <fs> + <move source="${nameNodeBasePath}/p1" target="${base}/p2"/> + </fs> + <ok to="dec3"/> + <error to="k"/> + </action> + <decision name="dec3"> + <switch> + <case to="fs4">${fs:exists(concat(nameNodeBasePath, '/p2'))}</case> + <default to="k"/> + </switch> + </decision> + <action name="fs4"> + <fs> + <move source="${nameNodeBasePath}/p2" target="${base}/p3"/> + </fs> + <ok to="end"/> + <error to="k"/> + </action> + <kill name="k"> + <message>kill</message> + </kill> + <end name="end"/> +</workflow-app> http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/src/test/resources/hsqldb-oozie-site.xml ---------------------------------------------------------------------- diff --git a/minitest/src/test/resources/hsqldb-oozie-site.xml b/minitest/src/test/resources/hsqldb-oozie-site.xml index fa5fe9c..2cdce74 100644 --- a/minitest/src/test/resources/hsqldb-oozie-site.xml +++ b/minitest/src/test/resources/hsqldb-oozie-site.xml @@ -19,8 +19,12 @@ --> <configuration> <property> + <name>oozie.service.JPAService.connection.data.source</name> + <value>org.apache.oozie.util.db.BasicDataSourceWrapper</value> + </property> + <property> <name>oozie.service.JPAService.jdbc.driver</name> - <value>org.hsqldb.jdbcDriver</value> + <value>org.apache.oozie.util.db.FailingHSQLDBDriverWrapper</value> </property> <property> 
<name>oozie.service.JPAService.jdbc.url</name> http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/src/test/resources/oozie-log4j.properties ---------------------------------------------------------------------- diff --git a/minitest/src/test/resources/oozie-log4j.properties b/minitest/src/test/resources/oozie-log4j.properties index c142d72..2503851 100644 --- a/minitest/src/test/resources/oozie-log4j.properties +++ b/minitest/src/test/resources/oozie-log4j.properties @@ -34,3 +34,4 @@ log4j.logger.org.apache.openjpa=INFO, test log4j.logger.org.apache.hadoop=INFO, test log4j.logger.org.mortbay=INFO, test log4j.logger.org.hsqldb=INFO, test +log4j.logger.org.apache.oozie.util.db=TRACE, test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/src/test/resources/parallel-fs-and-shell.xml ---------------------------------------------------------------------- diff --git a/minitest/src/test/resources/parallel-fs-and-shell.xml b/minitest/src/test/resources/parallel-fs-and-shell.xml new file mode 100644 index 0000000..7118fad --- /dev/null +++ b/minitest/src/test/resources/parallel-fs-and-shell.xml @@ -0,0 +1,73 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at +* +http://www.apache.org/licenses/LICENSE-2.0 +* +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<workflow-app name="parallel-fs-and-shell" xmlns="uri:oozie:workflow:0.5"> + <start to="decision"/> + <kill name="kill"> + <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <action name="fs1"> + <fs> + <delete path='${nameNodeBasePath}/user/${wf:user()}/fs1'/> + <mkdir path='${nameNodeBasePath}/user/${wf:user()}/fs1'/> + <touchz path='${nameNodeBasePath}/user/${wf:user()}/fs1/fs1.out'/> + </fs> + <ok to="shell1"/> + <error to="kill"/> + </action> + <action name="fs2"> + <fs> + <delete path='${nameNodeBasePath}/user/${wf:user()}/fs2'/> + <mkdir path='${nameNodeBasePath}/user/${wf:user()}/fs2'/> + <touchz path='${nameNodeBasePath}/user/${wf:user()}/fs2/fs2.out'/> + </fs> + <ok to="shell2"/> + <error to="kill"/> + </action> + <decision name="decision"> + <switch> + <case to="fs1"> + ${ choosePath1 } + </case> + <case to="fs2"> + ${ !choosePath1 } + </case> + <default to="End"/> + </switch> + </decision> + <action name="shell1"> + <shell xmlns="uri:oozie:shell-action:0.1"> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <exec>ls</exec> + <capture-output/> + </shell> + <ok to="End"/> + <error to="kill"/> + </action> + <action name="shell2"> + <shell xmlns="uri:oozie:shell-action:0.1"> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <exec>ls</exec> + <capture-output/> + </shell> + <ok to="End"/> + <error to="kill"/> + </action> + <end name="End"/> +</workflow-app> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/minitest/src/test/resources/wf-test.xml ---------------------------------------------------------------------- diff --git a/minitest/src/test/resources/wf-test.xml b/minitest/src/test/resources/wf-test.xml deleted file mode 100644 index 20c4946..0000000 --- a/minitest/src/test/resources/wf-test.xml +++ /dev/null @@ -1,51 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more 
contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at -* -http://www.apache.org/licenses/LICENSE-2.0 -* -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> -<workflow-app xmlns="uri:oozie:workflow:0.1" name="recovery-wf"> - <start to="fs1"/> - <action name="fs1"> - <fs> - <mkdir path="${nnbase}/p1"/> - </fs> - <ok to="fs2"/> - <error to="k"/> - </action> - <action name="fs2"> - <fs> - <move source="${nnbase}/p1" target="${base}/p2"/> - </fs> - <ok to="dec3"/> - <error to="k"/> - </action> - <decision name="dec3"> - <switch> - <case to="fs4">${fs:exists(concat(nnbase, '/p2'))}</case> - <default to="k"/> - </switch> - </decision> - <action name="fs4"> - <fs> - <move source="${nnbase}/p2" target="${base}/p3"/> - </fs> - <ok to="end"/> - <error to="k"/> - </action> - <kill name="k"> - <message>kill</message> - </kill> - <end name="end"/> -</workflow-app> http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 16c5137..0d1105b 100644 --- a/pom.xml +++ b/pom.xml @@ -1282,7 +1282,7 @@ <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> - <version>5.1.6</version> + <version>5.1.34</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt 
b/release-log.txt index cf5956a..7443f2c 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-2854 Oozie should handle transient database problems (andras.piros via gezapeti) OOZIE-2371 Add docs for state transitions for WF Action states (daniel.becker via gezapeti) OOZIE-2911 Re-add test testWfActionKillChildJob and adapt it to OYA (gezapeti) OOZIE-2918 Delete LauncherMapper and its test (asasvari via pbacsko)
