This is an automated email from the ASF dual-hosted git repository.
jeffkbkim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 05fd36a3b76 KAFKA-18174: Subsequent write event completions should be a noop (#18083)
05fd36a3b76 is described below
commit 05fd36a3b766d898c46e4f212490de3525b1e4d5
Author: Jeff Kim <[email protected]>
AuthorDate: Mon Dec 9 17:17:29 2024 -0500
KAFKA-18174: Subsequent write event completions should be a noop (#18083)
---
.../common/runtime/CoordinatorRuntime.java | 8 ++
.../runtime/CoordinatorRuntimeMetricsImpl.java | 4 +-
.../runtime/CoordinatorRuntimeMetricsImplTest.java | 18 +++
.../common/runtime/CoordinatorRuntimeTest.java | 138 +++++++++++++++++++++
4 files changed, 167 insertions(+), 1 deletion(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index bef30a113df..1c21038e66a 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -1360,6 +1360,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
@Override
public void complete(Throwable exception) {
+ if (future.isDone()) {
+ return;
+ }
+
final long purgatoryTimeMs = time.milliseconds() - deferredEventQueuedTimestamp;
CompletableFuture<Void> appendFuture = result != null ? result.appendFuture() : null;
@@ -1653,6 +1657,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
@Override
public void complete(Throwable exception) {
+ if (future.isDone()) {
+ return;
+ }
+
final long purgatoryTimeMs = time.milliseconds() - deferredEventQueuedTimestamp;
if (exception == null) {
future.complete(null);
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
index 591b37e2fb4..391813250c1 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
+import static org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS;
+
public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics {
/**
@@ -291,7 +293,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
@Override
public void recordEventPurgatoryTime(long purgatoryTimeMs) {
- eventPurgatoryTimeSensor.record(purgatoryTimeMs);
+ eventPurgatoryTimeSensor.record(Math.min(MAX_LATENCY_MS, purgatoryTimeMs));
}
@Override
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
index bb637d94b27..7285b58ffab 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
@@ -36,6 +36,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetr
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PURGATORY_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_QUEUE_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.NUM_PARTITIONS_METRIC_NAME;
+import static org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -204,6 +205,23 @@ public class CoordinatorRuntimeMetricsImplTest {
assertEquals(999.0, metric.metricValue());
}
+ @Test
+ public void testRecordEventPurgatoryTimeLimit() {
+ Time time = new MockTime();
+ Metrics metrics = new Metrics(time);
+
+ CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
+
+ IntStream.range(1, 1001).forEach(__ -> runtimeMetrics.recordEventPurgatoryTime(MAX_LATENCY_MS + 1000L));
+
+ MetricName metricName = kafkaMetricName(metrics, EVENT_PURGATORY_TIME_METRIC_NAME + "-max");
+ KafkaMetric metric = metrics.metrics().get(metricName);
+ long value = ((Double) metric.metricValue()).longValue();
+
+ // 3 sigfigs in HdrHistogram is not precise enough.
+ assertTrue(value >= MAX_LATENCY_MS && value < MAX_LATENCY_MS + 1000L);
+ }
+
private static void assertMetricGauge(Metrics metrics, org.apache.kafka.common.MetricName metricName, long count) {
assertEquals(count, (long) metrics.metric(metricName).metricValue());
}
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 34364887ae1..40d059fa3d6 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -4410,6 +4410,144 @@ public class CoordinatorRuntimeTest {
verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
}
+ @Test
+ public void testWriteEventCompletesOnlyOnce() throws Exception {
+ // Completes once via timeout, then again with HWM update.
+ Duration writeTimeout = Duration.ofMillis(1000L);
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+ ManualEventProcessor processor = new ManualEventProcessor();
+ CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(writeTimeout)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(processor)
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(runtimeMetrics)
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Loads the coordinator. Poll once to execute the load operation and once
+ // to complete the load.
+ runtime.scheduleLoadOperation(TP, 10);
+ processor.poll();
+ processor.poll();
+
+ // write#1 will be committed and update the high watermark. Record time spent in purgatory.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
+ state -> new CoordinatorResult<>(List.of("record1"), "response1")
+ );
+
+ processor.poll();
+
+ // Records have been written to the log.
+ long writeTimestamp = timer.time().milliseconds();
+ assertEquals(Collections.singletonList(
+ records(writeTimestamp, "record1")
+ ), writer.entries(TP));
+
+ // There is no pending high watermark.
+ assertEquals(NO_OFFSET, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+ // Advance the clock to time out the write event. Confirm write#1 is completed with a timeout.
+ timer.advanceClock(writeTimeout.toMillis() + 1L);
+ processor.poll();
+ verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
+ assertTrue(write1.isCompletedExceptionally());
+
+ // HWM update
+ writer.commit(TP, 1);
+ assertEquals(1, processor.size());
+ assertEquals(1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+ // Poll once to process the high watermark update and complete write#1. It has already
+ // been completed and this is a noop.
+ processor.poll();
+
+ assertEquals(NO_OFFSET, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+ assertEquals(1, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
+ assertTrue(write1.isCompletedExceptionally());
+ verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
+ }
+
+ @Test
+ public void testCompleteTransactionEventCompletesOnlyOnce() throws Exception {
+ // Completes once via timeout, then again with HWM update.
+ Duration writeTimeout = Duration.ofMillis(1000L);
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+ ManualEventProcessor processor = new ManualEventProcessor();
+ CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(writeTimeout)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(processor)
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(runtimeMetrics)
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Loads the coordinator. Poll once to execute the load operation and once
+ // to complete the load.
+ runtime.scheduleLoadOperation(TP, 10);
+ processor.poll();
+ processor.poll();
+
+ // transaction completion.
+ CompletableFuture<Void> write1 = runtime.scheduleTransactionCompletion(
+ "transactional-write",
+ TP,
+ 100L,
+ (short) 50,
+ 1,
+ TransactionResult.COMMIT,
+ writeTimeout
+ );
+ processor.poll();
+
+ // Records have been written to the log.
+ assertEquals(List.of(
+ endTransactionMarker(100, (short) 50, timer.time().milliseconds(), 1, ControlRecordType.COMMIT)
+ ), writer.entries(TP));
+
+ // The write timeout tasks exist.
+ assertEquals(1, timer.size());
+ assertFalse(write1.isDone());
+
+ // Advance the clock to time out the write event. Confirm write#1 is completed with a timeout.
+ timer.advanceClock(writeTimeout.toMillis() + 1L);
+ processor.poll();
+ verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
+ assertTrue(write1.isCompletedExceptionally());
+
+ // HWM update
+ writer.commit(TP, 1);
+ assertEquals(1, processor.size());
+ assertEquals(1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+ // Poll once to process the high watermark update and complete write#1. It has already
+ // been completed and this is a noop.
+ processor.poll();
+
+ assertEquals(NO_OFFSET, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+ assertEquals(1, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
+ assertTrue(write1.isCompletedExceptionally());
+ verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
+ }
@Test
public void testCoordinatorExecutor() {
Duration writeTimeout = Duration.ofMillis(1000);