This is an automated email from the ASF dual-hosted git repository. jolshan pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b25c96a9159 KAFKA-16229: Fix slow expired producer id deletion (#15324) b25c96a9159 is described below commit b25c96a91599ea029eae77c55cffaeafcc87b8a4 Author: Jorge Esteban Quilcate Otoya <jorge.quilc...@aiven.io> AuthorDate: Fri Feb 9 20:17:17 2024 -0500 KAFKA-16229: Fix slow expired producer id deletion (#15324) Expiration of ProducerIds is implemented with a slow removal of map keys: producers.keySet().removeAll(keys); This unnecessarily iterates over all producer ids and, for each one, searches the whole list of expired keys to decide whether it should be removed. This leads to quadratic time in the worst case, when most/all keys need to be removed: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op A simple fix is to use map#remove(key) for each expired key instead, leading to roughly linear growth: Benchmark (numProducerIds) Mode Cnt Score Error Units ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op Flamegraph of the CPU usage when dealing with expiration of ~1 million producer ids: [image not included in this email] Reviewers: Justine Olshan <jols...@confluent.io> --- .../jmh/storage/ProducerStateManagerBench.java | 99 ++++++++++++++++++++++ .../internals/log/ProducerStateManager.java | 2 +- 2 files changed, 100 insertions(+), 1 deletion(-) diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java new file mode 100644 index 00000000000..291c78d72ad --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.storage; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.log.ProducerStateEntry; +import org.apache.kafka.storage.internals.log.ProducerStateManager; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.TimeUnit; + +@Warmup(iterations = 2) +@Measurement(iterations = 3) +@Fork(1) +@BenchmarkMode(Mode.AverageTime) +@State(value = Scope.Benchmark) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class ProducerStateManagerBench { + Time time = new MockTime(); + final int producerIdExpirationMs = 1000; + + ProducerStateManager manager; + Path tempDirectory; + + @Param({"100", "1000", "10000", "100000"}) + public int numProducerIds; + + @Setup(Level.Trial) + public void setup() throws IOException { + tempDirectory = Files.createTempDirectory("kafka-logs"); + manager = new ProducerStateManager( + new TopicPartition("t1", 0), + tempDirectory.toFile(), + Integer.MAX_VALUE, + new ProducerStateManagerConfig(producerIdExpirationMs, false), + time + ); + } + + + @TearDown(Level.Trial) + public 
void tearDown() throws Exception { + Files.deleteIfExists(tempDirectory); + } + + @Benchmark + @Threads(1) + public void testDeleteExpiringIds() { + short epoch = 0; + for (long i = 0L; i < numProducerIds; i++) { + final ProducerStateEntry entry = new ProducerStateEntry( + i, + epoch, + 0, + time.milliseconds(), + OptionalLong.empty(), + Optional.empty() + ); + manager.loadProducerEntry(entry); + } + + manager.removeExpiredProducers(time.milliseconds() + producerIdExpirationMs + 1); + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index 6bcafd2d607..270aa0a42f9 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -177,7 +177,7 @@ public class ProducerStateManager { } private void removeProducerIds(List<Long> keys) { - producers.keySet().removeAll(keys); + keys.forEach(producers::remove); producerIdCount = producers.size(); }