This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d066b94c810 MINOR: Fix UpdatedImage and HighWatermarkUpdated events' 
logs (#15432)
d066b94c810 is described below

commit d066b94c8103cca166d7ea01a4b5bf5f65a3b838
Author: David Jacot <[email protected]>
AuthorDate: Thu Feb 29 07:01:21 2024 -0800

    MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs (#15432)
    
    I have noticed the following log when a __consumer_offsets partition 
migrates away from a broker. It happens because the event is queued up after the 
event that unloads the state machine. This patch fixes it and fixes another 
similar one.
    
    ```
    [2024-02-06 17:14:51,359] ERROR [GroupCoordinator id=1] Execution of 
UpdateImage(tp=__consumer_offsets-28, offset=13251) failed due to This is not 
the correct coordinator.. 
(org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
    org.apache.kafka.common.errors.NotCoordinatorException: This is not the 
correct coordinator.
    ```
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../group/runtime/CoordinatorRuntime.java          | 144 +++++++++++----------
 1 file changed, 76 insertions(+), 68 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index b0be84c7a90..ccb4caf04f9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -1192,11 +1192,28 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         ) {
             log.debug("High watermark of {} incremented to {}.", tp, offset);
             scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", 
offset=" + offset + ")", tp, () -> {
-                withActiveContextOrThrow(tp, context -> {
-                    context.coordinator.updateLastCommittedOffset(offset);
-                    context.deferredEventQueue.completeUpTo(offset);
-                    coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                });
+                CoordinatorContext context = coordinators.get(tp);
+                if (context != null) {
+                    context.lock.lock();
+                    try {
+                        if (context.state == CoordinatorState.ACTIVE) {
+                            // The updated high watermark can be applied to 
the coordinator only if the coordinator
+                            // exists and is in the active state.
+                            log.debug("Updating high watermark of {} to {}.", 
tp, offset);
+                            
context.coordinator.updateLastCommittedOffset(offset);
+                            context.deferredEventQueue.completeUpTo(offset);
+                            coordinatorMetrics.onUpdateLastCommittedOffset(tp, 
offset);
+                        } else {
+                            log.debug("Ignored high watermark updated for {} 
to {} because the coordinator is not active.",
+                                tp, offset);
+                        }
+                    } finally {
+                        context.lock.unlock();
+                    }
+                } else {
+                    log.debug("Ignored high watermark updated for {} to {} 
because the coordinator does not exist.",
+                        tp, offset);
+                }
             });
         }
     }
@@ -1350,14 +1367,11 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
     }
 
     /**
-     * Creates the context if it does not exist.
-     *
-     * @param tp    The topic partition.
-     *
-     * Visible for testing.
+     * @return The coordinator context or a new context if it does not exist.
+     * Package private for testing.
      */
-    void maybeCreateContext(TopicPartition tp) {
-        coordinators.computeIfAbsent(tp, CoordinatorContext::new);
+    CoordinatorContext maybeCreateContext(TopicPartition tp) {
+        return coordinators.computeIfAbsent(tp, CoordinatorContext::new);
     }
 
     /**
@@ -1376,29 +1390,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         }
     }
 
-    /**
-     * Calls the provided function with the context; throws an exception 
otherwise.
-     * This method ensures that the context lock is acquired before calling the
-     * function and releases afterwards.
-     *
-     * @param tp    The topic partition.
-     * @param func  The function that will receive the context.
-     * @throws NotCoordinatorException
-     */
-    private void withContextOrThrow(
-        TopicPartition tp,
-        Consumer<CoordinatorContext> func
-    ) throws NotCoordinatorException {
-        CoordinatorContext context = contextOrThrow(tp);
-
-        try {
-            context.lock.lock();
-            func.accept(context);
-        } finally {
-            context.lock.unlock();
-        }
-    }
-
     /**
      * Calls the provided function with the context iff the context is active; 
throws
      * an exception otherwise. This method ensures that the context lock is 
acquired
@@ -1609,7 +1600,11 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         maybeCreateContext(tp);
 
         scheduleInternalOperation("Load(tp=" + tp + ", epoch=" + 
partitionEpoch + ")", tp, () -> {
-            withContextOrThrow(tp, context -> {
+            // The context is re-created if it does not exist.
+            CoordinatorContext context = maybeCreateContext(tp);
+
+            context.lock.lock();
+            try {
                 if (context.epoch < partitionEpoch) {
                     context.epoch = partitionEpoch;
 
@@ -1617,16 +1612,13 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                         case FAILED:
                         case INITIAL:
                             context.transitionTo(CoordinatorState.LOADING);
-                            loader.load(
-                                tp,
-                                context.coordinator
-                            ).whenComplete((summary, exception) -> {
+                            loader.load(tp, 
context.coordinator).whenComplete((summary, exception) -> {
                                 scheduleInternalOperation("CompleteLoad(tp=" + 
tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
-                                    withContextOrThrow(tp, ctx -> {
+                                    CoordinatorContext ctx = 
coordinators.get(tp);
+                                    if (ctx != null)  {
                                         if (ctx.state != 
CoordinatorState.LOADING) {
-                                            log.info("Ignoring load completion 
from {} because context is in {} state.",
-                                                ctx.tp, ctx.state
-                                            );
+                                            log.info("Ignored load completion 
from {} because context is in {} state.",
+                                                ctx.tp, ctx.state);
                                             return;
                                         }
                                         try {
@@ -1635,18 +1627,19 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                                             if (summary != null) {
                                                 
runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), 
summary.endTimeMs());
                                                 log.info("Finished loading of 
metadata from {} with epoch {} in {}ms where {}ms " +
-                                                    "was spent in the 
scheduler. Loaded {} records which total to {} bytes.",
+                                                         "was spent in the 
scheduler. Loaded {} records which total to {} bytes.",
                                                     tp, partitionEpoch, 
summary.endTimeMs() - summary.startTimeMs(),
-                                                    
summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes()
-                                                );
+                                                    
summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes());
                                             }
                                         } catch (Throwable ex) {
                                             log.error("Failed to load metadata 
from {} with epoch {} due to {}.",
-                                                tp, partitionEpoch, 
ex.toString()
-                                            );
+                                                tp, partitionEpoch, 
ex.toString());
                                             
ctx.transitionTo(CoordinatorState.FAILED);
                                         }
-                                    });
+                                    } else {
+                                        log.debug("Failed to complete the 
loading of metadata for {} in epoch {} since the coordinator does not exist.",
+                                            tp, partitionEpoch);
+                                    }
                                 });
                             });
                             break;
@@ -1663,11 +1656,12 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                             log.error("Cannot load coordinator {} in state 
{}.", tp, context.state);
                     }
                 } else {
-                    log.info("Ignoring loading metadata from {} since current 
epoch {} is larger than or equals to {}.",
-                        context.tp, context.epoch, partitionEpoch
-                    );
+                    log.info("Ignored loading metadata from {} since current 
epoch {} is larger than or equals to {}.",
+                        context.tp, context.epoch, partitionEpoch);
                 }
-            });
+            } finally {
+                context.lock.unlock();
+            }
         });
     }
 
@@ -1689,8 +1683,8 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         scheduleInternalOperation("UnloadCoordinator(tp=" + tp + ", epoch=" + 
partitionEpoch + ")", tp, () -> {
             CoordinatorContext context = coordinators.get(tp);
             if (context != null) {
+                context.lock.lock();
                 try {
-                    context.lock.lock();
                     if (!partitionEpoch.isPresent() || context.epoch < 
partitionEpoch.getAsInt()) {
                         log.info("Started unloading metadata for {} with epoch 
{}.", tp, partitionEpoch);
                         context.transitionTo(CoordinatorState.CLOSED);
@@ -1698,16 +1692,14 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                         log.info("Finished unloading metadata for {} with 
epoch {}.", tp, partitionEpoch);
                     } else {
                         log.info("Ignored unloading metadata for {} in epoch 
{} since current epoch is {}.",
-                            tp, partitionEpoch, context.epoch
-                        );
+                            tp, partitionEpoch, context.epoch);
                     }
                 } finally {
                     context.lock.unlock();
                 }
             } else {
                 log.info("Ignored unloading metadata for {} in epoch {} since 
metadata was never loaded.",
-                    tp, partitionEpoch
-                );
+                    tp, partitionEpoch);
             }
         });
     }
@@ -1731,15 +1723,26 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         // Push an event for each coordinator.
         coordinators.keySet().forEach(tp -> {
             scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" + 
newImage.offset() + ")", tp, () -> {
-                withContextOrThrow(tp, context -> {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        log.debug("Applying new metadata image with offset {} 
to {}.", newImage.offset(), tp);
-                        context.coordinator.onNewMetadataImage(newImage, 
delta);
-                    } else {
-                        log.debug("Ignoring new metadata image with offset {} 
for {} because the coordinator is not active.",
-                            newImage.offset(), tp);
+                CoordinatorContext context = coordinators.get(tp);
+                if (context != null) {
+                    context.lock.lock();
+                    try {
+                        if (context.state == CoordinatorState.ACTIVE) {
+                            // The new image can be applied to the coordinator 
only if the coordinator
+                            // exists and is in the active state.
+                            log.debug("Applying new metadata image with offset 
{} to {}.", newImage.offset(), tp);
+                            context.coordinator.onNewMetadataImage(newImage, 
delta);
+                        } else {
+                            log.debug("Ignored new metadata image with offset 
{} for {} because the coordinator is not active.",
+                                newImage.offset(), tp);
+                        }
+                    } finally {
+                        context.lock.unlock();
                     }
-                });
+                } else {
+                    log.debug("Ignored new metadata image with offset {} for 
{} because the coordinator does not exist.",
+                        newImage.offset(), tp);
+                }
             });
         });
     }
@@ -1764,7 +1767,12 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         Utils.closeQuietly(processor, "event processor");
         // Unload all the coordinators.
         coordinators.forEach((tp, context) -> {
-            context.transitionTo(CoordinatorState.CLOSED);
+            context.lock.lock();
+            try {
+                context.transitionTo(CoordinatorState.CLOSED);
+            } finally {
+                context.lock.unlock();
+            }
         });
         coordinators.clear();
         Utils.closeQuietly(runtimeMetrics, "runtime metrics");

Reply via email to