junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1290474356


##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -140,17 +167,30 @@ public long metadataExpireMs() {
     }
 
     /**
-     * Request an update of the current cluster metadata info, return the 
current updateVersion before the update
+     * Request an update of the current cluster metadata info, permitting 
backoff based on the number of
+     * equivalent responses, which indicate that metadata responses did not 
make progress and may be stale.

Review Comment:
   Could we add a comment on when the caller should set 
permitBackoffOnEquivalentResponses to true? Also, when should 
backoffOnEquivalentResponses be reset to false?



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -114,18 +127,32 @@ public synchronized Cluster fetch() {
 
     /**
      * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+     * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+     * since the last successful response, and how many equivalent metadata 
responses have been received.
+     * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+     * metadata responses are clean.
+     * <p>
+     * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+     * not be delayed if this method returns 0.
      *
      * @param nowMs current time in ms
      * @return remaining time in ms till the cluster info can be updated again
      */
     public synchronized long timeToAllowUpdate(long nowMs) {
-        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+        // Calculate the backoff for attempts which acts when metadata 
responses fail
+        long backoffForAttempts = Math.max(this.lastRefreshMs + 
this.refreshBackoff.backoff(this.attempts) - nowMs, 0);
+
+        // Calculate the backoff for equivalent responses which acts when 
metadata responses as not making progress

Review Comment:
   as not making => are not making ?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -598,22 +608,22 @@ private void insertInSequenceOrder(Deque<ProducerBatch> 
deque, ProducerBatch bat
     /**
      * Add the leader to the ready nodes if the batch is ready
      *
-     * @param nowMs The current time
      * @param exhausted 'true' is the buffer pool is exhausted
      * @param part The partition
      * @param leader The leader for the partition
      * @param waitedTimeMs How long batch waited
      * @param backingOff Is backing off
+     * @param backoffAttempts Number of attempts for calculating backoff delay
      * @param full Is batch full
      * @param nextReadyCheckDelayMs The delay for next check
      * @param readyNodes The set of ready nodes (to be filled in)
      * @return The delay for next check
      */
-    private long batchReady(long nowMs, boolean exhausted, TopicPartition 
part, Node leader,
-                            long waitedTimeMs, boolean backingOff, boolean 
full,
-                            long nextReadyCheckDelayMs, Set<Node> readyNodes) {
+    private long batchReady(boolean exhausted, TopicPartition part, Node 
leader,
+                            long waitedTimeMs, boolean backingOff, int 
backoffAttempts,
+                            boolean full, long nextReadyCheckDelayMs, 
Set<Node> readyNodes) {
         if (!readyNodes.contains(leader) && !isMuted(part)) {
-            long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+            long timeToWaitMs = backingOff ? 
retryBackoff.backoff(backoffAttempts) : lingerMs;

Review Comment:
   Here, backoffAttempts is already subtracted by one when calling 
retryBackoff.backoff. In other places, we pass in the true attempts and 
subtract by one when calling retryBackoff.backoff. It would be useful to make 
that consistent.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -550,7 +550,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
                     // refresh metadata before re-joining the group as long as 
the refresh backoff time has
                     // passed.
                     if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) 
== 0) {
-                        this.metadata.requestUpdate();
+                        this.metadata.requestUpdate(true);

Review Comment:
   Why is permitBackoffOnEquivalentResponses set to true? We refresh the 
metadata here not because we have discovered stale metadata.



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -140,17 +167,30 @@ public long metadataExpireMs() {
     }
 
     /**
-     * Request an update of the current cluster metadata info, return the 
current updateVersion before the update
+     * Request an update of the current cluster metadata info, permitting 
backoff based on the number of
+     * equivalent responses, which indicate that metadata responses did not 
make progress and may be stale.
+     * @param permitBackoffOnEquivalentResponses Whether to permit backoff 
when consecutive responses are equivalent
+     * @return The current updateVersion before the update
      */
-    public synchronized int requestUpdate() {
+    public synchronized int requestUpdate(final boolean 
permitBackoffOnEquivalentResponses) {
         this.needFullUpdate = true;
+        if (this.backoffOnEquivalentResponses) {

Review Comment:
   Hmm, we need to check backoffOnEquivalentResponses when calculating 
`timeToAllowUpdate`, right?



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -267,11 +307,14 @@ public synchronized void update(int requestVersion, 
MetadataResponse response, b
 
         this.needPartialUpdate = requestVersion < this.requestVersion;
         this.lastRefreshMs = nowMs;
+        this.attempts = 0;
         this.updateVersion += 1;
         if (!isPartialUpdate) {
             this.needFullUpdate = false;
             this.lastSuccessfulRefreshMs = nowMs;
         }
+        this.backoffOnEquivalentResponses = true;
+        this.equivalentResponseCount++;

Review Comment:
   The metadata request can also be driven by periodic metadata refresh. In 
that case, we don't want to trigger the exponential backoff if the metadata 
response doesn't change. However, in the PR, it seems that once 
backoffOnEquivalentResponses is set to true. It's not reset to false before the 
periodic metadata refresh?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to