[FLINK-4892] Parameterize HeapInternalTimerServiceTest. This now tests multiple interesting key-group cases.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3b5d332 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3b5d332 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3b5d332 Branch: refs/heads/master Commit: e3b5d33237430acb65d1a93531448825a76c7ce5 Parents: b673760 Author: Aljoscha Krettek <[email protected]> Authored: Mon Oct 24 14:39:22 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 26 23:26:28 2016 +0200 ---------------------------------------------------------------------- .../operators/HeapInternalTimerServiceTest.java | 243 +++++++++++++------ 1 file changed, 169 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e3b5d332/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java index 09499c2..bba6517 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java @@ -28,13 +28,19 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import 
java.io.InputStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; +import java.util.Random; import java.util.Set; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -45,17 +51,21 @@ import static org.mockito.Mockito.*; /** * Tests for {@link HeapInternalTimerService}. */ +@RunWith(Parameterized.class) public class HeapInternalTimerServiceTest { - private static final int startKeyGroupIdx = 0; - private static final int endKeyGroupIdx = 10; - private static final KeyGroupsList testKeyGroupList = - new KeyGroupRange(startKeyGroupIdx, endKeyGroupIdx); + private final int maxParallelism; + private final KeyGroupRange testKeyGroupRange; private static InternalTimer<Integer, String> anyInternalTimer() { return any(); } + public HeapInternalTimerServiceTest(int startKeyGroup, int endKeyGroup, int maxParallelism) { + this.testKeyGroupRange = new KeyGroupRange(startKeyGroup, endKeyGroup); + this.maxParallelism = maxParallelism; + } + @Test public void testKeyGroupStartIndexSetting() { @@ -151,9 +161,10 @@ public class HeapInternalTimerServiceTest { TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); - keyContext.setCurrentKey(0); + int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + keyContext.setCurrentKey(key); timerService.registerProcessingTimeTimer("ciao", 10); timerService.registerProcessingTimeTimer("ciao", 20); @@ -212,9 +223,11 @@ public class HeapInternalTimerServiceTest { TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + 
createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); - keyContext.setCurrentKey(0); + int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + + keyContext.setCurrentKey(key); timerService.registerProcessingTimeTimer("ciao", 20); @@ -243,9 +256,11 @@ public class HeapInternalTimerServiceTest { TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); final HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); + + int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); - keyContext.setCurrentKey(0); + keyContext.setCurrentKey(key); timerService.registerProcessingTimeTimer("ciao", 10); @@ -293,7 +308,7 @@ public class HeapInternalTimerServiceTest { TestKeyContext keyContext = new TestKeyContext(); TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); processingTimeService.setCurrentTime(17L); assertEquals(17, timerService.currentProcessingTime()); @@ -311,7 +326,7 @@ public class HeapInternalTimerServiceTest { TestKeyContext keyContext = new TestKeyContext(); TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); timerService.advanceWatermark(17); assertEquals(17, timerService.currentWatermark()); @@ -331,14 +346,21 @@ public class 
HeapInternalTimerServiceTest { TestKeyContext keyContext = new TestKeyContext(); TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); - keyContext.setCurrentKey(0); + // get two different keys + int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + while (key2 == key1) { + key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + } + + keyContext.setCurrentKey(key1); timerService.registerEventTimeTimer("ciao", 10); timerService.registerEventTimeTimer("hello", 10); - keyContext.setCurrentKey(1); + keyContext.setCurrentKey(key2); timerService.registerEventTimeTimer("ciao", 10); timerService.registerEventTimeTimer("hello", 10); @@ -350,10 +372,10 @@ public class HeapInternalTimerServiceTest { timerService.advanceWatermark(10); verify(mockTriggerable, times(4)).onEventTime(anyInternalTimer()); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao"))); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello"))); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao"))); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "hello"))); assertEquals(0, timerService.numEventTimeTimers()); } @@ -369,14 +391,21 @@ public class 
HeapInternalTimerServiceTest { TestKeyContext keyContext = new TestKeyContext(); TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); - keyContext.setCurrentKey(0); + // get two different keys + int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + while (key2 == key1) { + key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + } + + keyContext.setCurrentKey(key1); timerService.registerProcessingTimeTimer("ciao", 10); timerService.registerProcessingTimeTimer("hello", 10); - keyContext.setCurrentKey(1); + keyContext.setCurrentKey(key2); timerService.registerProcessingTimeTimer("ciao", 10); timerService.registerProcessingTimeTimer("hello", 10); @@ -388,10 +417,10 @@ public class HeapInternalTimerServiceTest { processingTimeService.setCurrentTime(10); verify(mockTriggerable, times(4)).onProcessingTime(anyInternalTimer()); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao"))); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello"))); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao"))); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); assertEquals(0, 
timerService.numProcessingTimeTimers()); } @@ -409,14 +438,21 @@ public class HeapInternalTimerServiceTest { TestKeyContext keyContext = new TestKeyContext(); TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); + + // get two different keys + int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + while (key2 == key1) { + key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + } - keyContext.setCurrentKey(0); + keyContext.setCurrentKey(key1); timerService.registerEventTimeTimer("ciao", 10); timerService.registerEventTimeTimer("hello", 10); - keyContext.setCurrentKey(1); + keyContext.setCurrentKey(key2); timerService.registerEventTimeTimer("ciao", 10); timerService.registerEventTimeTimer("hello", 10); @@ -425,10 +461,10 @@ public class HeapInternalTimerServiceTest { assertEquals(2, timerService.numEventTimeTimers("hello")); assertEquals(2, timerService.numEventTimeTimers("ciao")); - keyContext.setCurrentKey(0); + keyContext.setCurrentKey(key1); timerService.deleteEventTimeTimer("hello", 10); - keyContext.setCurrentKey(1); + keyContext.setCurrentKey(key2); timerService.deleteEventTimeTimer("ciao", 10); assertEquals(2, timerService.numEventTimeTimers()); @@ -438,10 +474,10 @@ public class HeapInternalTimerServiceTest { timerService.advanceWatermark(10); verify(mockTriggerable, times(2)).onEventTime(anyInternalTimer()); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao"))); - verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 0, "hello"))); - verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao"))); - verify(mockTriggerable, 
times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "hello"))); assertEquals(0, timerService.numEventTimeTimers()); } @@ -459,14 +495,21 @@ public class HeapInternalTimerServiceTest { TestKeyContext keyContext = new TestKeyContext(); TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); - keyContext.setCurrentKey(0); + // get two different keys + int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + while (key2 == key1) { + key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + } + + keyContext.setCurrentKey(key1); timerService.registerProcessingTimeTimer("ciao", 10); timerService.registerProcessingTimeTimer("hello", 10); - keyContext.setCurrentKey(1); + keyContext.setCurrentKey(key2); timerService.registerProcessingTimeTimer("ciao", 10); timerService.registerProcessingTimeTimer("hello", 10); @@ -475,10 +518,10 @@ public class HeapInternalTimerServiceTest { assertEquals(2, timerService.numProcessingTimeTimers("hello")); assertEquals(2, timerService.numProcessingTimeTimers("ciao")); - keyContext.setCurrentKey(0); + keyContext.setCurrentKey(key1); timerService.deleteProcessingTimeTimer("hello", 10); - keyContext.setCurrentKey(1); + keyContext.setCurrentKey(key2); timerService.deleteProcessingTimeTimer("ciao", 10); assertEquals(2, 
timerService.numProcessingTimeTimers()); @@ -488,10 +531,10 @@ public class HeapInternalTimerServiceTest { processingTimeService.setCurrentTime(10); verify(mockTriggerable, times(2)).onProcessingTime(anyInternalTimer()); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao"))); - verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello"))); - verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao"))); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); assertEquals(0, timerService.numEventTimeTimers()); } @@ -504,14 +547,21 @@ public class HeapInternalTimerServiceTest { TestKeyContext keyContext = new TestKeyContext(); TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); HeapInternalTimerService<Integer, String> timerService = - createTimerService(mockTriggerable, keyContext, processingTimeService); + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); - keyContext.setCurrentKey(0); + // get two different keys + int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + while (key2 == key1) { + key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + } + + keyContext.setCurrentKey(key1); timerService.registerProcessingTimeTimer("ciao", 10); timerService.registerEventTimeTimer("hello", 10); - keyContext.setCurrentKey(1); + keyContext.setCurrentKey(key2); 
timerService.registerEventTimeTimer("ciao", 10); timerService.registerProcessingTimeTimer("hello", 10); @@ -523,11 +573,13 @@ public class HeapInternalTimerServiceTest { assertEquals(1, timerService.numEventTimeTimers("hello")); assertEquals(1, timerService.numEventTimeTimers("ciao")); - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - for (int keyGroupIdx = startKeyGroupIdx; keyGroupIdx < endKeyGroupIdx; keyGroupIdx++) { - timerService.snapshotTimersForKeyGroup(new DataOutputViewStreamWrapper(outStream), keyGroupIdx); + Map<Integer, byte[]> snapshot = new HashMap<>(); + for (Integer keyGroupIndex : testKeyGroupRange) { + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + timerService.snapshotTimersForKeyGroup(new DataOutputViewStreamWrapper(outStream), keyGroupIndex); + outStream.close(); + snapshot.put(keyGroupIndex, outStream.toByteArray()); } - outStream.close(); @SuppressWarnings("unchecked") Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class); @@ -536,20 +588,22 @@ public class HeapInternalTimerServiceTest { processingTimeService = new TestProcessingTimeService(); timerService = restoreTimerService( - new ByteArrayInputStream(outStream.toByteArray()), + snapshot, mockTriggerable2, keyContext, - processingTimeService); + processingTimeService, + testKeyGroupRange, + maxParallelism); processingTimeService.setCurrentTime(10); timerService.advanceWatermark(10); verify(mockTriggerable2, times(2)).onProcessingTime(anyInternalTimer()); - verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao"))); - verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello"))); + verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); verify(mockTriggerable2, times(2)).onEventTime(anyInternalTimer()); - verify(mockTriggerable2, 
times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello"))); - verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao"))); + verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); assertEquals(0, timerService.numEventTimeTimers()); } @@ -570,45 +624,86 @@ public class HeapInternalTimerServiceTest { } } + private static int getKeyInKeyGroup(int keyGroup, int maxParallelism) { + Random rand = new Random(System.currentTimeMillis()); + int result = rand.nextInt(); + while (KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism) != keyGroup) { + result = rand.nextInt(); + } + return result; + } + + private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) { + Random rand = new Random(System.currentTimeMillis()); + int result = rand.nextInt(); + while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism))) { + result = rand.nextInt(); + } + return result; + } + private static HeapInternalTimerService<Integer, String> createTimerService( Triggerable<Integer, String> triggerable, KeyContext keyContext, - ProcessingTimeService processingTimeService) { + ProcessingTimeService processingTimeService, + KeyGroupsList keyGroupList, + int maxParallelism) { HeapInternalTimerService<Integer, String> service = new HeapInternalTimerService<>( - testKeyGroupList.getNumberOfKeyGroups(), - testKeyGroupList, - keyContext, - processingTimeService); + maxParallelism, + keyGroupList, + keyContext, + processingTimeService); service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable); return service; } private static HeapInternalTimerService<Integer, String> restoreTimerService( - InputStream stateStream, + Map<Integer, byte[]> state, Triggerable<Integer, String> triggerable, KeyContext keyContext, - ProcessingTimeService processingTimeService) 
throws Exception { + ProcessingTimeService processingTimeService, + KeyGroupsList keyGroupsList, + int maxParallelism) throws Exception { // create an empty service HeapInternalTimerService<Integer, String> service = new HeapInternalTimerService<>( - testKeyGroupList.getNumberOfKeyGroups(), - testKeyGroupList, - keyContext, - processingTimeService); + maxParallelism, + keyGroupsList, + keyContext, + processingTimeService); // restore the timers - for (int keyGroupIdx = startKeyGroupIdx; keyGroupIdx < endKeyGroupIdx; keyGroupIdx++) { - service.restoreTimersForKeyGroup( - new DataInputViewStreamWrapper(stateStream), - keyGroupIdx, - HeapInternalTimerServiceTest.class.getClassLoader()); + for (Integer keyGroupIndex : keyGroupsList) { + if (state.containsKey(keyGroupIndex)) { + service.restoreTimersForKeyGroup( + new DataInputViewStreamWrapper(new ByteArrayInputStream(state.get(keyGroupIndex))), + keyGroupIndex, + HeapInternalTimerServiceTest.class.getClassLoader()); + } } // initialize the service service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable); return service; } + + // ------------------------------------------------------------------------ + // Parametrization for testing with different key-group ranges + // ------------------------------------------------------------------------ + + @Parameterized.Parameters(name = "start = {0}, end = {1}, max = {2}") + @SuppressWarnings("unchecked,rawtypes") + public static Collection<Object[]> keyRanges(){ + return Arrays.asList(new Object[][] { + {0, Short.MAX_VALUE - 1, Short.MAX_VALUE}, + {0, 10, Short.MAX_VALUE}, + {0, 10, 10}, + {10, Short.MAX_VALUE - 1, Short.MAX_VALUE}, + {2, 5, 100}, + {2, 5, 6} + }); + } }
