Repository: oozie Updated Branches: refs/heads/master 4aa7bbde3 -> cf6540977
OOZIE-3160 amend PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/cf654097 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/cf654097 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/cf654097 Branch: refs/heads/master Commit: cf654097755c3c4997614e1a9844c79193b5bc48 Parents: 4aa7bbd Author: Andras Piros <[email protected]> Authored: Wed Oct 3 14:42:40 2018 +0200 Committer: Andras Piros <[email protected]> Committed: Wed Oct 3 14:42:40 2018 +0200 ---------------------------------------------------------------------- .../oozie/service/AsyncXCommandExecutor.java | 21 ++++- .../oozie/service/CallableQueueService.java | 85 +++++++++++--------- core/src/main/resources/oozie-default.xml | 24 ++++-- .../service/TestAsyncXCommandExecutor.java | 23 +++--- ...TestHAPartitionDependencyManagerService.java | 1 + .../java/org/apache/oozie/test/XTestCase.java | 3 + .../java/org/apache/oozie/test/ZKXTestCase.java | 4 +- release-log.txt | 1 + 8 files changed, 108 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java index b18a37a..1f37622 100644 --- a/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java +++ b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java @@ -63,6 +63,7 @@ public class AsyncXCommandExecutor { private final long maxActiveCommands; // equivalent of "queueSize" in CQS private final long maxWait; private final long maxPriority; + private final int awaitTerminationTimeoutSeconds; private final BlockingQueue<CallableWrapper<?>> priorityBlockingQueue; private final BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayWorkQueue; @@ -78,7 +79,8 @@ public class AsyncXCommandExecutor { CallableQueueService callableAccess, long maxActiveCommands, long maxWait, - int priorities) { + int priorities, + int awaitTerminationTimeoutSeconds) { priorityBlockingQueue = new PriorityBlockingQueue<CallableWrapper<?>>(100, new PriorityComparator()); @@ -115,6 +117,9 @@ public class AsyncXCommandExecutor { this.pendingCommandsPerType = new ConcurrentHashMap<>(); Preconditions.checkArgument(priorities > 0, "Number of priorities must be >0"); this.maxPriority = priorities - 1; + Preconditions.checkArgument(awaitTerminationTimeoutSeconds > 0, + String.format("Await termination timeout must be >0, is %s", awaitTerminationTimeoutSeconds)); + this.awaitTerminationTimeoutSeconds = awaitTerminationTimeoutSeconds; } @VisibleForTesting @@ -128,7 +133,8 @@ public class AsyncXCommandExecutor { ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType, AtomicInteger activeCommands, long maxWait, - long priorities) { + long priorities, + int awaitTerminationTimeoutSeconds) { this.priorityBlockingQueue = priorityBlockingQueue; this.delayWorkQueue = delayQueue; @@ -141,6 +147,7 @@ public class AsyncXCommandExecutor { this.activeCommands = activeCommands; this.maxWait = maxWait; this.maxPriority = priorities - 1; + this.awaitTerminationTimeoutSeconds = awaitTerminationTimeoutSeconds; } public synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) { @@ -266,6 +273,14 @@ public class AsyncXCommandExecutor { } } + public boolean isShutDown() { + return executor.isShutdown() || scheduledExecutor.isShutdown(); + } + + public boolean isTerminated() { + return executor.isTerminated() || scheduledExecutor.isTerminated(); + } + public List<String> getQueueDump() { List<CallableWrapper<?>> copyOfPending = new ArrayList<>(100); List<String> queueDump = new ArrayList<>(100); @@ -399,7 +414,7 @@ public class AsyncXCommandExecutor { } private void shutdownExecutor(ExecutorService executor, String name) throws InterruptedException { - long limit = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30); + long limit = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(this.awaitTerminationTimeoutSeconds); executor.shutdown(); while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { log.info("Waiting for [{0}] to shutdown", name); http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/main/java/org/apache/oozie/service/CallableQueueService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java index dc9a099..a942600 100644 --- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java +++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java @@ -86,7 +86,9 @@ public class CallableQueueService implements Service, Instrumentable { public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size"; public static final String CONF_THREADS = CONF_PREFIX + "threads"; - public static final String CONF_OLDIMPL = CONF_PREFIX + "queue.oldImpl"; + public static final String CONF_NEWIMPL = CONF_PREFIX + "queue.newImpl"; + public static final String CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS = + CONF_PREFIX + "queue.awaitTermination.timeout.seconds"; public static final String CONF_DELAYED_CALLABLE_THREADS = CONF_PREFIX + "delayedcallable.threads"; public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency"; public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible"; @@ -110,6 +112,8 @@ public class CallableQueueService implements Service, Instrumentable { private int maxCallableConcurrency; + private int queueAwaitTerminationTimeoutSeconds; + private boolean callableBegin(XCallable<?> callable) { synchronized (activeCallables) { AtomicInteger counter = activeCallables.get(callable.getType()); @@ -136,7 +140,7 @@ public class CallableQueueService implements Service, Instrumentable { } } - if (!oldImpl) { + if (newImpl) { asyncXCommandExecutor.commandFinished(); asyncXCommandExecutor.checkMaxConcurrency(callable.getType()); } @@ -441,7 +445,7 @@ public class CallableQueueService implements Service, Instrumentable { private PriorityDelayQueue<CallableWrapper<?>> queue; private ThreadPoolExecutor executor; private Instrumentation instrumentation; - private boolean oldImpl = false; + private boolean newImpl = false; private AsyncXCommandExecutor asyncXCommandExecutor; /** @@ -525,21 +529,11 @@ public class CallableQueueService implements Service, Instrumentable { interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE); - oldImpl = ConfigurationService.getBoolean(CONF_OLDIMPL, false); - log.info("Using old queue implementation: [{0}]", oldImpl); + newImpl = ConfigurationService.getBoolean(CONF_NEWIMPL, true); + log.info("Using new queue implementation: [{0}]", newImpl); + queueAwaitTerminationTimeoutSeconds = ConfigurationService.getInt(conf, CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS); - if (oldImpl) { - executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue, - new NamedThreadFactory("CallableQueue")) { - protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t,r); - XLog.Info.get().clear(); - } - protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { - return (RunnableFuture<T>)callable; - } - }; - } else { + if (newImpl) { int delayedCallableThreads = ConfigurationService.getInt(CONF_DELAYED_CALLABLE_THREADS, 1); asyncXCommandExecutor = new AsyncXCommandExecutor(threads, @@ -548,9 +542,21 @@ public class CallableQueueService implements Service, Instrumentable { this, queueSize, MAX_CALLABLE_WAITTIME_MS, - PRIORITIES); + PRIORITIES, + queueAwaitTerminationTimeoutSeconds); executor = asyncXCommandExecutor.getExecutorService(); + } else { + executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue, + new NamedThreadFactory("CallableQueue")) { + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t,r); + XLog.Info.get().clear(); + } + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + return (RunnableFuture<T>)callable; + } + }; } // IMPORTANT: The ThreadPoolExecutor does not always the execute @@ -586,20 +592,22 @@ public class CallableQueueService implements Service, Instrumentable { @Override public void destroy() { try { - long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds - executor.shutdown(); queue.clear(); - while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { - log.info("Waiting for executor to shutdown"); - if (System.currentTimeMillis() > limit) { - log.warn("Gave up, continuing without waiting for executor to shutdown"); - break; - } - } - - if (!oldImpl) { + if (newImpl) { asyncXCommandExecutor.shutdown(); } + else { + long limit = System.currentTimeMillis() + queueAwaitTerminationTimeoutSeconds * 1000; + executor.shutdown(); + queue.clear(); + while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { + log.info("Waiting for executor to shutdown"); + if (System.currentTimeMillis() > limit) { + log.warn("Gave up, continuing without waiting for executor to shutdown"); + break; + } + } + } } catch (InterruptedException ex) { log.warn(ex); @@ -620,11 +628,18 @@ public class CallableQueueService implements Service, Instrumentable { * @return int size of queue */ public synchronized int queueSize() { - return oldImpl ? queue.size() : asyncXCommandExecutor.getSize(); + return newImpl ? asyncXCommandExecutor.getSize() : queue.size(); } private synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) { - if (oldImpl) { + if (newImpl) { + if (asyncXCommandExecutor.isShutDown() || asyncXCommandExecutor.isTerminated()) { + log.warn("Async executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey()); + } + else { + asyncXCommandExecutor.queue(wrapper, ignoreQueueSize); + } + } else { if (!ignoreQueueSize && queue.size() >= queueSize) { log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey()); return false; @@ -644,8 +659,6 @@ public class CallableQueueService implements Service, Instrumentable { else { log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey()); } - } else { - asyncXCommandExecutor.queue(wrapper, ignoreQueueSize); } return true; @@ -820,7 +833,9 @@ public class CallableQueueService implements Service, Instrumentable { * @return the list of string that representing each CallableWrapper */ public List<String> getQueueDump() { - if (oldImpl) { + if (newImpl) { + return asyncXCommandExecutor.getQueueDump(); + } else { List<String> list = new ArrayList<String>(); for (QueueElement<CallableWrapper<?>> qe : queue) { if (qe.toString() == null) { @@ -829,8 +844,6 @@ public class CallableQueueService implements Service, Instrumentable { list.add(qe.toString()); } return list; - } else { - return asyncXCommandExecutor.getQueueDump(); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index f292ad3..6c7fc9d 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -518,13 +518,27 @@ </property> <property> - <name>oozie.service.CallableQueueService.queue.oldImpl</name> + <name>oozie.service.CallableQueueService.queue.newImpl</name> <value>true</value> <description> - If set to false, then CallableQueueService will use a more performant, less CPU-intensive - queuing mechanism to execute asynchronous tasks internally. The old implementation generates - noticeable CPU load even if Oozie is completely idle, especially when oozie.service.CallableQueueService.threads - set to a large number. The previous queuing mechanism is kept as a fallback option. + If set to true, then CallableQueueService will use a faster, less CPU-intensive queuing mechanism to execute + asynchronous tasks internally. + The old implementation generates noticeable CPU load even if Oozie is completely idle, especially when + oozie.service.CallableQueueService.threads is set to a large number. The previous queuing mechanism is kept as a + fallback option. + This is an experimental feature in Oozie 5.1.0 that needs to be re-evaluated upon an upcoming minor release, + meaning the old implementation and this feature flag will also be removed. + </description> + </property> + + <property> + <name>oozie.service.CallableQueueService.queue.awaitTermination.timeout.seconds</name> + <value>30</value> + <description> + Number of seconds while awaiting termination of ThreadPoolExecutor instances when CallableQueueService#destroy() + is called, in seconds. + The more elements you tend to have in your callable queue, the more you want CallableQueueService to wait + before shutting down its thread pools. </description> </property> http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java index 2dce409..e0d14d6 100644 --- a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java +++ b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java @@ -74,6 +74,7 @@ public class TestAsyncXCommandExecutor { private static final long DEFAULT_MAXWAIT = 30_000; private static final int TEST_PRIORITIES = 5; private static final int MAX_PRIORITY = TEST_PRIORITIES - 1; + private static final int AWAIT_TERMINATION_TIMEOUT_SECONDS = 1; @Mock private ThreadPoolExecutor executor; @@ -100,7 +101,7 @@ public class TestAsyncXCommandExecutor { pendingCommandsPerType = new ConcurrentHashMap<>(); delayQueue = new LinkedBlockingQueue<>(); // in reality it's not LBQ, but it's fine here asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT, - TEST_PRIORITIES); + TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS); when(callableWrapper.filterDuplicates()).thenReturn(true); when(callableWrapper.getElement().getKey()).thenReturn("key"); when(callableWrapper.getElement().getType()).thenReturn(DEFAULT_TYPE); @@ -158,7 +159,7 @@ public class TestAsyncXCommandExecutor { @Test public void testSubmissionSuccessfulAfterDelayWhenMaxConcurrencyCheckDisabled() { - asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES); + asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS); when(callableWrapper.getInitialDelay()).thenReturn(100L); when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L); XCallable<?> wrappedCommand = mock(XCallable.class); @@ -242,7 +243,7 @@ public class TestAsyncXCommandExecutor { @Test public void testSubmissionWhenQueueIsFull() { - asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES); + asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS); callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS); when(callableWrapper.filterDuplicates()).thenReturn(true); when(callableWrapper.getElement().getKey()).thenReturn("key"); @@ -257,7 +258,7 @@ public class TestAsyncXCommandExecutor { @Test public void testSubmissionWhenQueueSizeIsIgnored() { - asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES); + asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS); callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS); when(callableWrapper.filterDuplicates()).thenReturn(true); when(callableWrapper.getElement().getKey()).thenReturn("key"); @@ -369,7 +370,8 @@ public class TestAsyncXCommandExecutor { @Test public void testAntiStarvationWhenDelayIsAboveMaxWait() { - asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES); + asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES, + AWAIT_TERMINATION_TIMEOUT_SECONDS); when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-40000L); when(callableWrapper.getPriority()).thenReturn(0); pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper)); @@ -394,7 +396,8 @@ public class TestAsyncXCommandExecutor { @Test public void testAntiStarvationWhenPriorityIsHighest() { - asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES); + asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES, + AWAIT_TERMINATION_TIMEOUT_SECONDS); when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-1000L); when(callableWrapper.getPriority()).thenReturn(MAX_PRIORITY); pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper)); @@ -419,7 +422,8 @@ public class TestAsyncXCommandExecutor { @Test public void testPriorityHandling() { - asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 100, DEFAULT_MAXWAIT, 100); + asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 100, DEFAULT_MAXWAIT, 100, + AWAIT_TERMINATION_TIMEOUT_SECONDS); doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { @@ -490,7 +494,7 @@ public class TestAsyncXCommandExecutor { } private AsyncXCommandExecutor createExecutor(boolean needMaxConcurrencyCheck, int maxActiveCallables, - long maxWait, int priorities) { + long maxWait, int priorities, int awaitTerminationTimeoutSeconds) { return new AsyncXCommandExecutor(needMaxConcurrencyCheck, callableQueueService, maxActiveCallables, @@ -501,6 +505,7 @@ public class TestAsyncXCommandExecutor { pendingCommandsPerType, activeCommands, maxWait, - priorities); + priorities, + awaitTerminationTimeoutSeconds); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java index 3e1df07..a7c7e07 100644 --- a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java +++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java @@ -57,6 +57,7 @@ public class TestHAPartitionDependencyManagerService extends ZKXTestCase { } protected void tearDown() throws Exception { + services.destroy(); super.tearDown(); } http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/test/XTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java index 3048dda..1f0cf17 100644 --- a/core/src/test/java/org/apache/oozie/test/XTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java @@ -81,6 +81,7 @@ import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.hadoop.LauncherMain; import org.apache.oozie.dependency.FSURIHandler; import org.apache.oozie.dependency.HCatURIHandler; +import org.apache.oozie.service.CallableQueueService; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HCatAccessorService; import org.apache.oozie.service.HadoopAccessorException; @@ -409,6 +410,8 @@ public abstract class XTestCase extends TestCase { oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll("org.apache.oozie.service.ShareLibService,","")); // Make sure to create the Oozie DB during unit tests oozieSiteConf.set(JPAService.CONF_CREATE_DB_SCHEMA, "true"); + // Make sure thread pools shut down in a timely manner + oozieSiteConf.set(CallableQueueService.CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS, "1"); File target = new File(testCaseConfDir, "oozie-site.xml"); oozieSiteConf.writeXml(new FileOutputStream(target)); http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java index abc7c9f..bfa2d85 100644 --- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java @@ -97,7 +97,9 @@ public abstract class ZKXTestCase extends XDataTestCase { @Override protected void tearDown() throws Exception { super.tearDown(); - Services.get().destroy(); + if (Services.get() != null) { + Services.get().destroy(); + } sDiscovery.close(); sDiscovery = null; client.close(); http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index d4d0b1e..8a72d5e 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3160 amend PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (andras.piros) OOZIE-3354 [core] [SSH action] SSH action gets hung (andras.piros) OOZIE-3343 amend [build] [tests] Add the first five test errors per module to the report (kmarton via andras.piros) OOZIE-3348 [Hive action] Remove dependency hive-contrib (kmarton via andras.piros)
