[ 
https://issues.apache.org/jira/browse/KAFKA-6299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471322#comment-16471322
 ] 

ASF GitHub Bot commented on KAFKA-6299:
---------------------------------------

hachikuji closed pull request #4989: MINOR: A few small cleanups from KAFKA-6299
URL: https://github.com/apache/kafka/pull/4989
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is reproduced
below (GitHub would not otherwise display it):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 70e9fbd6fbb..c9e0e186316 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -26,7 +26,7 @@
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
-import org.apache.kafka.clients.admin.internal.AdminMetadataManager;
+import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -737,6 +737,30 @@ int nextTimeoutMs() {
     }
 
     private final class AdminClientRunnable implements Runnable {
+        /**
+         * Calls which have not yet been assigned to a node.
+         * Only accessed from this thread.
+         */
+        private final ArrayList<Call> pendingCalls = new ArrayList<>();
+
+        /**
+         * Maps nodes to calls that we want to send.
+         * Only accessed from this thread.
+         */
+        private final Map<Node, List<Call>> callsToSend = new HashMap<>();
+
+        /**
+         * Maps node ID strings to calls that have been sent.
+         * Only accessed from this thread.
+         */
+        private final Map<String, List<Call>> callsInFlight = new HashMap<>();
+
+        /**
+         * Maps correlation IDs to calls that have been sent.
+         * Only accessed from this thread.
+         */
+        private final Map<Integer, Call> correlationIdToCalls = new 
HashMap<>();
+
         /**
          * Pending calls. Protected by the object monitor.
          * This will be null only if the thread has shut down.
@@ -748,9 +772,8 @@ int nextTimeoutMs() {
          *
          * @param processor     The timeout processor.
          */
-        private void timeoutPendingCalls(TimeoutProcessor processor, 
List<Call> pendingCalls) {
-            int numTimedOut = processor.handleTimeouts(pendingCalls,
-                    "Timed out waiting for a node assignment.");
+        private void timeoutPendingCalls(TimeoutProcessor processor) {
+            int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed 
out waiting for a node assignment.");
             if (numTimedOut > 0)
                 log.debug("Timed out {} pending calls.", numTimedOut);
         }
@@ -759,9 +782,8 @@ private void timeoutPendingCalls(TimeoutProcessor 
processor, List<Call> pendingC
          * Time out calls which have been assigned to nodes.
          *
          * @param processor     The timeout processor.
-         * @param callsToSend   A map of nodes to the calls they need to 
handle.
          */
-        private int timeoutCallsToSend(TimeoutProcessor processor, Map<Node, 
List<Call>> callsToSend) {
+        private int timeoutCallsToSend(TimeoutProcessor processor) {
             int numTimedOut = 0;
             for (List<Call> callList : callsToSend.values()) {
                 numTimedOut += processor.handleTimeouts(callList,
@@ -778,7 +800,7 @@ private int timeoutCallsToSend(TimeoutProcessor processor, 
Map<Node, List<Call>>
          * This function holds the lock for the minimum amount of time, to 
avoid blocking
          * users of AdminClient who will also take the lock to add new calls.
          */
-        private synchronized void drainNewCalls(ArrayList<Call> pendingCalls) {
+        private synchronized void drainNewCalls() {
             if (!newCalls.isEmpty()) {
                 pendingCalls.addAll(newCalls);
                 newCalls.clear();
@@ -790,11 +812,8 @@ private synchronized void drainNewCalls(ArrayList<Call> 
pendingCalls) {
          *
          * @param now           The current time in milliseconds.
          * @param pendingIter   An iterator yielding pending calls.
-         * @param callsToSend   A map of nodes to the calls they need to 
handle.
-         *
          */
-        private void chooseNodesForPendingCalls(long now, Iterator<Call> 
pendingIter,
-                Map<Node, List<Call>> callsToSend) {
+        private void chooseNodesForPendingCalls(long now, Iterator<Call> 
pendingIter) {
             while (pendingIter.hasNext()) {
                 Call call = pendingIter.next();
                 Node node = null;
@@ -821,14 +840,9 @@ private void chooseNodesForPendingCalls(long now, 
Iterator<Call> pendingIter,
          * Send the calls which are ready.
          *
          * @param now                   The current time in milliseconds.
-         * @param callsToSend           The calls to send, by node.
-         * @param correlationIdToCalls  A map of correlation IDs to calls.
-         * @param callsInFlight         A map of nodes to the calls they have 
in flight.
-         *
          * @return                      The minimum timeout we need for poll().
          */
-        private long sendEligibleCalls(long now, Map<Node, List<Call>> 
callsToSend,
-                         Map<Integer, Call> correlationIdToCalls, Map<String, 
List<Call>> callsInFlight) {
+        private long sendEligibleCalls(long now) {
             long pollTimeout = Long.MAX_VALUE;
             for (Iterator<Map.Entry<Node, List<Call>>> iter = 
callsToSend.entrySet().iterator();
                      iter.hasNext(); ) {
@@ -872,9 +886,8 @@ private long sendEligibleCalls(long now, Map<Node, 
List<Call>> callsToSend,
          * to time them out is to close the entire connection.
          *
          * @param processor         The timeout processor.
-         * @param callsInFlight     A map of nodes to the calls they have in 
flight.
          */
-        private void timeoutCallsInFlight(TimeoutProcessor processor, 
Map<String, List<Call>> callsInFlight) {
+        private void timeoutCallsInFlight(TimeoutProcessor processor) {
             int numTimedOut = 0;
             for (Map.Entry<String, List<Call>> entry : 
callsInFlight.entrySet()) {
                 List<Call> contexts = entry.getValue();
@@ -907,18 +920,12 @@ private void timeoutCallsInFlight(TimeoutProcessor 
processor, Map<String, List<C
          *
          * @param now                   The current time in milliseconds.
          * @param responses             The latest responses from KafkaClient.
-         * @param correlationIdToCall   A map of correlation IDs to calls.
-         * @param callsInFlight         A map of nodes to the calls they have 
in flight.
-        **/
-        private void handleResponses(long now,
-                                     List<ClientResponse> responses,
-                                     Map<String, List<Call>> callsInFlight,
-                                     Map<Integer, Call> correlationIdToCall) {
-
+         **/
+        private void handleResponses(long now, List<ClientResponse> responses) 
{
             for (ClientResponse response : responses) {
                 int correlationId = response.requestHeader().correlationId();
 
-                Call call = correlationIdToCall.get(correlationId);
+                Call call = correlationIdToCalls.get(correlationId);
                 if (call == null) {
                     // If the server returns information about a correlation 
ID we didn't use yet,
                     // an internal server error has occurred. Close the 
connection and log an error message.
@@ -930,7 +937,7 @@ private void handleResponses(long now,
                 }
 
                 // Stop tracking this call.
-                correlationIdToCall.remove(correlationId);
+                correlationIdToCalls.remove(correlationId);
                 List<Call> calls = callsInFlight.get(response.destination());
                 if ((calls == null) || (!calls.remove(call))) {
                     log.error("Internal server error on {}: ignoring call {} 
in correlationIdToCall " +
@@ -978,8 +985,7 @@ private boolean hasActiveExternalCalls(Collection<Call> 
calls) {
         /**
          * Return true if there are currently active external calls.
          */
-        private boolean hasActiveExternalCalls(List<Call> pendingCalls,
-                Map<Node, List<Call>> callsToSend, Map<Integer, Call> 
correlationIdToCalls) {
+        private boolean hasActiveExternalCalls() {
             if (hasActiveExternalCalls(pendingCalls)) {
                 return true;
             }
@@ -988,15 +994,11 @@ private boolean hasActiveExternalCalls(List<Call> 
pendingCalls,
                     return true;
                 }
             }
-            if (hasActiveExternalCalls(correlationIdToCalls.values())) {
-                return true;
-            }
-            return false;
+            return hasActiveExternalCalls(correlationIdToCalls.values());
         }
 
-        private boolean threadShouldExit(long now, long curHardShutdownTimeMs, 
List<Call> pendingCalls,
-                Map<Node, List<Call>> callsToSend, Map<Integer, Call> 
correlationIdToCalls) {
-            if (!hasActiveExternalCalls(pendingCalls, callsToSend, 
correlationIdToCalls)) {
+        private boolean threadShouldExit(long now, long curHardShutdownTimeMs) 
{
+            if (!hasActiveExternalCalls()) {
                 log.trace("All work has been completed, and the I/O thread is 
now exiting.");
                 return true;
             }
@@ -1010,48 +1012,22 @@ private boolean threadShouldExit(long now, long 
curHardShutdownTimeMs, List<Call
 
         @Override
         public void run() {
-            /**
-             * Calls which have not yet been assigned to a node.
-             * Only accessed from this thread.
-             */
-            ArrayList<Call> pendingCalls = new ArrayList<>();
-
-            /**
-             * Maps nodes to calls that we want to send.
-             * Only accessed from this thread.
-             */
-            Map<Node, List<Call>> callsToSend = new HashMap<>();
-
-            /**
-             * Maps node ID strings to calls that have been sent.
-             * Only accessed from this thread.
-             */
-            Map<String, List<Call>> callsInFlight = new HashMap<>();
-
-            /**
-             * Maps correlation IDs to calls that have been sent.
-             * Only accessed from this thread.
-             */
-            Map<Integer, Call> correlationIdToCalls = new HashMap<>();
-
             long now = time.milliseconds();
             log.trace("Thread starting");
             while (true) {
                 // Copy newCalls into pendingCalls.
-                drainNewCalls(pendingCalls);
+                drainNewCalls();
 
                 // Check if the AdminClient thread should shut down.
                 long curHardShutdownTimeMs = hardShutdownTimeMs.get();
-                if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) &&
-                        threadShouldExit(now, curHardShutdownTimeMs, 
pendingCalls,
-                            callsToSend, correlationIdToCalls))
+                if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && 
threadShouldExit(now, curHardShutdownTimeMs))
                     break;
 
                 // Handle timeouts.
                 TimeoutProcessor timeoutProcessor = 
timeoutProcessorFactory.create(now);
-                timeoutPendingCalls(timeoutProcessor, pendingCalls);
-                timeoutCallsToSend(timeoutProcessor, callsToSend);
-                timeoutCallsInFlight(timeoutProcessor, callsInFlight);
+                timeoutPendingCalls(timeoutProcessor);
+                timeoutCallsToSend(timeoutProcessor);
+                timeoutCallsInFlight(timeoutProcessor);
 
                 long pollTimeout = Math.min(1200000, 
timeoutProcessor.nextTimeoutMs());
                 if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
@@ -1059,7 +1035,7 @@ public void run() {
                 }
 
                 // Choose nodes for our pending calls.
-                chooseNodesForPendingCalls(now, pendingCalls.iterator(), 
callsToSend);
+                chooseNodesForPendingCalls(now, pendingCalls.iterator());
                 long metadataFetchDelayMs = 
metadataManager.metadataFetchDelayMs(now);
                 if (metadataFetchDelayMs == 0) {
                     metadataManager.transitionToUpdatePending(now);
@@ -1067,11 +1043,9 @@ public void run() {
                     // Create a new metadata fetch call and add it to the end 
of pendingCalls.
                     // Assign a node for just the new call (we handled the 
other pending nodes above).
                     pendingCalls.add(metadataCall);
-                    chooseNodesForPendingCalls(now, 
pendingCalls.listIterator(pendingCalls.size() - 1),
-                        callsToSend);
+                    chooseNodesForPendingCalls(now, 
pendingCalls.listIterator(pendingCalls.size() - 1));
                 }
-                pollTimeout = Math.min(pollTimeout,
-                    sendEligibleCalls(now, callsToSend, correlationIdToCalls, 
callsInFlight));
+                pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));
                 if (metadataFetchDelayMs > 0) {
                     pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
                 }
@@ -1083,18 +1057,16 @@ public void run() {
 
                 // Update the current time and handle the latest responses.
                 now = time.milliseconds();
-                handleResponses(now, responses, callsInFlight, 
correlationIdToCalls);
+                handleResponses(now, responses);
             }
             int numTimedOut = 0;
             TimeoutProcessor timeoutProcessor = new 
TimeoutProcessor(Long.MAX_VALUE);
             synchronized (this) {
-                numTimedOut += timeoutProcessor.handleTimeouts(newCalls,
-                        "The AdminClient thread has exited.");
+                numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The 
AdminClient thread has exited.");
                 newCalls = null;
             }
-            numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls,
-                "The AdminClient thread has exited.");
-            numTimedOut += timeoutCallsToSend(timeoutProcessor, callsToSend);
+            numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The 
AdminClient thread has exited.");
+            numTimedOut += timeoutCallsToSend(timeoutProcessor);
             numTimedOut += 
timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
                     "The AdminClient thread has exited.");
             if (numTimedOut > 0) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
similarity index 99%
rename from 
clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
rename to 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 63e7fc8fa0a..3806560326c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.clients.admin.internal;
+package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.clients.MetadataUpdater;
 import org.apache.kafka.common.Cluster;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix AdminClient error handling when metadata changes
> ----------------------------------------------------
>
>                 Key: KAFKA-6299
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6299
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Colin P. McCabe
>            Assignee: Colin P. McCabe
>            Priority: Major
>             Fix For: 2.0.0
>
>
> * AdminClient should only call Metadata#requestUpdate when needed.
> * AdminClient should retry requests for which the controller has changed.
> * Fix an issue where AdminClient requests might not get a security exception, 
> even when a metadata fetch fails with an authorization exception. 
> * Fix a possible issue where AdminClient might leak a socket after the
> timeout expires on a hard close, if a very narrow race condition is hit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to