This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d066b94c810 MINOR: Fix UpdatedImage and HighWatermarkUpdated events'
logs (#15432)
d066b94c810 is described below
commit d066b94c8103cca166d7ea01a4b5bf5f65a3b838
Author: David Jacot <[email protected]>
AuthorDate: Thu Feb 29 07:01:21 2024 -0800
MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs (#15432)
I have noticed the following log when a __consumer_offsets partition
migrates away from a broker. It happens because the event is queued up after
the event that unloads the state machine. This patch fixes it and fixes
another similar one.
```
[2024-02-06 17:14:51,359] ERROR [GroupCoordinator id=1] Execution of
UpdateImage(tp=__consumer_offsets-28, offset=13251) failed due to This is not
the correct coordinator..
(org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
org.apache.kafka.common.errors.NotCoordinatorException: This is not the
correct coordinator.
```
Reviewers: Justine Olshan <[email protected]>
---
.../group/runtime/CoordinatorRuntime.java | 144 +++++++++++----------
1 file changed, 76 insertions(+), 68 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index b0be84c7a90..ccb4caf04f9 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -1192,11 +1192,28 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
) {
log.debug("High watermark of {} incremented to {}.", tp, offset);
scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ",
offset=" + offset + ")", tp, () -> {
- withActiveContextOrThrow(tp, context -> {
- context.coordinator.updateLastCommittedOffset(offset);
- context.deferredEventQueue.completeUpTo(offset);
- coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
- });
+ CoordinatorContext context = coordinators.get(tp);
+ if (context != null) {
+ context.lock.lock();
+ try {
+ if (context.state == CoordinatorState.ACTIVE) {
+ // The updated high watermark can be applied to
the coordinator only if the coordinator
+ // exists and is in the active state.
+ log.debug("Updating high watermark of {} to {}.",
tp, offset);
+
context.coordinator.updateLastCommittedOffset(offset);
+ context.deferredEventQueue.completeUpTo(offset);
+ coordinatorMetrics.onUpdateLastCommittedOffset(tp,
offset);
+ } else {
+ log.debug("Ignored high watermark updated for {}
to {} because the coordinator is not active.",
+ tp, offset);
+ }
+ } finally {
+ context.lock.unlock();
+ }
+ } else {
+ log.debug("Ignored high watermark updated for {} to {}
because the coordinator does not exist.",
+ tp, offset);
+ }
});
}
}
@@ -1350,14 +1367,11 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
}
/**
- * Creates the context if it does not exist.
- *
- * @param tp The topic partition.
- *
- * Visible for testing.
+ * @return The coordinator context or a new context if it does not exist.
+ * Package private for testing.
*/
- void maybeCreateContext(TopicPartition tp) {
- coordinators.computeIfAbsent(tp, CoordinatorContext::new);
+ CoordinatorContext maybeCreateContext(TopicPartition tp) {
+ return coordinators.computeIfAbsent(tp, CoordinatorContext::new);
}
/**
@@ -1376,29 +1390,6 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
}
}
- /**
- * Calls the provided function with the context; throws an exception
otherwise.
- * This method ensures that the context lock is acquired before calling the
- * function and releases afterwards.
- *
- * @param tp The topic partition.
- * @param func The function that will receive the context.
- * @throws NotCoordinatorException
- */
- private void withContextOrThrow(
- TopicPartition tp,
- Consumer<CoordinatorContext> func
- ) throws NotCoordinatorException {
- CoordinatorContext context = contextOrThrow(tp);
-
- try {
- context.lock.lock();
- func.accept(context);
- } finally {
- context.lock.unlock();
- }
- }
-
/**
* Calls the provided function with the context iff the context is active;
throws
* an exception otherwise. This method ensures that the context lock is
acquired
@@ -1609,7 +1600,11 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
maybeCreateContext(tp);
scheduleInternalOperation("Load(tp=" + tp + ", epoch=" +
partitionEpoch + ")", tp, () -> {
- withContextOrThrow(tp, context -> {
+ // The context is re-created if it does not exist.
+ CoordinatorContext context = maybeCreateContext(tp);
+
+ context.lock.lock();
+ try {
if (context.epoch < partitionEpoch) {
context.epoch = partitionEpoch;
@@ -1617,16 +1612,13 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
case FAILED:
case INITIAL:
context.transitionTo(CoordinatorState.LOADING);
- loader.load(
- tp,
- context.coordinator
- ).whenComplete((summary, exception) -> {
+ loader.load(tp,
context.coordinator).whenComplete((summary, exception) -> {
scheduleInternalOperation("CompleteLoad(tp=" +
tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
- withContextOrThrow(tp, ctx -> {
+ CoordinatorContext ctx =
coordinators.get(tp);
+ if (ctx != null) {
if (ctx.state !=
CoordinatorState.LOADING) {
- log.info("Ignoring load completion
from {} because context is in {} state.",
- ctx.tp, ctx.state
- );
+ log.info("Ignored load completion
from {} because context is in {} state.",
+ ctx.tp, ctx.state);
return;
}
try {
@@ -1635,18 +1627,19 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
if (summary != null) {
runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(),
summary.endTimeMs());
log.info("Finished loading of
metadata from {} with epoch {} in {}ms where {}ms " +
- "was spent in the
scheduler. Loaded {} records which total to {} bytes.",
+ "was spent in the
scheduler. Loaded {} records which total to {} bytes.",
tp, partitionEpoch,
summary.endTimeMs() - summary.startTimeMs(),
-
summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes()
- );
+
summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes());
}
} catch (Throwable ex) {
log.error("Failed to load metadata
from {} with epoch {} due to {}.",
- tp, partitionEpoch,
ex.toString()
- );
+ tp, partitionEpoch,
ex.toString());
ctx.transitionTo(CoordinatorState.FAILED);
}
- });
+ } else {
+ log.debug("Failed to complete the
loading of metadata for {} in epoch {} since the coordinator does not exist.",
+ tp, partitionEpoch);
+ }
});
});
break;
@@ -1663,11 +1656,12 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
log.error("Cannot load coordinator {} in state
{}.", tp, context.state);
}
} else {
- log.info("Ignoring loading metadata from {} since current
epoch {} is larger than or equals to {}.",
- context.tp, context.epoch, partitionEpoch
- );
+ log.info("Ignored loading metadata from {} since current
epoch {} is larger than or equals to {}.",
+ context.tp, context.epoch, partitionEpoch);
}
- });
+ } finally {
+ context.lock.unlock();
+ }
});
}
@@ -1689,8 +1683,8 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
scheduleInternalOperation("UnloadCoordinator(tp=" + tp + ", epoch=" +
partitionEpoch + ")", tp, () -> {
CoordinatorContext context = coordinators.get(tp);
if (context != null) {
+ context.lock.lock();
try {
- context.lock.lock();
if (!partitionEpoch.isPresent() || context.epoch <
partitionEpoch.getAsInt()) {
log.info("Started unloading metadata for {} with epoch
{}.", tp, partitionEpoch);
context.transitionTo(CoordinatorState.CLOSED);
@@ -1698,16 +1692,14 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
log.info("Finished unloading metadata for {} with
epoch {}.", tp, partitionEpoch);
} else {
log.info("Ignored unloading metadata for {} in epoch
{} since current epoch is {}.",
- tp, partitionEpoch, context.epoch
- );
+ tp, partitionEpoch, context.epoch);
}
} finally {
context.lock.unlock();
}
} else {
log.info("Ignored unloading metadata for {} in epoch {} since
metadata was never loaded.",
- tp, partitionEpoch
- );
+ tp, partitionEpoch);
}
});
}
@@ -1731,15 +1723,26 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
// Push an event for each coordinator.
coordinators.keySet().forEach(tp -> {
scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" +
newImage.offset() + ")", tp, () -> {
- withContextOrThrow(tp, context -> {
- if (context.state == CoordinatorState.ACTIVE) {
- log.debug("Applying new metadata image with offset {}
to {}.", newImage.offset(), tp);
- context.coordinator.onNewMetadataImage(newImage,
delta);
- } else {
- log.debug("Ignoring new metadata image with offset {}
for {} because the coordinator is not active.",
- newImage.offset(), tp);
+ CoordinatorContext context = coordinators.get(tp);
+ if (context != null) {
+ context.lock.lock();
+ try {
+ if (context.state == CoordinatorState.ACTIVE) {
+ // The new image can be applied to the coordinator
only if the coordinator
+ // exists and is in the active state.
+ log.debug("Applying new metadata image with offset
{} to {}.", newImage.offset(), tp);
+ context.coordinator.onNewMetadataImage(newImage,
delta);
+ } else {
+ log.debug("Ignored new metadata image with offset
{} for {} because the coordinator is not active.",
+ newImage.offset(), tp);
+ }
+ } finally {
+ context.lock.unlock();
}
- });
+ } else {
+ log.debug("Ignored new metadata image with offset {} for
{} because the coordinator does not exist.",
+ newImage.offset(), tp);
+ }
});
});
}
@@ -1764,7 +1767,12 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
Utils.closeQuietly(processor, "event processor");
// Unload all the coordinators.
coordinators.forEach((tp, context) -> {
- context.transitionTo(CoordinatorState.CLOSED);
+ context.lock.lock();
+ try {
+ context.transitionTo(CoordinatorState.CLOSED);
+ } finally {
+ context.lock.unlock();
+ }
});
coordinators.clear();
Utils.closeQuietly(runtimeMetrics, "runtime metrics");