[FLINK-4892] Parameterize HeapInternalTimerServiceTest

This now tests multiple interesting key-group cases.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3b5d332
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3b5d332
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3b5d332

Branch: refs/heads/master
Commit: e3b5d33237430acb65d1a93531448825a76c7ce5
Parents: b673760
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Oct 24 14:39:22 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 26 23:26:28 2016 +0200

----------------------------------------------------------------------
 .../operators/HeapInternalTimerServiceTest.java | 243 +++++++++++++------
 1 file changed, 169 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3b5d332/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index 09499c2..bba6517 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -28,13 +28,19 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -45,17 +51,21 @@ import static org.mockito.Mockito.*;
 /**
  * Tests for {@link HeapInternalTimerService}.
  */
+@RunWith(Parameterized.class)
 public class HeapInternalTimerServiceTest {
 
-       private static final int startKeyGroupIdx = 0;
-       private static final int endKeyGroupIdx = 10;
-       private static final KeyGroupsList testKeyGroupList =
-               new KeyGroupRange(startKeyGroupIdx, endKeyGroupIdx);
+       private final int maxParallelism;
+       private final KeyGroupRange testKeyGroupRange;
 
        private static InternalTimer<Integer, String> anyInternalTimer() {
                return any();
        }
 
+       public HeapInternalTimerServiceTest(int startKeyGroup, int endKeyGroup, 
int maxParallelism) {
+               this.testKeyGroupRange = new KeyGroupRange(startKeyGroup, 
endKeyGroup);
+               this.maxParallelism = maxParallelism;
+       }
+
        @Test
        public void testKeyGroupStartIndexSetting() {
 
@@ -151,9 +161,10 @@ public class HeapInternalTimerServiceTest {
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
 
                HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
 
-               keyContext.setCurrentKey(0);
+               int key = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               keyContext.setCurrentKey(key);
 
                timerService.registerProcessingTimeTimer("ciao", 10);
                timerService.registerProcessingTimeTimer("ciao", 20);
@@ -212,9 +223,11 @@ public class HeapInternalTimerServiceTest {
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
 
                HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
 
-               keyContext.setCurrentKey(0);
+               int key = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+
+               keyContext.setCurrentKey(key);
 
                timerService.registerProcessingTimeTimer("ciao", 20);
 
@@ -243,9 +256,11 @@ public class HeapInternalTimerServiceTest {
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
 
                final HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
+
+               int key = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
 
-               keyContext.setCurrentKey(0);
+               keyContext.setCurrentKey(key);
 
                timerService.registerProcessingTimeTimer("ciao", 10);
 
@@ -293,7 +308,7 @@ public class HeapInternalTimerServiceTest {
                TestKeyContext keyContext = new TestKeyContext();
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
                HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
 
                processingTimeService.setCurrentTime(17L);
                assertEquals(17, timerService.currentProcessingTime());
@@ -311,7 +326,7 @@ public class HeapInternalTimerServiceTest {
                TestKeyContext keyContext = new TestKeyContext();
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
                HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
 
                timerService.advanceWatermark(17);
                assertEquals(17, timerService.currentWatermark());
@@ -331,14 +346,21 @@ public class HeapInternalTimerServiceTest {
                TestKeyContext keyContext = new TestKeyContext();
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
                HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
 
-               keyContext.setCurrentKey(0);
+               // get two different keys
+               int key1 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               int key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               while (key2 == key1) {
+                       key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               }
+
+               keyContext.setCurrentKey(key1);
 
                timerService.registerEventTimeTimer("ciao", 10);
                timerService.registerEventTimeTimer("hello", 10);
 
-               keyContext.setCurrentKey(1);
+               keyContext.setCurrentKey(key2);
 
                timerService.registerEventTimeTimer("ciao", 10);
                timerService.registerEventTimeTimer("hello", 10);
@@ -350,10 +372,10 @@ public class HeapInternalTimerServiceTest {
                timerService.advanceWatermark(10);
 
                verify(mockTriggerable, 
times(4)).onEventTime(anyInternalTimer());
-               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, 0, "ciao")));
-               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, 0, "hello")));
-               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, 1, "ciao")));
-               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, 1, "hello")));
+               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key1, "ciao")));
+               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key1, "hello")));
+               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key2, "ciao")));
+               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key2, "hello")));
 
                assertEquals(0, timerService.numEventTimeTimers());
        }
@@ -369,14 +391,21 @@ public class HeapInternalTimerServiceTest {
                TestKeyContext keyContext = new TestKeyContext();
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
                HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
 
-               keyContext.setCurrentKey(0);
+               // get two different keys
+               int key1 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               int key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               while (key2 == key1) {
+                       key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               }
+
+               keyContext.setCurrentKey(key1);
 
                timerService.registerProcessingTimeTimer("ciao", 10);
                timerService.registerProcessingTimeTimer("hello", 10);
 
-               keyContext.setCurrentKey(1);
+               keyContext.setCurrentKey(key2);
 
                timerService.registerProcessingTimeTimer("ciao", 10);
                timerService.registerProcessingTimeTimer("hello", 10);
@@ -388,10 +417,10 @@ public class HeapInternalTimerServiceTest {
                processingTimeService.setCurrentTime(10);
 
                verify(mockTriggerable, 
times(4)).onProcessingTime(anyInternalTimer());
-               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, 0, "ciao")));
-               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, 0, "hello")));
-               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, 1, "ciao")));
-               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, 1, "hello")));
+               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key1, "ciao")));
+               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key1, "hello")));
+               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key2, "ciao")));
+               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key2, "hello")));
 
                assertEquals(0, timerService.numProcessingTimeTimers());
        }
@@ -409,14 +438,21 @@ public class HeapInternalTimerServiceTest {
                TestKeyContext keyContext = new TestKeyContext();
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
                HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
+
+               // get two different keys
+               int key1 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               int key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               while (key2 == key1) {
+                       key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               }
 
-               keyContext.setCurrentKey(0);
+               keyContext.setCurrentKey(key1);
 
                timerService.registerEventTimeTimer("ciao", 10);
                timerService.registerEventTimeTimer("hello", 10);
 
-               keyContext.setCurrentKey(1);
+               keyContext.setCurrentKey(key2);
 
                timerService.registerEventTimeTimer("ciao", 10);
                timerService.registerEventTimeTimer("hello", 10);
@@ -425,10 +461,10 @@ public class HeapInternalTimerServiceTest {
                assertEquals(2, timerService.numEventTimeTimers("hello"));
                assertEquals(2, timerService.numEventTimeTimers("ciao"));
 
-               keyContext.setCurrentKey(0);
+               keyContext.setCurrentKey(key1);
                timerService.deleteEventTimeTimer("hello", 10);
 
-               keyContext.setCurrentKey(1);
+               keyContext.setCurrentKey(key2);
                timerService.deleteEventTimeTimer("ciao", 10);
 
                assertEquals(2, timerService.numEventTimeTimers());
@@ -438,10 +474,10 @@ public class HeapInternalTimerServiceTest {
                timerService.advanceWatermark(10);
 
                verify(mockTriggerable, 
times(2)).onEventTime(anyInternalTimer());
-               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, 0, "ciao")));
-               verify(mockTriggerable, times(0)).onEventTime(eq(new 
InternalTimer<>(10, 0, "hello")));
-               verify(mockTriggerable, times(0)).onEventTime(eq(new 
InternalTimer<>(10, 1, "ciao")));
-               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, 1, "hello")));
+               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key1, "ciao")));
+               verify(mockTriggerable, times(0)).onEventTime(eq(new 
InternalTimer<>(10, key1, "hello")));
+               verify(mockTriggerable, times(0)).onEventTime(eq(new 
InternalTimer<>(10, key2, "ciao")));
+               verify(mockTriggerable, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key2, "hello")));
 
                assertEquals(0, timerService.numEventTimeTimers());
        }
@@ -459,14 +495,21 @@ public class HeapInternalTimerServiceTest {
                TestKeyContext keyContext = new TestKeyContext();
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
                HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
 
-               keyContext.setCurrentKey(0);
+               // get two different keys
+               int key1 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               int key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               while (key2 == key1) {
+                       key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               }
+
+               keyContext.setCurrentKey(key1);
 
                timerService.registerProcessingTimeTimer("ciao", 10);
                timerService.registerProcessingTimeTimer("hello", 10);
 
-               keyContext.setCurrentKey(1);
+               keyContext.setCurrentKey(key2);
 
                timerService.registerProcessingTimeTimer("ciao", 10);
                timerService.registerProcessingTimeTimer("hello", 10);
@@ -475,10 +518,10 @@ public class HeapInternalTimerServiceTest {
                assertEquals(2, timerService.numProcessingTimeTimers("hello"));
                assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
 
-               keyContext.setCurrentKey(0);
+               keyContext.setCurrentKey(key1);
                timerService.deleteProcessingTimeTimer("hello", 10);
 
-               keyContext.setCurrentKey(1);
+               keyContext.setCurrentKey(key2);
                timerService.deleteProcessingTimeTimer("ciao", 10);
 
                assertEquals(2, timerService.numProcessingTimeTimers());
@@ -488,10 +531,10 @@ public class HeapInternalTimerServiceTest {
                processingTimeService.setCurrentTime(10);
 
                verify(mockTriggerable, 
times(2)).onProcessingTime(anyInternalTimer());
-               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, 0, "ciao")));
-               verify(mockTriggerable, times(0)).onProcessingTime(eq(new 
InternalTimer<>(10, 0, "hello")));
-               verify(mockTriggerable, times(0)).onProcessingTime(eq(new 
InternalTimer<>(10, 1, "ciao")));
-               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, 1, "hello")));
+               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key1, "ciao")));
+               verify(mockTriggerable, times(0)).onProcessingTime(eq(new 
InternalTimer<>(10, key1, "hello")));
+               verify(mockTriggerable, times(0)).onProcessingTime(eq(new 
InternalTimer<>(10, key2, "ciao")));
+               verify(mockTriggerable, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key2, "hello")));
 
                assertEquals(0, timerService.numEventTimeTimers());
        }
@@ -504,14 +547,21 @@ public class HeapInternalTimerServiceTest {
                TestKeyContext keyContext = new TestKeyContext();
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
                HeapInternalTimerService<Integer, String> timerService =
-                               createTimerService(mockTriggerable, keyContext, 
processingTimeService);
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
 
-               keyContext.setCurrentKey(0);
+               // get two different keys
+               int key1 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               int key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               while (key2 == key1) {
+                       key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               }
+
+               keyContext.setCurrentKey(key1);
 
                timerService.registerProcessingTimeTimer("ciao", 10);
                timerService.registerEventTimeTimer("hello", 10);
 
-               keyContext.setCurrentKey(1);
+               keyContext.setCurrentKey(key2);
 
                timerService.registerEventTimeTimer("ciao", 10);
                timerService.registerProcessingTimeTimer("hello", 10);
@@ -523,11 +573,13 @@ public class HeapInternalTimerServiceTest {
                assertEquals(1, timerService.numEventTimeTimers("hello"));
                assertEquals(1, timerService.numEventTimeTimers("ciao"));
 
-               ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-               for (int keyGroupIdx = startKeyGroupIdx; keyGroupIdx < 
endKeyGroupIdx; keyGroupIdx++) {
-                       timerService.snapshotTimersForKeyGroup(new 
DataOutputViewStreamWrapper(outStream), keyGroupIdx);
+               Map<Integer, byte[]> snapshot = new HashMap<>();
+               for (Integer keyGroupIndex : testKeyGroupRange) {
+                       ByteArrayOutputStream outStream = new 
ByteArrayOutputStream();
+                       timerService.snapshotTimersForKeyGroup(new 
DataOutputViewStreamWrapper(outStream), keyGroupIndex);
+                       outStream.close();
+                       snapshot.put(keyGroupIndex, outStream.toByteArray());
                }
-               outStream.close();
 
                @SuppressWarnings("unchecked")
                Triggerable<Integer, String> mockTriggerable2 = 
mock(Triggerable.class);
@@ -536,20 +588,22 @@ public class HeapInternalTimerServiceTest {
                processingTimeService = new TestProcessingTimeService();
 
                timerService = restoreTimerService(
-                               new 
ByteArrayInputStream(outStream.toByteArray()),
+                               snapshot,
                                mockTriggerable2,
                                keyContext,
-                               processingTimeService);
+                               processingTimeService,
+                               testKeyGroupRange,
+                               maxParallelism);
 
                processingTimeService.setCurrentTime(10);
                timerService.advanceWatermark(10);
 
                verify(mockTriggerable2, 
times(2)).onProcessingTime(anyInternalTimer());
-               verify(mockTriggerable2, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, 0, "ciao")));
-               verify(mockTriggerable2, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, 1, "hello")));
+               verify(mockTriggerable2, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key1, "ciao")));
+               verify(mockTriggerable2, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key2, "hello")));
                verify(mockTriggerable2, 
times(2)).onEventTime(anyInternalTimer());
-               verify(mockTriggerable2, times(1)).onEventTime(eq(new 
InternalTimer<>(10, 0, "hello")));
-               verify(mockTriggerable2, times(1)).onEventTime(eq(new 
InternalTimer<>(10, 1, "ciao")));
+               verify(mockTriggerable2, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key1, "hello")));
+               verify(mockTriggerable2, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key2, "ciao")));
 
                assertEquals(0, timerService.numEventTimeTimers());
        }
@@ -570,45 +624,86 @@ public class HeapInternalTimerServiceTest {
                }
        }
 
+       private static int getKeyInKeyGroup(int keyGroup, int maxParallelism) {
+               Random rand = new Random(System.currentTimeMillis());
+               int result = rand.nextInt();
+               while (KeyGroupRangeAssignment.assignToKeyGroup(result, 
maxParallelism) != keyGroup) {
+                       result = rand.nextInt();
+               }
+               return result;
+       }
+
+       private static int getKeyInKeyGroupRange(KeyGroupRange range, int 
maxParallelism) {
+               Random rand = new Random(System.currentTimeMillis());
+               int result = rand.nextInt();
+               while 
(!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, 
maxParallelism))) {
+                       result = rand.nextInt();
+               }
+               return result;
+       }
+
        private static HeapInternalTimerService<Integer, String> 
createTimerService(
                        Triggerable<Integer, String> triggerable,
                        KeyContext keyContext,
-                       ProcessingTimeService processingTimeService) {
+                       ProcessingTimeService processingTimeService,
+                       KeyGroupsList keyGroupList,
+                       int maxParallelism) {
                HeapInternalTimerService<Integer, String> service =
                        new HeapInternalTimerService<>(
-                               testKeyGroupList.getNumberOfKeyGroups(),
-                               testKeyGroupList,
-                               keyContext,
-                               processingTimeService);
+                                       maxParallelism,
+                                       keyGroupList,
+                                       keyContext,
+                                       processingTimeService);
 
                service.startTimerService(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, triggerable);
                return service;
        }
 
        private static HeapInternalTimerService<Integer, String> 
restoreTimerService(
-                       InputStream stateStream,
+                       Map<Integer, byte[]> state,
                        Triggerable<Integer, String> triggerable,
                        KeyContext keyContext,
-                       ProcessingTimeService processingTimeService) throws 
Exception {
+                       ProcessingTimeService processingTimeService,
+                       KeyGroupsList keyGroupsList,
+                       int maxParallelism) throws Exception {
 
                // create an empty service
                HeapInternalTimerService<Integer, String> service =
                        new HeapInternalTimerService<>(
-                               testKeyGroupList.getNumberOfKeyGroups(),
-                               testKeyGroupList,
-                               keyContext,
-                               processingTimeService);
+                                       maxParallelism,
+                                       keyGroupsList,
+                                       keyContext,
+                                       processingTimeService);
 
                // restore the timers
-               for (int keyGroupIdx = startKeyGroupIdx; keyGroupIdx < 
endKeyGroupIdx; keyGroupIdx++) {
-                       service.restoreTimersForKeyGroup(
-                               new DataInputViewStreamWrapper(stateStream),
-                               keyGroupIdx,
-                               
HeapInternalTimerServiceTest.class.getClassLoader());
+               for (Integer keyGroupIndex : keyGroupsList) {
+                       if (state.containsKey(keyGroupIndex)) {
+                               service.restoreTimersForKeyGroup(
+                                               new 
DataInputViewStreamWrapper(new ByteArrayInputStream(state.get(keyGroupIndex))),
+                                               keyGroupIndex,
+                                               
HeapInternalTimerServiceTest.class.getClassLoader());
+                       }
                }
 
                // initialize the service
                service.startTimerService(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, triggerable);
                return service;
        }
+
+       // 
------------------------------------------------------------------------
+       //  Parametrization for testing with different key-group ranges
+       // 
------------------------------------------------------------------------
+
+       @Parameterized.Parameters(name = "start = {0}, end = {1}, max = {2}")
+       @SuppressWarnings("unchecked,rawtypes")
+       public static Collection<Object[]> keyRanges(){
+               return Arrays.asList(new Object[][] {
+                                               {0, Short.MAX_VALUE - 1, 
Short.MAX_VALUE},
+                                               {0, 10, Short.MAX_VALUE},
+                                               {0, 10, 10},
+                                               {10, Short.MAX_VALUE - 1, 
Short.MAX_VALUE},
+                                               {2, 5, 100},
+                                               {2, 5, 6}
+               });
+       }
 }

Reply via email to