Repository: kafka Updated Branches: refs/heads/trunk 7c7becd4c -> 8e9e17767
HOTFIX: Fix HerderRequest.compareTo(). With KAFKA-3008 (#1788), the implementation does not respect the contract that 'sgn(x.compareTo(y)) == -sgn(y.compareTo(x))'. This fix addresses the hang with JDK8 in DistributedHerderTest.compareTo(). Author: Shikhar Bhushan <[email protected]> Reviewers: Ismael Juma <[email protected]>, Konstantine Karantasis <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #2232 from shikhar/herderreq-compareto Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e9e1776 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e9e1776 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e9e1776 Branch: refs/heads/trunk Commit: 8e9e1776790abb0a9b103f75d0231dd66d09e68f Parents: 7c7becd Author: Shikhar Bhushan <[email protected]> Authored: Sat Dec 10 12:48:51 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Sat Dec 10 12:48:51 2016 -0800 ---------------------------------------------------------------------- .../runtime/distributed/DistributedHerder.java | 25 ++++++++++++-------- .../distributed/DistributedHerderTest.java | 13 ++++++++++ 2 files changed, 28 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8e9e1776/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index a8799c6..25dfc6b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -67,6 
+67,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** * <p> @@ -103,6 +104,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250; private static final int START_STOP_THREAD_POOL_SIZE = 8; + private final AtomicLong requestSeqNum = new AtomicLong(); + private final Time time; private final String workerGroupId; @@ -125,7 +128,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { // To handle most external requests, like creating or destroying a connector, we can use a generic request where // the caller specifies all the code that should be executed. - private final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>(); + final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>(); // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits). 
private Set<String> connectorConfigUpdates = new HashSet<>(); @@ -1016,15 +1019,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable { return false; } - private void addRequest(Callable<Void> action, Callback<Void> callback) { - addRequest(0, action, callback); + HerderRequest addRequest(Callable<Void> action, Callback<Void> callback) { + return addRequest(0, action, callback); } - private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) { - HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback); + HerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) { + HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback); requests.add(req); if (peekWithoutException() == req) member.wakeup(); + return req; } private HerderRequest peekWithoutException() { @@ -1091,13 +1095,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } } - private static class HerderRequest implements Comparable<HerderRequest> { + static class HerderRequest implements Comparable<HerderRequest> { private final long at; + private final long seq; private final Callable<Void> action; private final Callback<Void> callback; - public HerderRequest(long at, Callable<Void> action, Callback<Void> callback) { + public HerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) { this.at = at; + this.seq = seq; this.action = action; this.callback = callback; } @@ -1112,9 +1118,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @Override public int compareTo(HerderRequest o) { - final int soonest = Long.compare(at, o.at); - // If tied, returning a positive value should respect insertion order. - return soonest != 0 ? soonest : 1; + final int cmp = Long.compare(at, o.at); + return cmp == 0 ? 
Long.compare(seq, o.seq) : cmp; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8e9e1776/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 5be2044..6a36957 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -732,6 +732,19 @@ public class DistributedHerderTest { } @Test + public void testRequestProcessingOrder() throws Exception { + final DistributedHerder.HerderRequest req1 = herder.addRequest(100, null, null); + final DistributedHerder.HerderRequest req2 = herder.addRequest(10, null, null); + final DistributedHerder.HerderRequest req3 = herder.addRequest(200, null, null); + final DistributedHerder.HerderRequest req4 = herder.addRequest(200, null, null); + + assertEquals(req2, herder.requests.pollFirst()); // lowest delay + assertEquals(req1, herder.requests.pollFirst()); // next lowest delay + assertEquals(req3, herder.requests.pollFirst()); // same delay as req4, but added first + assertEquals(req4, herder.requests.pollFirst()); + } + + @Test public void testRestartTaskRedirectToLeader() throws Exception { // get the initial assignment EasyMock.expect(member.memberId()).andStubReturn("member");
