This is an automated email from the ASF dual-hosted git repository.

jeffkbkim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 05fd36a3b76 KAFKA-18174: Subsequent write event completions should be a noop (#18083)
05fd36a3b76 is described below

commit 05fd36a3b766d898c46e4f212490de3525b1e4d5
Author: Jeff Kim <[email protected]>
AuthorDate: Mon Dec 9 17:17:29 2024 -0500

    KAFKA-18174: Subsequent write event completions should be a noop (#18083)
---
 .../common/runtime/CoordinatorRuntime.java         |   8 ++
 .../runtime/CoordinatorRuntimeMetricsImpl.java     |   4 +-
 .../runtime/CoordinatorRuntimeMetricsImplTest.java |  18 +++
 .../common/runtime/CoordinatorRuntimeTest.java     | 138 +++++++++++++++++++++
 4 files changed, 167 insertions(+), 1 deletion(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index bef30a113df..1c21038e66a 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -1360,6 +1360,10 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          */
         @Override
         public void complete(Throwable exception) {
+            if (future.isDone()) {
+                return;
+            }
+
             final long purgatoryTimeMs = time.milliseconds() - 
deferredEventQueuedTimestamp;
             CompletableFuture<Void> appendFuture = result != null ? 
result.appendFuture() : null;
 
@@ -1653,6 +1657,10 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          */
         @Override
         public void complete(Throwable exception) {
+            if (future.isDone()) {
+                return;
+            }
+
             final long purgatoryTimeMs = time.milliseconds() - 
deferredEventQueuedTimestamp;
             if (exception == null) {
                 future.complete(null);
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
index 591b37e2fb4..391813250c1 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
+import static 
org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS;
+
 public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics {
 
     /**
@@ -291,7 +293,7 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
 
     @Override
     public void recordEventPurgatoryTime(long purgatoryTimeMs) {
-        eventPurgatoryTimeSensor.record(purgatoryTimeMs);
+        eventPurgatoryTimeSensor.record(Math.min(MAX_LATENCY_MS, 
purgatoryTimeMs));
     }
 
     @Override
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
index bb637d94b27..7285b58ffab 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
@@ -36,6 +36,7 @@ import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetr
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PURGATORY_TIME_METRIC_NAME;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_QUEUE_TIME_METRIC_NAME;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.NUM_PARTITIONS_METRIC_NAME;
+import static 
org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -204,6 +205,23 @@ public class CoordinatorRuntimeMetricsImplTest {
         assertEquals(999.0, metric.metricValue());
     }
 
+    @Test
+    public void testRecordEventPurgatoryTimeLimit() {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
+
+        IntStream.range(1, 1001).forEach(__ -> 
runtimeMetrics.recordEventPurgatoryTime(MAX_LATENCY_MS + 1000L));
+
+        MetricName metricName = kafkaMetricName(metrics, 
EVENT_PURGATORY_TIME_METRIC_NAME + "-max");
+        KafkaMetric metric = metrics.metrics().get(metricName);
+        long value = ((Double) metric.metricValue()).longValue();
+
+        // 3 sigfigs in HdrHistogram is not precise enough.
+        assertTrue(value >= MAX_LATENCY_MS && value < MAX_LATENCY_MS + 1000L);
+    }
+
     private static void assertMetricGauge(Metrics metrics, 
org.apache.kafka.common.MetricName metricName, long count) {
         assertEquals(count, (long) metrics.metric(metricName).metricValue());
     }
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 34364887ae1..40d059fa3d6 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -4410,6 +4410,144 @@ public class CoordinatorRuntimeTest {
         verify(runtimeMetrics, 
times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
     }
 
+    @Test
+    public void testWriteEventCompletesOnlyOnce() throws Exception {
+        // Completes once via timeout, then again with HWM update.
+        Duration writeTimeout = Duration.ofMillis(1000L);
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        CoordinatorRuntimeMetrics runtimeMetrics = 
mock(CoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(writeTimeout)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and 
once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // write#1 will be committed and update the high watermark. Record 
time spent in purgatory.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+
+        processor.poll();
+
+        // Records have been written to the log.
+        long writeTimestamp = timer.time().milliseconds();
+        assertEquals(Collections.singletonList(
+            records(writeTimestamp, "record1")
+        ), writer.entries(TP));
+
+        // There is no pending high watermark.
+        assertEquals(NO_OFFSET, 
runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Advance the clock to time out the write event. Confirm write#1 is 
completed with a timeout.
+        timer.advanceClock(writeTimeout.toMillis() + 1L);
+        processor.poll();
+        verify(runtimeMetrics, 
times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
+        assertTrue(write1.isCompletedExceptionally());
+
+        // HWM update
+        writer.commit(TP, 1);
+        assertEquals(1, processor.size());
+        assertEquals(1, 
runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Poll once to process the high watermark update and complete 
write#1. It has already
+        // been completed and this is a noop.
+        processor.poll();
+
+        assertEquals(NO_OFFSET, 
runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+        assertEquals(1, 
runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
+        assertTrue(write1.isCompletedExceptionally());
+        verify(runtimeMetrics, 
times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
+    }
+
+    @Test
+    public void testCompleteTransactionEventCompletesOnlyOnce() throws 
Exception {
+        // Completes once via timeout, then again with HWM update.
+        Duration writeTimeout = Duration.ofMillis(1000L);
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        CoordinatorRuntimeMetrics runtimeMetrics = 
mock(CoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(writeTimeout)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and 
once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // transaction completion.
+        CompletableFuture<Void> write1 = runtime.scheduleTransactionCompletion(
+            "transactional-write",
+            TP,
+            100L,
+            (short) 50,
+            1,
+            TransactionResult.COMMIT,
+            writeTimeout
+        );
+        processor.poll();
+
+        // Records have been written to the log.
+        assertEquals(List.of(
+            endTransactionMarker(100, (short) 50, timer.time().milliseconds(), 
1, ControlRecordType.COMMIT)
+        ), writer.entries(TP));
+
+        // The write timeout tasks exist.
+        assertEquals(1, timer.size());
+        assertFalse(write1.isDone());
+
+        // Advance the clock to time out the write event. Confirm write#1 is 
completed with a timeout.
+        timer.advanceClock(writeTimeout.toMillis() + 1L);
+        processor.poll();
+        verify(runtimeMetrics, 
times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
+        assertTrue(write1.isCompletedExceptionally());
+
+        // HWM update
+        writer.commit(TP, 1);
+        assertEquals(1, processor.size());
+        assertEquals(1, 
runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Poll once to process the high watermark update and complete 
write#1. It has already
+        // been completed and this is a noop.
+        processor.poll();
+
+        assertEquals(NO_OFFSET, 
runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+        assertEquals(1, 
runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
+        assertTrue(write1.isCompletedExceptionally());
+        verify(runtimeMetrics, 
times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
+    }
     @Test
     public void testCoordinatorExecutor() {
         Duration writeTimeout = Duration.ofMillis(1000);

Reply via email to