[ https://issues.apache.org/jira/browse/KAFKA-6593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382469#comment-16382469 ]
ASF GitHub Bot commented on KAFKA-6593: --------------------------------------- hachikuji closed pull request #4625: KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync URL: https://github.com/apache/kafka/pull/4625 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 6884ff0dff7..2daadddee63 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -231,7 +231,7 @@ protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long tim } else if (coordinator != null && client.connectionFailed(coordinator)) { // we found the coordinator, but the connection has failed, so mark // it dead and backoff before retrying discovery - coordinatorDead(); + markCoordinatorUnknown(); time.sleep(retryBackoffMs); } @@ -487,7 +487,7 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry with backoff - coordinatorDead(); + markCoordinatorUnknown(); log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message()); future.raise(error); } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL @@ -550,7 +550,7 @@ public void handle(SyncGroupResponse syncResponse, if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else if (error == 
Errors.REBALANCE_IN_PROGRESS) { - log.debug("SyncGroup failed due to group rebalance"); + log.debug("SyncGroup failed because the group began another rebalance"); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) { @@ -559,8 +559,8 @@ public void handle(SyncGroupResponse syncResponse, future.raise(error); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { - log.debug("SyncGroup failed:", error.message()); - coordinatorDead(); + log.debug("SyncGroup failed: {}", error.message()); + markCoordinatorUnknown(); future.raise(error); } else { future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message())); @@ -627,27 +627,34 @@ public void onFailure(RuntimeException e, RequestFuture<Void> future) { * @return true if the coordinator is unknown */ public boolean coordinatorUnknown() { - return coordinator() == null; + return checkAndGetCoordinator() == null; } /** - * Get the current coordinator + * Get the coordinator if its connection is still active. Otherwise mark it unknown and + * return null. + * * @return the current coordinator or null if it is unknown */ - protected synchronized Node coordinator() { + protected synchronized Node checkAndGetCoordinator() { if (coordinator != null && client.connectionFailed(coordinator)) { - coordinatorDead(); + markCoordinatorUnknown(true); return null; } return this.coordinator; } - /** - * Mark the current coordinator as dead. 
- */ - protected synchronized void coordinatorDead() { + private synchronized Node coordinator() { + return this.coordinator; + } + + protected synchronized void markCoordinatorUnknown() { + markCoordinatorUnknown(false); + } + + protected synchronized void markCoordinatorUnknown(boolean isDisconnected) { if (this.coordinator != null) { - log.info("Marking the coordinator {} dead", this.coordinator); + log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator); Node oldCoordinator = this.coordinator; // Mark the coordinator dead before disconnecting requests since the callbacks for any pending @@ -656,8 +663,9 @@ protected synchronized void coordinatorDead() { this.coordinator = null; // Disconnect from the coordinator to ensure that there are no in-flight requests remaining. - // Pending callbacks will be invoked with a DisconnectException. - client.disconnect(oldCoordinator); + // Pending callbacks will be invoked with a DisconnectException on the next call to poll. + if (!isDisconnected) + client.disconnectAsync(oldCoordinator); } } @@ -708,7 +716,7 @@ protected void close(long timeoutMs) { // interrupted using wakeup) and the leave group request which have been queued, but not // yet sent to the broker. Wait up to close timeout for these pending requests to be processed. // If coordinator is not known, requests are aborted. 
- Node coordinator = coordinator(); + Node coordinator = checkAndGetCoordinator(); if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs)) log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", client.pendingRequestCount(coordinator)); @@ -769,7 +777,7 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> futu || error == Errors.NOT_COORDINATOR) { log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.", coordinator()); - coordinatorDead(); + markCoordinatorUnknown(); future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.debug("Attempt to heartbeat failed since group is rebalancing"); @@ -800,7 +808,7 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> futu public void onFailure(RuntimeException e, RequestFuture<T> future) { // mark the coordinator as dead if (e instanceof DisconnectException) { - coordinatorDead(); + markCoordinatorUnknown(true); } future.raise(e); } @@ -948,7 +956,7 @@ public void run() { } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should // probably make sure the coordinator is still healthy. - coordinatorDead(); + markCoordinatorUnknown(); } else if (heartbeat.pollTimeoutExpired(now)) { // the poll timeout has expired, which means that the foreground thread has stalled // in between calls to poll(), so we explicitly leave the group. 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 0aa61bbf442..2afa1ff9236 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -684,7 +684,7 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception if (offsets.isEmpty()) return RequestFuture.voidSuccess(); - Node coordinator = coordinator(); + Node coordinator = checkAndGetCoordinator(); if (coordinator == null) return RequestFuture.coordinatorNotAvailable(); @@ -762,7 +762,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) { - coordinatorDead(); + markCoordinatorUnknown(); future.raise(error); return; } else if (error == Errors.UNKNOWN_MEMBER_ID @@ -799,7 +799,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu * @return A request future containing the committed offsets. 
*/ private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) { - Node coordinator = coordinator(); + Node coordinator = checkAndGetCoordinator(); if (coordinator == null) return RequestFuture.coordinatorNotAvailable(); @@ -825,7 +825,7 @@ public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartitio future.raise(error); } else if (error == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry - coordinatorDead(); + markCoordinatorUnknown(); future.raise(error); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 0747e8db146..fb393a54c7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; /** * Higher level consumer access to the network layer with basic support for request futures. This class @@ -65,10 +66,15 @@ private final long unsentExpiryMs; private final AtomicBoolean wakeupDisabled = new AtomicBoolean(); + // We do not need high throughput, so use a fair lock to try to avoid starvation + private final ReentrantLock lock = new ReentrantLock(true); + // when requests complete, they are transferred to this queue prior to invocation. The purpose // is to avoid invoking them while holding this object's monitor which can open the door for deadlocks. 
private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<Node> pendingDisconnects = new ConcurrentLinkedQueue<>(); + // this flag allows the client to be safely woken up without waiting on the lock above. It is // atomic to avoid the need to acquire the lock above in order to enable it concurrently. private final AtomicBoolean wakeup = new AtomicBoolean(false); @@ -113,12 +119,22 @@ public ConsumerNetworkClient(LogContext logContext, return completionHandler.future; } - public synchronized Node leastLoadedNode() { - return client.leastLoadedNode(time.milliseconds()); + public Node leastLoadedNode() { + lock.lock(); + try { + return client.leastLoadedNode(time.milliseconds()); + } finally { + lock.unlock(); + } } - public synchronized boolean hasReadyNodes() { - return client.hasReadyNodes(); + public boolean hasReadyNodes() { + lock.lock(); + try { + return client.hasReadyNodes(); + } finally { + lock.unlock(); + } } /** @@ -227,14 +243,18 @@ public void poll(long timeout, long now, PollCondition pollCondition, boolean di // there may be handlers which need to be invoked if we woke up the previous call to poll firePendingCompletedRequests(); - synchronized (this) { + lock.lock(); + try { + // Handle async disconnects prior to attempting any sends + handlePendingDisconnects(); + // send all the requests we can send now trySend(now); // check whether the poll is still needed by the caller. Note that if the expected completion // condition becomes satisfied after the call to shouldBlock() (because of a fired completion // handler), the client will be woken up. 
- if (pollCondition == null || pollCondition.shouldBlock()) { + if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) { // if there are no requests in flight, do not block longer than the retry backoff if (client.inFlightRequestCount() == 0) timeout = Math.min(timeout, retryBackoffMs); @@ -265,6 +285,8 @@ public void poll(long timeout, long now, PollCondition pollCondition, boolean di // clean unsent requests collection to keep the map from growing indefinitely unsent.clean(); + } finally { + lock.unlock(); } // called without the lock to avoid deadlock potential if handlers need to acquire locks @@ -303,8 +325,11 @@ public boolean awaitPendingRequests(Node node, long timeoutMs) { * @return The number of pending requests */ public int pendingRequestCount(Node node) { - synchronized (this) { + lock.lock(); + try { return unsent.requestCount(node) + client.inFlightRequestCount(node.idString()); + } finally { + lock.unlock(); } } @@ -317,8 +342,11 @@ public int pendingRequestCount(Node node) { public boolean hasPendingRequests(Node node) { if (unsent.hasRequests(node)) return true; - synchronized (this) { + lock.lock(); + try { return client.hasInFlightRequests(node.idString()); + } finally { + lock.unlock(); } } @@ -328,8 +356,11 @@ public boolean hasPendingRequests(Node node) { * @return The total count of pending requests */ public int pendingRequestCount() { - synchronized (this) { + lock.lock(); + try { return unsent.requestCount() + client.inFlightRequestCount(); + } finally { + lock.unlock(); } } @@ -341,8 +372,11 @@ public int pendingRequestCount() { public boolean hasPendingRequests() { if (unsent.hasRequests()) return true; - synchronized (this) { + lock.lock(); + try { return client.hasInFlightRequests(); + } finally { + lock.unlock(); } } @@ -386,15 +420,25 @@ private void checkDisconnects(long now) { } } - public void disconnect(Node node) { - failUnsentRequests(node, DisconnectException.INSTANCE); + private void 
handlePendingDisconnects() { + lock.lock(); + try { + while (true) { + Node node = pendingDisconnects.poll(); + if (node == null) + break; - synchronized (this) { - client.disconnect(node.idString()); + failUnsentRequests(node, DisconnectException.INSTANCE); + client.disconnect(node.idString()); + } + } finally { + lock.unlock(); } + } - // We need to poll to ensure callbacks from in-flight requests on the disconnected socket are fired - pollNoWakeup(); + public void disconnectAsync(Node node) { + pendingDisconnects.offer(node); + client.wakeup(); } private void failExpiredRequests(long now) { @@ -408,16 +452,16 @@ private void failExpiredRequests(long now) { private void failUnsentRequests(Node node, RuntimeException e) { // clear unsent requests to node and fail their corresponding futures - synchronized (this) { + lock.lock(); + try { Collection<ClientRequest> unsentRequests = unsent.remove(node); for (ClientRequest unsentRequest : unsentRequests) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback(); handler.onFailure(e); } + } finally { + lock.unlock(); } - - // called without the lock to avoid deadlock potential - firePendingCompletedRequests(); } private boolean trySend(long now) { @@ -458,8 +502,11 @@ public void disableWakeups() { @Override public void close() throws IOException { - synchronized (this) { + lock.lock(); + try { client.close(); + } finally { + lock.unlock(); } } @@ -469,8 +516,11 @@ public void close() throws IOException { * @param node Node to connect to if possible */ public boolean connectionFailed(Node node) { - synchronized (this) { + lock.lock(); + try { return client.connectionFailed(node); + } finally { + lock.unlock(); } } @@ -481,8 +531,11 @@ public boolean connectionFailed(Node node) { * @param node The node to connect to */ public void tryConnect(Node node) { - synchronized (this) { + lock.lock(); + try { client.ready(node, time.milliseconds()); + } finally { + lock.unlock(); } } 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index ed45e3af56d..faf9240537c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; @@ -86,6 +87,7 @@ public FutureResponse(Node node, private final Queue<FutureResponse> futureResponses = new ArrayDeque<>(); private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>(); private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create(); + private volatile int numBlockingWakeups = 0; public MockClient(Time time) { this(time, null); @@ -195,8 +197,40 @@ public void send(ClientRequest request, long now) { this.requests.add(request); } + /** + * Simulate a blocking poll in order to test wakeup behavior. 
+ * + * @param numBlockingWakeups The number of polls which will block until woken up + */ + public synchronized void enableBlockingUntilWakeup(int numBlockingWakeups) { + this.numBlockingWakeups = numBlockingWakeups; + } + + @Override + public synchronized void wakeup() { + if (numBlockingWakeups > 0) { + numBlockingWakeups--; + notify(); + } + } + + private synchronized void maybeAwaitWakeup() { + try { + int remainingBlockingWakeups = numBlockingWakeups; + if (remainingBlockingWakeups <= 0) + return; + + while (numBlockingWakeups == remainingBlockingWakeups) + wait(); + } catch (InterruptedException e) { + throw new InterruptException(e); + } + } + @Override public List<ClientResponse> poll(long timeoutMs, long now) { + maybeAwaitWakeup(); + List<ClientResponse> copy = new ArrayList<>(this.responses); if (metadata != null && metadata.updateRequested()) { @@ -427,10 +461,6 @@ public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> expectResponse, callback); } - @Override - public void wakeup() { - } - @Override public void close() { } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 2e5c96019b9..ac392055f9b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -221,7 +221,8 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception }); } - coordinator.coordinatorDead(); + coordinator.markCoordinatorUnknown(); + consumerClient.pollNoWakeup(); coordinator.invokeCompletedOffsetCommitCallbacks(); assertEquals(numRequests, responses.get()); } @@ -238,7 +239,7 @@ public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() throws final AtomicBoolean asyncCallbackInvoked = new 
AtomicBoolean(false); Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = Collections.singletonMap( new TopicPartition("foo", 0), new OffsetCommitRequest.PartitionData(13L, "")); - consumerClient.send(coordinator.coordinator(), new OffsetCommitRequest.Builder(groupId, offsets)) + consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(groupId, offsets)) .compose(new RequestFutureAdapter<ClientResponse, Object>() { @Override public void onSuccess(ClientResponse value, RequestFuture<Object> future) {} @@ -251,7 +252,8 @@ public void onFailure(RuntimeException e, RequestFuture<Object> future) { } }); - coordinator.coordinatorDead(); + coordinator.markCoordinatorUnknown(); + consumerClient.pollNoWakeup(); assertTrue(asyncCallbackInvoked.get()); } @@ -1033,6 +1035,7 @@ private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error) client.respond(offsetCommitResponse(Collections.singletonMap(t1p, error))); consumerClient.pollNoWakeup(); + consumerClient.pollNoWakeup(); // second poll since coordinator disconnect is async coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(coordinator.coordinatorUnknown()); @@ -1678,7 +1681,7 @@ public void testAutoCommitAfterCoordinatorBackToService() { subscriptions.assignFromUser(Collections.singleton(t1p)); subscriptions.seek(t1p, 100L); - coordinator.coordinatorDead(); + coordinator.markCoordinatorUnknown(); assertTrue(coordinator.coordinatorUnknown()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 904270ec489..d0888fa5655 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -107,7 +107,8 @@ public void testDisconnectWithUnsentRequests() { RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat()); assertTrue(consumerClient.hasPendingRequests(node)); assertFalse(client.hasInFlightRequests(node.idString())); - consumerClient.disconnect(node); + consumerClient.disconnectAsync(node); + consumerClient.pollNoWakeup(); assertTrue(future.failed()); assertTrue(future.exception() instanceof DisconnectException); } @@ -118,7 +119,8 @@ public void testDisconnectWithInFlightRequests() { consumerClient.pollNoWakeup(); assertTrue(consumerClient.hasPendingRequests(node)); assertTrue(client.hasInFlightRequests(node.idString())); - consumerClient.disconnect(node); + consumerClient.disconnectAsync(node); + consumerClient.pollNoWakeup(); assertTrue(future.failed()); assertTrue(future.exception() instanceof DisconnectException); } @@ -205,6 +207,66 @@ public void wakeup() { assertTrue(future.isDone()); } + @Test + public void testDisconnectWakesUpPoll() throws Exception { + final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat()); + + client.enableBlockingUntilWakeup(1); + Thread t = new Thread() { + @Override + public void run() { + consumerClient.poll(future); + } + }; + t.start(); + + consumerClient.disconnectAsync(node); + t.join(); + assertTrue(future.failed()); + assertTrue(future.exception() instanceof DisconnectException); + } + + @Test + public void testFutureCompletionOutsidePoll() throws Exception { + // Tests the scenario in which the request that is being awaited in one thread + // is received and completed in another thread. 
+ + final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat()); + consumerClient.pollNoWakeup(); // dequeue and send the request + + client.enableBlockingUntilWakeup(2); + Thread t1 = new Thread() { + @Override + public void run() { + consumerClient.pollNoWakeup(); + } + }; + t1.start(); + + // Sleep a little so that t1 is blocking in poll + Thread.sleep(50); + + Thread t2 = new Thread() { + @Override + public void run() { + consumerClient.poll(future); + } + }; + t2.start(); + + // Sleep a little so that t2 is awaiting the network client lock + Thread.sleep(50); + + // Simulate a network response and return from the poll in t1 + client.respond(heartbeatResponse(Errors.NONE)); + client.wakeup(); + + // Both threads should complete since t1 should wakeup t2 + t1.join(); + t2.join(); + assertTrue(future.succeeded()); + } + @Test public void testAwaitForMetadataUpdateWithTimeout() { assertFalse(consumerClient.awaitMetadataUpdate(10L)); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Coordinator disconnect in heartbeat thread can cause commitSync to block > indefinitely > ------------------------------------------------------------------------------------- > > Key: KAFKA-6593 > URL: https://issues.apache.org/jira/browse/KAFKA-6593 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 1.0.0, 0.11.0.2 > Reporter: Jason Gustafson > Assignee: Jason Gustafson > Priority: Critical > Fix For: 1.1.0 > > Attachments: consumer.log > > > If a coordinator disconnect is observed in the heartbeat thread, it can cause > a pending offset commit to be cancelled just before the foreground thread > begins waiting on its response in poll(). 
Since the poll timeout is Long.MAX_VALUE, this will cause the consumer to effectively hang until some other network event causes the poll() to return. We try to protect this case with a poll condition on the future, but this isn't bulletproof since the future can be completed outside of the lock. -- This message was sent by Atlassian JIRA (v7.6.3#76005)