This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b25c96a9159 KAFKA-16229: Fix slow expired producer id deletion (#15324)
b25c96a9159 is described below

commit b25c96a91599ea029eae77c55cffaeafcc87b8a4
Author: Jorge Esteban Quilcate Otoya <jorge.quilc...@aiven.io>
AuthorDate: Fri Feb 9 20:17:17 2024 -0500

    KAFKA-16229: Fix slow expired producer id deletion (#15324)
    
    Expiration of ProducerIds is implemented with a slow removal of map keys:
            producers.keySet().removeAll(keys);
    This unnecessarily walks all producer ids and, for each one, searches the
    list of expired keys to be removed.
    This leads to quadratic time in the worst case, when most/all keys need to
    be removed:
    
    Benchmark                                        (numProducerIds)  Mode  Cnt           Score            Error  Units
    ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3        9164.043 ±      10647.877  ns/op
    ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3      341561.093 ±      20283.211  ns/op
    ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3    44957983.550 ±    9389011.290  ns/op
    ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3  5683374164.167 ± 1446242131.466  ns/op
    A simple fix is to call map#remove(key) for each expired key instead, which
    leads to roughly linear growth:
    
    Benchmark                                        (numProducerIds)  Mode  Cnt        Score         Error  Units
    ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3     5779.056 ±     651.389  ns/op
    ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3    61430.530 ±   21875.644  ns/op
    ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3   643887.031 ±  600475.302  ns/op
    ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3  7741689.539 ± 3218317.079  ns/op
    Flamegraph of CPU usage while handling expiration with ~1 million producer
    ids (the image is not reproduced in this message):
    
    Reviewers: Justine Olshan <jols...@confluent.io>
---
 .../jmh/storage/ProducerStateManagerBench.java     | 99 ++++++++++++++++++++++
 .../internals/log/ProducerStateManager.java        |  2 +-
 2 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
new file mode 100644
index 00000000000..291c78d72ad
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.storage;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ProducerStateManagerBench {
+    Time time = new MockTime();
+    final int producerIdExpirationMs = 1000;
+
+    ProducerStateManager manager;
+    Path tempDirectory;
+
+    @Param({"100", "1000", "10000", "100000"})
+    public int numProducerIds;
+
+    @Setup(Level.Trial)
+    public void setup() throws IOException {
+        tempDirectory = Files.createTempDirectory("kafka-logs");
+        manager = new ProducerStateManager(
+            new TopicPartition("t1", 0),
+            tempDirectory.toFile(),
+            Integer.MAX_VALUE,
+            new ProducerStateManagerConfig(producerIdExpirationMs, false),
+            time
+        );
+    }
+
+
+    @TearDown(Level.Trial)
+    public void tearDown() throws Exception {
+        Files.deleteIfExists(tempDirectory);
+    }
+
+    @Benchmark
+    @Threads(1)
+    public void testDeleteExpiringIds() {
+        short epoch = 0;
+        for (long i = 0L; i < numProducerIds; i++) {
+            final ProducerStateEntry entry = new ProducerStateEntry(
+                i,
+                epoch,
+                0,
+                time.milliseconds(),
+                OptionalLong.empty(),
+                Optional.empty()
+            );
+            manager.loadProducerEntry(entry);
+        }
+
+        manager.removeExpiredProducers(time.milliseconds() + producerIdExpirationMs + 1);
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index 6bcafd2d607..270aa0a42f9 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -177,7 +177,7 @@ public class ProducerStateManager {
     }
 
     private void removeProducerIds(List<Long> keys) {
-        producers.keySet().removeAll(keys);
+        keys.forEach(producers::remove);
         producerIdCount = producers.size();
     }
 

Reply via email to