Github user choojoyq commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2726#discussion_r200655178
  
    --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -140,6 +140,42 @@ public void testPartitionManagerRecreate() throws Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreateMultipleTopics() throws Exception {
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(1);
    +
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9092),
    +            TestUtils.buildPartitionInfo("topic2", 1, 9092)));
    +
    +        List<PartitionManager> partitionManagersBeforeRefresh = coordinatorList.get(0).getMyManagedPartitions();
    +        assertEquals(2, partitionManagersBeforeRefresh.size());
    +        for (PartitionManager partitionManager : partitionManagersBeforeRefresh) {
    +            partitionManager._emittedToOffset = 100L;
    +            partitionManager._committedTo = 100L;
    +        }
    +
    +        waitForRefresh();
    +
    +        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9093),
    +            TestUtils.buildPartitionInfo("topic3", 1, 9093)));
    --- End diff --
    
    The idea of the test is that before rebalancing, some ``Task`` was
    responsible for two partitions, one from topic1 and one from topic2, both
    hosted on broker 9092. After the Kafka brokers were rebalanced, it became
    responsible for the same partition of topic1 and another partition from
    topic3, both now hosted on broker 9093. The ``Task`` should therefore
    preserve the partition information for topic1, since it managed that
    partition before and knows how many messages were already emitted,
    committed, etc. The partition from topic3 is brand new, so it should be
    created from scratch without reusing any information. Before the fix, the
    information for topic3 would be reused from topic1, because the lookup of
    previously managed partitions was performed only on the partition number,
    ignoring the topic.
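
To make the difference concrete, here is a rough standalone sketch of the two lookup strategies. It is not the actual storm-kafka code: `TopicPartition`, `State`, and both `refresh...` methods are hypothetical names invented for illustration, the partition number 0 is arbitrary, and the zero offsets for a fresh partition are just a placeholder for "start from scratch".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PartitionLookupSketch {

    /** Hypothetical partition identity: topic plus partition number. */
    public static final class TopicPartition {
        public final String topic;
        public final int partition;

        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Hypothetical per-partition progress kept by a task. */
    public static final class State {
        public final long emittedToOffset;
        public final long committedTo;

        public State(long emittedToOffset, long committedTo) {
            this.emittedToOffset = emittedToOffset;
            this.committedTo = committedTo;
        }
    }

    /** Problematic variant: previous state is matched on the partition number alone. */
    public static Map<TopicPartition, State> refreshByNumberOnly(
            Map<TopicPartition, State> previous, List<TopicPartition> assigned) {
        Map<Integer, State> byNumber = new HashMap<>();
        for (Map.Entry<TopicPartition, State> e : previous.entrySet()) {
            byNumber.put(e.getKey().partition, e.getValue());
        }
        Map<TopicPartition, State> next = new HashMap<>();
        for (TopicPartition tp : assigned) {
            State old = byNumber.get(tp.partition);
            // Placeholder: zero offsets stand in for "created from scratch".
            next.put(tp, old != null ? old : new State(0L, 0L));
        }
        return next;
    }

    /** Variant that matches previous state on topic and partition number together. */
    public static Map<TopicPartition, State> refreshByTopicAndNumber(
            Map<TopicPartition, State> previous, List<TopicPartition> assigned) {
        Map<TopicPartition, State> next = new HashMap<>();
        for (TopicPartition tp : assigned) {
            State old = previous.get(tp);
            next.put(tp, old != null ? old : new State(0L, 0L));
        }
        return next;
    }

    public static void main(String[] args) {
        Map<TopicPartition, State> before = new HashMap<>();
        before.put(new TopicPartition("topic1", 0), new State(100L, 100L));
        before.put(new TopicPartition("topic2", 0), new State(100L, 100L));

        List<TopicPartition> after = Arrays.asList(
                new TopicPartition("topic1", 0), new TopicPartition("topic3", 0));
        TopicPartition topic3 = new TopicPartition("topic3", 0);

        // Number-only lookup leaks old state into topic3: prints 100
        System.out.println(refreshByNumberOnly(before, after).get(topic3).emittedToOffset);
        // Topic-aware lookup starts topic3 from scratch: prints 0
        System.out.println(refreshByTopicAndNumber(before, after).get(topic3).emittedToOffset);
    }
}
```

In both variants topic1 keeps its offsets of 100; only the topic-aware lookup gives topic3 fresh state, which is the behaviour the new test is meant to check.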


---
