[ https://issues.apache.org/jira/browse/KAFKA-6299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471322#comment-16471322 ]
ASF GitHub Bot commented on KAFKA-6299: --------------------------------------- hachikuji closed pull request #4989: MINOR: A few small cleanups from KAFKA-6299 URL: https://github.com/apache/kafka/pull/4989 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 70e9fbd6fbb..c9e0e186316 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -26,7 +26,7 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; -import org.apache.kafka.clients.admin.internal.AdminMetadataManager; +import org.apache.kafka.clients.admin.internals.AdminMetadataManager; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; @@ -737,6 +737,30 @@ int nextTimeoutMs() { } private final class AdminClientRunnable implements Runnable { + /** + * Calls which have not yet been assigned to a node. + * Only accessed from this thread. + */ + private final ArrayList<Call> pendingCalls = new ArrayList<>(); + + /** + * Maps nodes to calls that we want to send. + * Only accessed from this thread. + */ + private final Map<Node, List<Call>> callsToSend = new HashMap<>(); + + /** + * Maps node ID strings to calls that have been sent. + * Only accessed from this thread. 
+ */ + private final Map<String, List<Call>> callsInFlight = new HashMap<>(); + + /** + * Maps correlation IDs to calls that have been sent. + * Only accessed from this thread. + */ + private final Map<Integer, Call> correlationIdToCalls = new HashMap<>(); + /** * Pending calls. Protected by the object monitor. * This will be null only if the thread has shut down. @@ -748,9 +772,8 @@ int nextTimeoutMs() { * * @param processor The timeout processor. */ - private void timeoutPendingCalls(TimeoutProcessor processor, List<Call> pendingCalls) { - int numTimedOut = processor.handleTimeouts(pendingCalls, - "Timed out waiting for a node assignment."); + private void timeoutPendingCalls(TimeoutProcessor processor) { + int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed out waiting for a node assignment."); if (numTimedOut > 0) log.debug("Timed out {} pending calls.", numTimedOut); } @@ -759,9 +782,8 @@ private void timeoutPendingCalls(TimeoutProcessor processor, List<Call> pendingC * Time out calls which have been assigned to nodes. * * @param processor The timeout processor. - * @param callsToSend A map of nodes to the calls they need to handle. */ - private int timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> callsToSend) { + private int timeoutCallsToSend(TimeoutProcessor processor) { int numTimedOut = 0; for (List<Call> callList : callsToSend.values()) { numTimedOut += processor.handleTimeouts(callList, @@ -778,7 +800,7 @@ private int timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> * This function holds the lock for the minimum amount of time, to avoid blocking * users of AdminClient who will also take the lock to add new calls. 
*/ - private synchronized void drainNewCalls(ArrayList<Call> pendingCalls) { + private synchronized void drainNewCalls() { if (!newCalls.isEmpty()) { pendingCalls.addAll(newCalls); newCalls.clear(); @@ -790,11 +812,8 @@ private synchronized void drainNewCalls(ArrayList<Call> pendingCalls) { * * @param now The current time in milliseconds. * @param pendingIter An iterator yielding pending calls. - * @param callsToSend A map of nodes to the calls they need to handle. - * */ - private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter, - Map<Node, List<Call>> callsToSend) { + private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter) { while (pendingIter.hasNext()) { Call call = pendingIter.next(); Node node = null; @@ -821,14 +840,9 @@ private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter, * Send the calls which are ready. * * @param now The current time in milliseconds. - * @param callsToSend The calls to send, by node. - * @param correlationIdToCalls A map of correlation IDs to calls. - * @param callsInFlight A map of nodes to the calls they have in flight. - * * @return The minimum timeout we need for poll(). */ - private long sendEligibleCalls(long now, Map<Node, List<Call>> callsToSend, - Map<Integer, Call> correlationIdToCalls, Map<String, List<Call>> callsInFlight) { + private long sendEligibleCalls(long now) { long pollTimeout = Long.MAX_VALUE; for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) { @@ -872,9 +886,8 @@ private long sendEligibleCalls(long now, Map<Node, List<Call>> callsToSend, * to time them out is to close the entire connection. * * @param processor The timeout processor. - * @param callsInFlight A map of nodes to the calls they have in flight. 
*/ - private void timeoutCallsInFlight(TimeoutProcessor processor, Map<String, List<Call>> callsInFlight) { + private void timeoutCallsInFlight(TimeoutProcessor processor) { int numTimedOut = 0; for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) { List<Call> contexts = entry.getValue(); @@ -907,18 +920,12 @@ private void timeoutCallsInFlight(TimeoutProcessor processor, Map<String, List<C * * @param now The current time in milliseconds. * @param responses The latest responses from KafkaClient. - * @param correlationIdToCall A map of correlation IDs to calls. - * @param callsInFlight A map of nodes to the calls they have in flight. - **/ - private void handleResponses(long now, - List<ClientResponse> responses, - Map<String, List<Call>> callsInFlight, - Map<Integer, Call> correlationIdToCall) { - + **/ + private void handleResponses(long now, List<ClientResponse> responses) { for (ClientResponse response : responses) { int correlationId = response.requestHeader().correlationId(); - Call call = correlationIdToCall.get(correlationId); + Call call = correlationIdToCalls.get(correlationId); if (call == null) { // If the server returns information about a correlation ID we didn't use yet, // an internal server error has occurred. Close the connection and log an error message. @@ -930,7 +937,7 @@ private void handleResponses(long now, } // Stop tracking this call. - correlationIdToCall.remove(correlationId); + correlationIdToCalls.remove(correlationId); List<Call> calls = callsInFlight.get(response.destination()); if ((calls == null) || (!calls.remove(call))) { log.error("Internal server error on {}: ignoring call {} in correlationIdToCall " + @@ -978,8 +985,7 @@ private boolean hasActiveExternalCalls(Collection<Call> calls) { /** * Return true if there are currently active external calls. 
*/ - private boolean hasActiveExternalCalls(List<Call> pendingCalls, - Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) { + private boolean hasActiveExternalCalls() { if (hasActiveExternalCalls(pendingCalls)) { return true; } @@ -988,15 +994,11 @@ private boolean hasActiveExternalCalls(List<Call> pendingCalls, return true; } } - if (hasActiveExternalCalls(correlationIdToCalls.values())) { - return true; - } - return false; + return hasActiveExternalCalls(correlationIdToCalls.values()); } - private boolean threadShouldExit(long now, long curHardShutdownTimeMs, List<Call> pendingCalls, - Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) { - if (!hasActiveExternalCalls(pendingCalls, callsToSend, correlationIdToCalls)) { + private boolean threadShouldExit(long now, long curHardShutdownTimeMs) { + if (!hasActiveExternalCalls()) { log.trace("All work has been completed, and the I/O thread is now exiting."); return true; } @@ -1010,48 +1012,22 @@ private boolean threadShouldExit(long now, long curHardShutdownTimeMs, List<Call @Override public void run() { - /** - * Calls which have not yet been assigned to a node. - * Only accessed from this thread. - */ - ArrayList<Call> pendingCalls = new ArrayList<>(); - - /** - * Maps nodes to calls that we want to send. - * Only accessed from this thread. - */ - Map<Node, List<Call>> callsToSend = new HashMap<>(); - - /** - * Maps node ID strings to calls that have been sent. - * Only accessed from this thread. - */ - Map<String, List<Call>> callsInFlight = new HashMap<>(); - - /** - * Maps correlation IDs to calls that have been sent. - * Only accessed from this thread. - */ - Map<Integer, Call> correlationIdToCalls = new HashMap<>(); - long now = time.milliseconds(); log.trace("Thread starting"); while (true) { // Copy newCalls into pendingCalls. - drainNewCalls(pendingCalls); + drainNewCalls(); // Check if the AdminClient thread should shut down. 
long curHardShutdownTimeMs = hardShutdownTimeMs.get(); - if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && - threadShouldExit(now, curHardShutdownTimeMs, pendingCalls, - callsToSend, correlationIdToCalls)) + if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs)) break; // Handle timeouts. TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now); - timeoutPendingCalls(timeoutProcessor, pendingCalls); - timeoutCallsToSend(timeoutProcessor, callsToSend); - timeoutCallsInFlight(timeoutProcessor, callsInFlight); + timeoutPendingCalls(timeoutProcessor); + timeoutCallsToSend(timeoutProcessor); + timeoutCallsInFlight(timeoutProcessor); long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs()); if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) { @@ -1059,7 +1035,7 @@ public void run() { } // Choose nodes for our pending calls. - chooseNodesForPendingCalls(now, pendingCalls.iterator(), callsToSend); + chooseNodesForPendingCalls(now, pendingCalls.iterator()); long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now); if (metadataFetchDelayMs == 0) { metadataManager.transitionToUpdatePending(now); @@ -1067,11 +1043,9 @@ public void run() { // Create a new metadata fetch call and add it to the end of pendingCalls. // Assign a node for just the new call (we handled the other pending nodes above). pendingCalls.add(metadataCall); - chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1), - callsToSend); + chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1)); } - pollTimeout = Math.min(pollTimeout, - sendEligibleCalls(now, callsToSend, correlationIdToCalls, callsInFlight)); + pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now)); if (metadataFetchDelayMs > 0) { pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs); } @@ -1083,18 +1057,16 @@ public void run() { // Update the current time and handle the latest responses. 
now = time.milliseconds(); - handleResponses(now, responses, callsInFlight, correlationIdToCalls); + handleResponses(now, responses); } int numTimedOut = 0; TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE); synchronized (this) { - numTimedOut += timeoutProcessor.handleTimeouts(newCalls, - "The AdminClient thread has exited."); + numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The AdminClient thread has exited."); newCalls = null; } - numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, - "The AdminClient thread has exited."); - numTimedOut += timeoutCallsToSend(timeoutProcessor, callsToSend); + numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited."); + numTimedOut += timeoutCallsToSend(timeoutProcessor); numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(), "The AdminClient thread has exited."); if (numTimedOut > 0) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java similarity index 99% rename from clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java rename to clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java index 63e7fc8fa0a..3806560326c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.clients.admin.internal; +package org.apache.kafka.clients.admin.internals; import org.apache.kafka.clients.MetadataUpdater; import org.apache.kafka.common.Cluster; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Fix AdminClient error handling when metadata changes
> ----------------------------------------------------
>
> Key: KAFKA-6299
> URL: https://issues.apache.org/jira/browse/KAFKA-6299
> Project: Kafka
> Issue Type: Bug
> Reporter: Colin P. McCabe
> Assignee: Colin P. McCabe
> Priority: Major
> Fix For: 2.0.0
>
>
> * AdminClient should only call Metadata#requestUpdate when needed.
> * AdminClient should retry requests for which the controller has changed.
> * Fix an issue where AdminClient requests might not get a security exception,
> even when a metadata fetch fails with an authorization exception.
> * Fix a possible issue where AdminClient might leak a socket after the
> timeout expires on a hard close, if a very narrow race condition is hit.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)