shusj created STORM-3214:
----------------------------

             Summary: 使用 kafka.topic.wildcard.match 
=true的时候,ZkCoordinator.refresh中deletedManagers会出现逻辑错误
                 Key: STORM-3214
                 URL: https://issues.apache.org/jira/browse/STORM-3214
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 1.2.2, 1.1.3
            Reporter: shusj


使用 kafka.topic.wildcard.match 
=true的时候,如果topic数目大于1,ZkCoordinator.refresh中deletedManagers会出现逻辑错误

只需要将ZkCoordinator@L91:    Map<Integer, PartitionManager> deletedManagers = new 
HashMap<>();

将key修改为topic+partition

 

在org.apache.storm.kafka.ZkCoordinatorTest中添加了如下测试
{code:java}
//代码占位符
public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, 
int brokerPort, String topic) {
GlobalPartitionInformation globalPartitionInformation = new 
GlobalPartitionInformation(topic);
for (int i = 0; i < numPartitions; i++) {
globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " 
:" + brokerPort));
}
return globalPartitionInformation;
}

@Test
public void testTwoTopicPartitionsChange() throws Exception {
int numPartitions = 2;
int partitionsPerTask = 1;
final Set<Partition> unregisterList = new HashSet<>();
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Object[] arguments = invocation.getArguments();
Partition partition = new Partition((Broker) arguments[0], (String) 
arguments[1], (int) arguments[2], false);
unregisterList.add(partition);
return null;
}
}).when(dynamicPartitionConnections).unregister(any(Broker.class), 
any(String.class), anyInt());

List<ZkCoordinator> coordinatorList = buildCoordinators(partitionsPerTask);
ArrayList<GlobalPartitionInformation> prePartitionInformations = 
Lists.newArrayList(buildPartitionInfo(numPartitions, 9092, "TOPIC1"), 
buildPartitionInfo(numPartitions, 9092, "TOPIC2"));
when(reader.getBrokerInfo()).thenReturn(prePartitionInformations);
List<List<PartitionManager>> partitionManagersBeforeRefresh = 
getPartitionManagers(coordinatorList);
waitForRefresh();
when(reader.getBrokerInfo()).thenReturn(Lists.newArrayList(buildPartitionInfo(numPartitions,
 9093, "TOPIC1"), buildPartitionInfo(numPartitions, 9093, "TOPIC2")));
List<List<PartitionManager>> partitionManagersAfterRefresh = 
getPartitionManagers(coordinatorList);
List<Partition> allPrePartition = 
KafkaUtils.calculatePartitionsForTask(prePartitionInformations, 1, 0, 0);
assertEquals(unregisterList.size(), allPrePartition.size());
for (Partition partition : allPrePartition) {
assertTrue(unregisterList.contains(partition));
}
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to