Repository: kafka Updated Branches: refs/heads/trunk 1d586cb50 -> 6f7ed15da
KAFKA-4510: StreamThread must finish rebalance in state PENDING_SHUTDOWN Author: Matthias J. Sax <[email protected]> Reviewers: Eno Thereska, Guozhang Wang Closes #2227 from mjsax/kafka-4510-finish-rebalance-on-shutdown Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f7ed15d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f7ed15d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f7ed15d Branch: refs/heads/trunk Commit: 6f7ed15dad8d914ae65a595eef327c0510f11469 Parents: 1d586cb Author: Matthias J. Sax <[email protected]> Authored: Sat Dec 10 21:53:46 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Sat Dec 10 21:53:46 2016 -0800 ---------------------------------------------------------------------- .../internals/InternalTopicManager.java | 2 +- .../internals/StreamPartitionAssignor.java | 2 +- .../processor/internals/StreamThread.java | 21 +++++++++++++++----- 3 files changed, 18 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6f7ed15d/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index a65a2ae..c586779 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -52,7 +52,7 @@ public class InternalTopicManager { public static final String RETENTION_MS = "retention.ms"; public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS); - private 
final ZkClient zkClient; + final ZkClient zkClient; private final int replicationFactor; private final long windowChangeLogAdditionalRetention; http://git-wip-us.apache.org/repos/asf/kafka/blob/6f7ed15d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 7e15f70..8a94b7e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -159,7 +159,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable private Map<TaskId, Set<TopicPartition>> standbyTasks; private Map<TaskId, Set<TopicPartition>> activeTasks; - private InternalTopicManager internalTopicManager; + InternalTopicManager internalTopicManager; /** * We need to have the PartitionAssignor and its StreamThread to be mutually accessible http://git-wip-us.apache.org/repos/asf/kafka/blob/6f7ed15d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a2cac71..151bfd5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -167,6 +167,13 @@ public class StreamThread extends Thread { } } + private synchronized void setStateWhenNotInPendingShutdown(final State newState) { + if (state == 
State.PENDING_SHUTDOWN) { + return; + } + setState(newState); + } + public final PartitionGrouper partitionGrouper; private final StreamsMetadataState streamsMetadataState; public final String applicationId; @@ -212,21 +219,21 @@ public class StreamThread extends Thread { final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() { @Override public void onPartitionsAssigned(Collection<TopicPartition> assignment) { + try { if (state == State.PENDING_SHUTDOWN) { log.info("stream-thread [{}] New partitions [{}] assigned while shutting down.", StreamThread.this.getName(), assignment); - return; } log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.", StreamThread.this.getName(), assignment); - setState(State.ASSIGNING_PARTITIONS); + setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS); addStreamTasks(assignment); addStandbyTasks(); lastCleanMs = time.milliseconds(); // start the cleaning cycle streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata()); - setState(State.RUNNING); + setStateWhenNotInPendingShutdown(State.RUNNING); } catch (Throwable t) { rebalanceException = t; throw t; @@ -239,11 +246,10 @@ public class StreamThread extends Thread { if (state == State.PENDING_SHUTDOWN) { log.info("stream-thread [{}] New partitions [{}] revoked while shutting down.", StreamThread.this.getName(), assignment); - return; } log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.", StreamThread.this.getName(), assignment); - setState(State.PARTITIONS_REVOKED); + setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED); lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned // suspend active tasks suspendTasksAndState(true); @@ -391,6 +397,11 @@ public class StreamThread extends Thread { log.error("{} Failed to close restore consumer: ", logPrefix, e); } + // TODO remove this + // 
hotfix to improve ZK behavior as long as KAFKA-4060 is not fixed (cf. KAFKA-4369) + // when removing this, make StreamPartitionAssignor#internalTopicManager "private" again + partitionAssignor.internalTopicManager.zkClient.close(); + // remove all tasks removeStreamTasks(); removeStandbyTasks();
