shusj created STORM-3214:
----------------------------
Summary: Logic error in deletedManagers in ZkCoordinator.refresh when kafka.topic.wildcard.match=true is used
Key: STORM-3214
URL: https://issues.apache.org/jira/browse/STORM-3214
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka
Affects Versions: 1.2.2, 1.1.3
Reporter: shusj
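When kafka.topic.wildcard.match=true is used and there is more than one topic, deletedManagers in ZkCoordinator.refresh has a logic error. The map is keyed by partition number alone, so partitions with the same number in different topics end up under the same key.
The fix only requires changing ZkCoordinator@L91: Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
so that the key is topic+partition.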
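To illustrate the key collision, here is a minimal standalone sketch. It does not use the storm-kafka classes: DeletedManagersKeyDemo, TopicPartition and the helper methods are hypothetical names made up for this example, and a plain String stands in for PartitionManager. It only shows how a map keyed by partition number behaves compared with one keyed by topic+partition when two topics have two partitions each.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeletedManagersKeyDemo {

    // Hypothetical stand-in for a (topic, partition) pair; not the storm-kafka Partition class.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Keyed by partition number only: equal partition numbers from
    // different topics land on the same key and overwrite each other.
    static int countKeyedByPartition(List<TopicPartition> deleted) {
        Map<Integer, String> managers = new HashMap<>();
        for (TopicPartition tp : deleted) {
            managers.put(tp.partition, "manager for " + tp.topic + "-" + tp.partition);
        }
        return managers.size();
    }

    // Keyed by topic plus partition: every (topic, partition) pair keeps its own entry.
    static int countKeyedByTopicAndPartition(List<TopicPartition> deleted) {
        Map<String, String> managers = new HashMap<>();
        for (TopicPartition tp : deleted) {
            String key = tp.topic + "-" + tp.partition;
            managers.put(key, "manager for " + key);
        }
        return managers.size();
    }

    // Two topics with partitions 0 and 1 each, mirroring the scenario in this report.
    static List<TopicPartition> twoTopicsTwoPartitions() {
        List<TopicPartition> deleted = new ArrayList<>();
        for (String topic : new String[] {"TOPIC1", "TOPIC2"}) {
            for (int partition = 0; partition < 2; partition++) {
                deleted.add(new TopicPartition(topic, partition));
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        List<TopicPartition> deleted = twoTopicsTwoPartitions();
        System.out.println("keyed by partition: " + countKeyedByPartition(deleted));
        System.out.println("keyed by topic+partition: " + countKeyedByTopicAndPartition(deleted));
    }
}
```

With four deleted partitions, the partition-keyed map ends up with 2 entries and the topic+partition map with 4, so in this sketch half of the managers are lost under the first scheme.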
I added the following test to org.apache.storm.kafka.ZkCoordinatorTest:
{code:java}
public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort, String topic) {
    GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic);
    for (int i = 0; i < numPartitions; i++) {
        globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + ":" + brokerPort));
    }
    return globalPartitionInformation;
}

@Test
public void testTwoTopicPartitionsChange() throws Exception {
    int numPartitions = 2;
    int partitionsPerTask = 1;
    // Record every partition that is passed to unregister().
    final Set<Partition> unregisterList = new HashSet<>();
    Mockito.doAnswer(new Answer() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
            Object[] arguments = invocation.getArguments();
            Partition partition = new Partition((Broker) arguments[0], (String) arguments[1], (int) arguments[2], false);
            unregisterList.add(partition);
            return null;
        }
    }).when(dynamicPartitionConnections).unregister(any(Broker.class), any(String.class), anyInt());

    List<ZkCoordinator> coordinatorList = buildCoordinators(partitionsPerTask);
    ArrayList<GlobalPartitionInformation> prePartitionInformations = Lists.newArrayList(
        buildPartitionInfo(numPartitions, 9092, "TOPIC1"),
        buildPartitionInfo(numPartitions, 9092, "TOPIC2"));
    when(reader.getBrokerInfo()).thenReturn(prePartitionInformations);
    List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
    waitForRefresh();

    // Same topics and partition counts, but the brokers now use port 9093.
    when(reader.getBrokerInfo()).thenReturn(Lists.newArrayList(
        buildPartitionInfo(numPartitions, 9093, "TOPIC1"),
        buildPartitionInfo(numPartitions, 9093, "TOPIC2")));
    List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);

    // Every partition from before the refresh should have been unregistered.
    List<Partition> allPrePartition = KafkaUtils.calculatePartitionsForTask(prePartitionInformations, 1, 0, 0);
    assertEquals(unregisterList.size(), allPrePartition.size());
    for (Partition partition : allPrePartition) {
        assertTrue(unregisterList.contains(partition));
    }
}
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)