Repository: kafka Updated Branches: refs/heads/trunk 9855bb9c6 -> 8838fa801
MINOR: KAFKA-2371 follow-up, DistributedHerder should wakeup WorkerGroupMember after assignment to ensure work is started immediately Author: Ewen Cheslack-Postava <m...@ewencp.org> Reviewers: Gwen Shapira Closes #360 from ewencp/minor-kafka-2371-follow-up-wakeup-after-rebalance Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8838fa80 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8838fa80 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8838fa80 Branch: refs/heads/trunk Commit: 8838fa8010c146af6aab014a41bc7e68318b4eb0 Parents: 9855bb9 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Wed Oct 28 12:42:03 2015 -0700 Committer: Gwen Shapira <csh...@gmail.com> Committed: Wed Oct 28 12:42:03 2015 -0700 ---------------------------------------------------------------------- .../kafka/copycat/runtime/distributed/DistributedHerder.java | 4 ++++ .../kafka/copycat/runtime/distributed/DistributedHerderTest.java | 2 ++ 2 files changed, 6 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8838fa80/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java index 17bf7b7..46c7686 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java @@ -614,6 +614,10 @@ public class DistributedHerder implements Herder, Runnable { log.info("Joined group and got assignment: {}", assignment); DistributedHerder.this.assignment = assignment; rebalanceResolved = false; + // We *must* interrupt any poll() call since this could occur when the poll starts, and we might then + // sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the + // main thread. + member.wakeup(); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/8838fa80/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java index 1213656..c8b4874 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java @@ -371,6 +371,8 @@ public class DistributedHerderTest { return null; } }); + member.wakeup(); + PowerMock.expectLastCall(); } private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) {