Repository: oozie Updated Branches: refs/heads/master a9fba2cde -> 68bcd3d38
OOZIE-3352 [tests] TestCallableQueueService#testPriorityExecutionOrder() is flaky (pbacsko) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/68bcd3d3 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/68bcd3d3 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/68bcd3d3 Branch: refs/heads/master Commit: 68bcd3d38f8247f3f1aa06aa0e907d961332bf74 Parents: a9fba2c Author: Andras Piros <[email protected]> Authored: Tue Sep 25 09:48:24 2018 +0200 Committer: Andras Piros <[email protected]> Committed: Tue Sep 25 09:48:24 2018 +0200 ---------------------------------------------------------------------- .../service/TestAsyncXCommandExecutor.java | 64 +++++++++++++++++--- .../oozie/service/TestCallableQueueService.java | 59 ------------------ release-log.txt | 1 + 3 files changed, 55 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/68bcd3d3/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java index f9ec4d6..2dce409 100644 --- a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java +++ b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java @@ -18,6 +18,7 @@ package org.apache.oozie.service; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.oozie.service.AsyncXCommandExecutor.AccessibleRunnableScheduledFuture; +import org.apache.oozie.service.AsyncXCommandExecutor.PriorityComparator; import org.apache.oozie.service.AsyncXCommandExecutor.ScheduledXCallable; import org.apache.oozie.service.CallableQueueService.CallableWrapper; import org.apache.oozie.util.XCallable; @@ -94,10 +96,11 @@ public class TestAsyncXCommandExecutor { @Before public void setup() { activeCommands = new AtomicInteger(0); - priorityBlockingQueue = new PriorityBlockingQueue<>(); + priorityBlockingQueue = new PriorityBlockingQueue<>(100, new PriorityComparator()); pendingCommandsPerType = new ConcurrentHashMap<>(); delayQueue = new LinkedBlockingQueue<>(); // in reality it's not LBQ, but it's fine here - asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT); + asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT, + TEST_PRIORITIES); when(callableWrapper.filterDuplicates()).thenReturn(true); when(callableWrapper.getElement().getKey()).thenReturn("key"); when(callableWrapper.getElement().getType()).thenReturn(DEFAULT_TYPE); @@ -155,7 +158,7 @@ public class TestAsyncXCommandExecutor { @Test public void testSubmissionSuccessfulAfterDelayWhenMaxConcurrencyCheckDisabled() { - asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT); + asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES); when(callableWrapper.getInitialDelay()).thenReturn(100L); when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L); XCallable<?> wrappedCommand = mock(XCallable.class); @@ -239,7 +242,7 @@ public class TestAsyncXCommandExecutor { @Test public void testSubmissionWhenQueueIsFull() { - asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT); + asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES); callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS); when(callableWrapper.filterDuplicates()).thenReturn(true); when(callableWrapper.getElement().getKey()).thenReturn("key"); @@ -254,7 +257,7 @@ public class TestAsyncXCommandExecutor { @Test public void testSubmissionWhenQueueSizeIsIgnored() { - asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT); + asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES); callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS); when(callableWrapper.filterDuplicates()).thenReturn(true); when(callableWrapper.getElement().getKey()).thenReturn("key"); @@ -366,7 +369,7 @@ public class TestAsyncXCommandExecutor { @Test public void testAntiStarvationWhenDelayIsAboveMaxWait() { - asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500); + asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES); when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-40000L); when(callableWrapper.getPriority()).thenReturn(0); pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper)); @@ -391,7 +394,7 @@ public class TestAsyncXCommandExecutor { @Test public void testAntiStarvationWhenPriorityIsHighest() { - asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500); + asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES); when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-1000L); when(callableWrapper.getPriority()).thenReturn(MAX_PRIORITY); pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper)); @@ -414,6 +417,47 @@ public class TestAsyncXCommandExecutor { verify(scheduledExecutor).awaitTermination(eq(1000L), eq(TimeUnit.MILLISECONDS)); } + @Test + public void testPriorityHandling() { + asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 100, DEFAULT_MAXWAIT, 100); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + CallableWrapper<?> wrapper = (CallableWrapper<?>) invocation.getArguments()[0]; + priorityBlockingQueue.add(wrapper); + return null; + } + }).when(executor).execute(any(Runnable.class)); + + List<CallableWrapper<?>> mockedWrappers = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + CallableWrapper<?> wrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS); + when(wrapper.getPriority()).thenReturn(i); + when(wrapper.getInitialDelay()).thenReturn(0L); + when(wrapper.filterDuplicates()).thenReturn(true); + when(wrapper.getElement().getName()).thenReturn(String.valueOf(i)); + mockedWrappers.add(wrapper); + } + + for (CallableWrapper<?> callable : mockedWrappers) { + asyncExecutor.queue(callable, false); + } + + CallableWrapper<?> firstElement = priorityBlockingQueue.poll(); + + CallableWrapper<?> lastElement = null; + CallableWrapper<?> previous = null; + + do { + previous = lastElement; + lastElement = priorityBlockingQueue.poll(); + } while (lastElement != null); + lastElement = previous; + + assertEquals("Priority - first element", 99, firstElement.getPriority()); + assertEquals("Priority - last element", 0, lastElement.getPriority()); + } + private void testIllegalPriority(int prio) { when(callableWrapper.getPriority()).thenReturn(prio); @@ -446,7 +490,7 @@ public class TestAsyncXCommandExecutor { } private AsyncXCommandExecutor createExecutor(boolean needMaxConcurrencyCheck, int maxActiveCallables, - long maxWait) { + long maxWait, int priorities) { return new AsyncXCommandExecutor(needMaxConcurrencyCheck, callableQueueService, maxActiveCallables, @@ -456,7 +500,7 @@ public class TestAsyncXCommandExecutor { delayQueue, pendingCommandsPerType, activeCommands, - DEFAULT_MAXWAIT, - TEST_PRIORITIES); + maxWait, + priorities); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/68bcd3d3/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java index 5d546ff..aec1765 100644 --- a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java +++ b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java @@ -944,65 +944,6 @@ public class TestCallableQueueService extends XTestCase { assertTrue(uniquesAfter.toString(), uniquesAfter.isEmpty()); } - public void testPriorityExecutionOrder() throws InterruptedException, ServiceException { - Services.get().destroy(); - setSystemProperty(CallableQueueService.CONF_THREADS, "1"); - setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "1000000"); - new Services().init(); - - final int taskCount = 999_999; - Multimap<Integer, Long> executions = Multimaps.synchronizedMultimap(ArrayListMultimap.create()); - List<BookingCallable> callables = new ArrayList<>(taskCount); - - for (int i = 2; i >= 0; i--) { - String type = String.valueOf(i); - for (int j = 0; j < taskCount / 3; j++) { - String key = type + "_" + UUID.randomUUID().toString(); - BookingCallable dc = new BookingCallable(executions, taskCount, key, type, i, 0); - callables.add(dc); - } - } - - CallableQueueService queueservice = Services.get().get(CallableQueueService.class); - - for (int i = 0; i < taskCount; i++) { - queueservice.queue(callables.get(i)); - } - - try { - finished.await(10, TimeUnit.SECONDS); - } catch (Exception e) { - log.error("Error", e); - fail("Exception during test: " + e.getMessage()); - } - // It's necessary because after finished.await() returns, the last XCallable - // could still be running - waitFor(1000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return queueservice.queueSize() == 0; - } - }); - - Map<Integer, Long> minTime = new HashMap<>(); - Map<Integer, Long> maxTime = new HashMap<>(); - - for (Map.Entry<Integer, Collection<Long>> entry : executions.asMap().entrySet()) { - int prio = entry.getKey(); - Collection<Long> values = entry.getValue(); - minTime.put(prio, Collections.min(values)); - maxTime.put(prio, Collections.max(values)); - } - - // Expected timeline of execution times: - // --> [min] Prio #2 [max] --> [min] Prio #1 [max] --> [min] Prio #0 [max] - - assertTrue("Failed: maxTime prio #2: " + maxTime.get(2) + " / minTime prio #1: " + minTime.get(1), - maxTime.get(2) <= minTime.get(1)); - assertTrue("Failed: maxTime prio #1: " + maxTime.get(1) + " / minTime prio #0: " + minTime.get(0), - maxTime.get(1) <= minTime.get(0)); - } - public void testMaxConcurrencyReached() throws Exception { Services.get().destroy(); setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "100000"); http://git-wip-us.apache.org/repos/asf/oozie/blob/68bcd3d3/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index bb8702a..a99c399 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3352 [tests] TestCallableQueueService#testPriorityExecutionOrder() is flaky (pbacsko) OOZIE-3351 [tests] Flaky test TestMemoryLocks#testWriteLockSameThreadNoWait() (pbacsko) OOZIE-3229 [client] [ui] Improved SLA filtering options (asalamon74, andras.piros) OOZIE-3346 [examples] [action] Fix Git example. PrepareActionsHandler should support XML namespace prefixes (asalamon74, andras.piros)
