Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2726#discussion_r200456703
--- Diff:
external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
@@ -140,6 +140,42 @@ public void testPartitionManagerRecreate() throws
Exception {
}
}
+ @Test
+ public void testPartitionManagerRecreateMultipleTopics() throws
Exception {
+ List<ZkCoordinator> coordinatorList = buildCoordinators(1);
+
+
when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+ TestUtils.buildPartitionInfo("topic1", 1, 9092),
+ TestUtils.buildPartitionInfo("topic2", 1, 9092)));
+
+ List<PartitionManager> partitionManagersBeforeRefresh =
coordinatorList.get(0).getMyManagedPartitions();
+ assertEquals(2, partitionManagersBeforeRefresh.size());
+ for (PartitionManager partitionManager :
partitionManagersBeforeRefresh) {
+ partitionManager._emittedToOffset = 100L;
+ partitionManager._committedTo = 100L;
+ }
+
+ waitForRefresh();
+
+
when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+ TestUtils.buildPartitionInfo("topic1", 1, 9093),
+ TestUtils.buildPartitionInfo("topic3", 1, 9093)));
--- End diff --
Nit: Seems weird to exclude topic 2, that can't happen in a real setup.
Shouldn't you be checking that topic1 and topic2 can't overwrite each others'
offsets instead (e.g by making them different a few lines up)?
---