Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch 331764faf -> 23ebe9033
STORM-2296 Kafka spout no dup on leader changes Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fcbee343 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fcbee343 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fcbee343 Branch: refs/heads/1.0.x-branch Commit: fcbee343b4e69cd71e3953c53c0180045d397479 Parents: 331764f Author: Ernestas Vaiciukevicius <[email protected]> Authored: Thu Jan 12 16:54:59 2017 +0200 Committer: Jungtaek Lim <[email protected]> Committed: Fri Mar 10 01:29:47 2017 +0900 ---------------------------------------------------------------------- .../apache/storm/kafka/PartitionManager.java | 125 ++++++++++++------- .../org/apache/storm/kafka/ZkCoordinator.java | 16 ++- .../apache/storm/kafka/ZkCoordinatorTest.java | 42 ++++++- 3 files changed, 131 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fcbee343/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index 79e7c3d..bc355ba 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -66,7 +66,29 @@ public class PartitionManager { ZkState _state; Map _stormConf; long numberFailed, numberAcked; - public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) { + + public PartitionManager( + DynamicPartitionConnections connections, + String topologyInstanceId, + ZkState state, + Map stormConf, + SpoutConfig spoutConfig, + Partition id) + { + 
this(connections, topologyInstanceId, state, stormConf, spoutConfig, id, null); + } + + /** + * @param previousManager previous partition manager if manager for partition is being recreated + */ + public PartitionManager( + DynamicPartitionConnections connections, + String topologyInstanceId, + ZkState state, + Map stormConf, + SpoutConfig spoutConfig, + Partition id, + PartitionManager previousManager) { _partition = id; _connections = connections; _spoutConfig = spoutConfig; @@ -76,53 +98,64 @@ public class PartitionManager { _stormConf = stormConf; numberAcked = numberFailed = 0; - try { - _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance(); - _failedMsgRetryManager.prepare(spoutConfig, _stormConf); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>", - FailedMsgRetryManager.class, - spoutConfig.failedMsgRetryManagerClass), e); - } + if (previousManager != null) { + _failedMsgRetryManager = previousManager._failedMsgRetryManager; + _committedTo = previousManager._committedTo; + _emittedToOffset = previousManager._emittedToOffset; + _waitingToEmit = previousManager._waitingToEmit; + _pending = previousManager._pending; + LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}", + _waitingToEmit.size(), + _pending.size()); + } else { + try { + _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance(); + _failedMsgRetryManager.prepare(spoutConfig, _stormConf); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>", + FailedMsgRetryManager.class, + spoutConfig.failedMsgRetryManagerClass), e); + } - String jsonTopologyId = 
null; - Long jsonOffset = null; - String path = committedPath(); - try { - Map<Object, Object> json = _state.readJSON(path); - LOG.info("Read partition information from: " + path + " --> " + json ); - if (json != null) { - jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id"); - jsonOffset = (Long) json.get("offset"); + String jsonTopologyId = null; + Long jsonOffset = null; + String path = committedPath(); + try { + Map<Object, Object> json = _state.readJSON(path); + LOG.info("Read partition information from: " + path + " --> " + json); + if (json != null) { + jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id"); + jsonOffset = (Long) json.get("offset"); + } + } catch (Throwable e) { + LOG.warn("Error reading and/or parsing at ZkNode: " + path, e); } - } catch (Throwable e) { - LOG.warn("Error reading and/or parsing at ZkNode: " + path, e); - } - String topic = _partition.topic; - Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig); + String topic = _partition.topic; + Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig); - if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON? - _committedTo = currentOffset; - LOG.info("No partition information found, using configuration to determine offset"); - } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) { - _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime); - LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset"); - } else { - _committedTo = jsonOffset; - LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId ); - } + if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON? 
+ _committedTo = currentOffset; + LOG.info("No partition information found, using configuration to determine offset"); + } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) { + _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime); + LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset"); + } else { + _committedTo = jsonOffset; + LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId); + } - if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) { - LOG.info("Last commit offset from zookeeper: " + _committedTo); - Long lastCommittedOffset = _committedTo; - _committedTo = currentOffset; - LOG.info("Commit offset " + lastCommittedOffset + " is more than " + - spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime); - } + if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) { + LOG.info("Last commit offset from zookeeper: " + _committedTo); + Long lastCommittedOffset = _committedTo; + _committedTo = currentOffset; + LOG.info("Commit offset " + lastCommittedOffset + " is more than " + + spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime); + } - LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo); - _emittedToOffset = _committedTo; + LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo); + _emittedToOffset = _committedTo; + } _fetchAPILatencyMax = new CombinedMetric(new MaxMetric()); _fetchAPILatencyMean = new ReducedMetric(new MeanReducer()); @@ -160,7 +193,7 @@ public class PartitionManager { } else { tups = 
KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic); } - + if ((tups != null) && tups.iterator().hasNext()) { if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) { for (List<Object> tup : tups) { @@ -201,7 +234,7 @@ public class PartitionManager { } catch (TopicOffsetOutOfRangeException e) { offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime()); // fetch failed, so don't update the fetch metrics - + //fix bug [STORM-643] : remove outdated failed offsets if (!processingNewTuples) { // For the case of EarliestTime it would be better to discard @@ -214,7 +247,7 @@ public class PartitionManager { if (null != omitted) { _lostMessageCount.incrBy(omitted.size()); } - + LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted); } @@ -223,7 +256,7 @@ public class PartitionManager { _emittedToOffset = offset; LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset); } - + return; } long end = System.nanoTime(); http://git-wip-us.apache.org/repos/asf/storm/blob/fcbee343/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index 98bf8a0..14be584 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -88,14 +88,24 @@ public class ZkCoordinator implements PartitionCoordinator { LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); + Map<Integer, PartitionManager> deletedManagers = new HashMap<>(); for (Partition id : deletedPartitions) { - PartitionManager man = _managers.remove(id); - man.close(); + deletedManagers.put(id.partition, 
_managers.remove(id)); + } + for (PartitionManager manager : deletedManagers.values()) { + if (manager != null) manager.close(); } LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { - PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); + PartitionManager man = new PartitionManager( + _connections, + _topologyInstanceId, + _state, + _stormConf, + _spoutConfig, + id, + deletedManagers.get(id.partition)); _managers.put(id, man); } http://git-wip-us.apache.org/repos/asf/storm/blob/fcbee343/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index 65bf0b4..adef740 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -28,8 +28,7 @@ import org.mockito.MockitoAnnotations; import java.util.*; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.when; @@ -106,7 +105,7 @@ public class ZkCoordinatorTest { waitForRefresh(); when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093))); List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); - assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size()); + assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size()); Iterator<List<PartitionManager>> 
iterator = partitionManagersAfterRefresh.iterator(); for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) { List<PartitionManager> partitionManagersAfter = iterator.next(); @@ -114,6 +113,43 @@ public class ZkCoordinatorTest { } } + @Test + public void testPartitionManagerRecreate() throws Exception { + final int totalTasks = 2; + int partitionsPerTask = 2; + List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask); + when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092))); + List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList); + waitForRefresh(); + when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093))); + List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); + assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size()); + + HashMap<Integer, PartitionManager> managersAfterRefresh = new HashMap<Integer, PartitionManager>(); + for (List<PartitionManager> partitionManagersAfter : partitionManagersAfterRefresh) { + for (PartitionManager manager : partitionManagersAfter) { + assertFalse("Multiple PartitionManagers for same partition", managersAfterRefresh.containsKey(manager.getPartition().partition)); + managersAfterRefresh.put(manager.getPartition().partition, manager); + } + } + + for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) { + for (PartitionManager manager : partitionManagersBefore) { + assertStateIsTheSame(manager, managersAfterRefresh.get(manager.getPartition().partition)); + } + } + } + + private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) { + // check if state was actually moved from old PartitionManager + assertNotNull(managerBefore); + 
assertNotNull(managerAfter); + assertNotSame(managerBefore, managerAfter); + assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit); + assertSame(managerBefore._emittedToOffset, managerAfter._emittedToOffset); + assertSame(managerBefore._committedTo, managerAfter._committedTo); + } + private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) { assertEquals(partitionsPerTask, partitionManagersBefore.size()); assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size());
