This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4c5ea05ec85 KAFKA-18058: Share group state record pruning impl.
(#18014)
4c5ea05ec85 is described below
commit 4c5ea05ec85aba18abae1308f78345ff949659a5
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Dec 12 13:08:03 2024 +0530
KAFKA-18058: Share group state record pruning impl. (#18014)
This PR adds a class, ShareCoordinatorOffsetsManager, which tracks the last
redundant offset for each share group state topic partition. It also adds a
periodic timer job in ShareCoordinatorService which queries the last redundant
offset at regular intervals and, if a valid value is found, issues a
deleteRecords call to the ReplicaManager via the PartitionWriter. This keeps
the size of the share group state topic partitions manageable.
Reviewers: Jun Rao <[email protected]>, David Jacot <[email protected]>,
Andrew Schofield <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 20 +
.../common/runtime/PartitionWriter.java | 11 +
.../common/runtime/InMemoryPartitionWriter.java | 8 +
.../group/CoordinatorPartitionWriter.scala | 21 +
.../main/scala/kafka/server/ReplicaManager.scala | 11 +-
.../group/CoordinatorPartitionWriterTest.scala | 82 +++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 10 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 58 ++-
.../server/config/ShareCoordinatorConfig.java | 14 +-
.../share/ShareCoordinatorOffsetsManager.java | 122 +++++
.../coordinator/share/ShareCoordinatorService.java | 94 +++-
.../coordinator/share/ShareCoordinatorShard.java | 33 +-
.../share/ShareCoordinatorOffsetsManagerTest.java | 209 +++++++++
.../share/ShareCoordinatorServiceTest.java | 505 +++++++++++++++++++--
.../share/ShareCoordinatorShardTest.java | 23 +-
...igTest.java => ShareCoordinatorTestConfig.java} | 3 +-
16 files changed, 1175 insertions(+), 49 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1c21038e66a..ee0cf18212a 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -52,6 +52,7 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -2468,4 +2469,23 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
Utils.closeQuietly(runtimeMetrics, "runtime metrics");
log.info("Coordinator runtime closed.");
}
+
+ /**
+ * Util method which returns all the topic partitions for which
+ * the state machine is in active state.
+ * <p>
+ * This could be useful if the caller does not have a specific
+ * target internal topic partition.
+ * @return List of {@link TopicPartition} whose coordinators are active
+ */
+ public List<TopicPartition> activeTopicPartitions() {
+ if (coordinators == null || coordinators.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ return coordinators.entrySet().stream()
+ .filter(entry ->
entry.getValue().state.equals(CoordinatorState.ACTIVE))
+ .map(Map.Entry::getKey)
+ .toList();
+ }
}
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
index 47df8bcae34..cb8bec3f71c 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
@@ -107,4 +107,15 @@ public interface PartitionWriter {
short producerEpoch,
short apiVersion
) throws KafkaException;
+
+ /**
+ * Delete records from a topic partition until specified offset
+ * @param tp The partition to delete records from
+ * @param deleteBeforeOffset Offset to delete until, starting from the
beginning
+ * @throws KafkaException Any KafkaException caught during the
operation.
+ */
+ CompletableFuture<Void> deleteRecords(
+ TopicPartition tp,
+ long deleteBeforeOffset
+ ) throws KafkaException;
}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
index 1f676ad550f..a8551f0734b 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
@@ -115,6 +115,14 @@ public class InMemoryPartitionWriter implements
PartitionWriter {
}
}
+ @Override
+ public CompletableFuture<Void> deleteRecords(
+ TopicPartition tp,
+ long deleteBeforeOffset
+ ) throws KafkaException {
+ throw new RuntimeException("method not implemented");
+ }
+
@Override
public CompletableFuture<VerificationGuard>
maybeStartTransactionVerification(
TopicPartition tp,
diff --git
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index f70819ef438..211be799a7e 100644
---
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -165,4 +165,25 @@ class CoordinatorPartitionWriter(
// Required offset.
partitionResult.lastOffset + 1
}
+
+ override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long):
CompletableFuture[Void] = {
+ val responseFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
+
+ replicaManager.deleteRecords(
+ timeout = 30000L, // 30 seconds.
+ offsetPerPartition = Map(tp -> deleteBeforeOffset),
+ responseCallback = results => {
+ val result = results.get(tp)
+ if (result.isEmpty) {
+ responseFuture.completeExceptionally(new
IllegalStateException(s"Delete status $result should have partition $tp."))
+ } else if (result.get.errorCode != Errors.NONE.code) {
+
responseFuture.completeExceptionally(Errors.forCode(result.get.errorCode).exception)
+ } else {
+ responseFuture.complete(null)
+ }
+ },
+ allowInternalTopicDeletion = true
+ )
+ responseFuture
+ }
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 67cc1a84c56..cb7005f4020 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig,
* Delete records on leader replicas of the partition, and wait for delete
records operation be propagated to other replicas;
* the callback function will be triggered either when timeout or
logStartOffset of all live replicas have reached the specified offset
*/
- private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition,
Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
+ private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition,
Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition,
LogDeleteRecordsResult] = {
trace("Delete records on local logs to offsets
[%s]".format(offsetPerPartition))
offsetPerPartition.map { case (topicPartition, requestedOffset) =>
- // reject delete records operation on internal topics
- if (Topic.isInternal(topicPartition.topic)) {
+ // reject delete records operation for internal topics unless
allowInternalTopicDeletion is true
+ if (Topic.isInternal(topicPartition.topic) &&
!allowInternalTopicDeletion) {
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new
InvalidTopicException(s"Cannot delete records of internal topic
${topicPartition.topic}"))))
} else {
try {
@@ -1369,9 +1369,10 @@ class ReplicaManager(val config: KafkaConfig,
def deleteRecords(timeout: Long,
offsetPerPartition: Map[TopicPartition, Long],
- responseCallback: Map[TopicPartition,
DeleteRecordsPartitionResult] => Unit): Unit = {
+ responseCallback: Map[TopicPartition,
DeleteRecordsPartitionResult] => Unit,
+ allowInternalTopicDeletion: Boolean = false): Unit = {
val timeBeforeLocalDeleteRecords = time.milliseconds
- val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
+ val localDeleteRecordsResults =
deleteRecordsOnLocalLog(offsetPerPartition, allowInternalTopicDeletion)
debug("Delete records on local log in %d ms".format(time.milliseconds -
timeBeforeLocalDeleteRecords))
val deleteRecordsStatus = localDeleteRecordsResults.map { case
(topicPartition, result) =>
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index f12d21019a7..9b192e851e9 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -20,13 +20,14 @@ import kafka.server.ReplicaManager
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch,
SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
VerificationGuard}
import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull,
assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
@@ -238,4 +239,83 @@ class CoordinatorPartitionWriterTest {
batch
))
}
+
+ @Test
+ def testDeleteRecordsResponseContainsError(): Unit = {
+ val replicaManager = mock(classOf[ReplicaManager])
+ val partitionRecordWriter = new CoordinatorPartitionWriter(
+ replicaManager
+ )
+
+ val callbackCapture: ArgumentCaptor[Map[TopicPartition,
DeleteRecordsPartitionResult] => Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition,
DeleteRecordsPartitionResult] => Unit])
+
+ // Response contains error.
+ when(replicaManager.deleteRecords(
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(),
+ callbackCapture.capture(),
+ ArgumentMatchers.eq(true)
+ )).thenAnswer { _ =>
+ callbackCapture.getValue.apply(Map(
+ new TopicPartition("random-topic", 0) -> new
DeleteRecordsPartitionResult()
+ .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code
+ )))
+ }
+
+ partitionRecordWriter.deleteRecords(
+ new TopicPartition("random-topic", 0),
+ 10L
+ ).whenComplete { (_, exp) =>
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception, exp)
+ }
+
+ // Empty response
+ when(replicaManager.deleteRecords(
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(),
+ callbackCapture.capture(),
+ ArgumentMatchers.eq(true)
+ )).thenAnswer { _ =>
+ callbackCapture.getValue.apply(Map[TopicPartition,
DeleteRecordsPartitionResult]())
+ }
+
+ partitionRecordWriter.deleteRecords(
+ new TopicPartition("random-topic", 0),
+ 10L
+ ).whenComplete { (_, exp) =>
+ assertTrue(exp.isInstanceOf[IllegalStateException])
+ }
+ }
+
+ @Test
+ def testDeleteRecordsSuccess(): Unit = {
+ val replicaManager = mock(classOf[ReplicaManager])
+ val partitionRecordWriter = new CoordinatorPartitionWriter(
+ replicaManager
+ )
+
+ val callbackCapture: ArgumentCaptor[Map[TopicPartition,
DeleteRecordsPartitionResult] => Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition,
DeleteRecordsPartitionResult] => Unit])
+
+ // Response contains no error.
+ when(replicaManager.deleteRecords(
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(),
+ callbackCapture.capture(),
+ ArgumentMatchers.eq(true)
+ )).thenAnswer { _ =>
+ callbackCapture.getValue.apply(Map(
+ new TopicPartition("random-topic", 0) -> new
DeleteRecordsPartitionResult()
+ .setErrorCode(Errors.NONE.code)
+ ))
+ }
+
+ partitionRecordWriter.deleteRecords(
+ new TopicPartition("random-topic", 0),
+ 10L
+ ).whenComplete { (_, exp) =>
+ assertNull(exp)
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a36598a5eeb..27cd6644bd9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,7 @@ import
org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator,
GroupCoordinatorConfig}
-import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorConfigTest}
+import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
@@ -11702,7 +11702,7 @@ class KafkaApisTest extends Logging {
val response = getReadShareGroupResponse(
readRequestData,
- config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
readStateResultData
@@ -11757,7 +11757,7 @@ class KafkaApisTest extends Logging {
val response = getReadShareGroupResponse(
readRequestData,
- config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
readStateResultData
@@ -11812,7 +11812,7 @@ class KafkaApisTest extends Logging {
val response = getWriteShareGroupResponse(
writeRequestData,
- config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
writeStateResultData
@@ -11867,7 +11867,7 @@ class KafkaApisTest extends Logging {
val response = getWriteShareGroupResponse(
writeRequestData,
- config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
writeStateResultData
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ce45fcb6093..044a7d490d7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -34,7 +34,8 @@ import org.apache.kafka.common.{DirectoryId, IsolationLevel,
Node, TopicIdPartit
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidPidMappingException,
KafkaStorageException}
-import org.apache.kafka.common.message.LeaderAndIsrRequestData
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.{DeleteRecordsResponseData,
LeaderAndIsrRequestData}
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import
org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
@@ -6660,6 +6661,61 @@ class ReplicaManagerTest {
}
}
+ @Test
+ def testDeleteRecordsInternalTopicDeleteDisallowed(): Unit = {
+ val localId = 1
+ val topicPartition0 = new TopicIdPartition(FOO_UUID, 0,
Topic.GROUP_METADATA_TOPIC_NAME)
+ val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+ val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
localId, setupLogDirMetaProperties = true, directoryEventHandler =
directoryEventHandler)
+ val directoryIds = rm.logManager.directoryIdsSet.toList
+ assertEquals(directoryIds.size, 2)
+ val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId,
isStartIdLeader = true, directoryIds = directoryIds)
+ val (partition: Partition, _) =
rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta,
FOO_UUID).get
+
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(),
1, localId, Seq(1, 2)),
+ new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
+ None)
+
+ def callback(responseStatus: Map[TopicPartition,
DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
+ assert(responseStatus.values.head.errorCode ==
Errors.INVALID_TOPIC_EXCEPTION.code)
+ }
+
+ // default internal topics delete disabled
+ rm.deleteRecords(
+ timeout = 0L,
+ Map[TopicPartition, Long](topicPartition0.topicPartition() -> 10L),
+ responseCallback = callback
+ )
+ }
+
+ @Test
+ def testDeleteRecordsInternalTopicDeleteAllowed(): Unit = {
+ val localId = 1
+ val topicPartition0 = new TopicIdPartition(FOO_UUID, 0,
Topic.GROUP_METADATA_TOPIC_NAME)
+ val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+ val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
localId, setupLogDirMetaProperties = true, directoryEventHandler =
directoryEventHandler)
+ val directoryIds = rm.logManager.directoryIdsSet.toList
+ assertEquals(directoryIds.size, 2)
+ val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId,
isStartIdLeader = true, directoryIds = directoryIds)
+ val (partition: Partition, _) =
rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta,
FOO_UUID).get
+
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(),
1, localId, Seq(1, 2)),
+ new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
+ None)
+
+ def callback(responseStatus: Map[TopicPartition,
DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
+ assert(responseStatus.values.head.errorCode == Errors.NONE.code)
+ }
+
+ // internal topics delete allowed
+ rm.deleteRecords(
+ timeout = 0L,
+ Map[TopicPartition, Long](topicPartition0.topicPartition() -> 0L),
+ responseCallback = callback,
+ allowInternalTopicDeletion = true
+ )
+ }
+
private def readFromLogWithOffsetOutOfRange(tp: TopicPartition):
Seq[(TopicIdPartition, LogReadResult)] = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true,
shouldMockLog = true)
try {
diff --git
a/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
b/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
index a27a19914c9..58bc774fe20 100644
---
a/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Utils;
import java.util.Optional;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
public static final int APPEND_LINGER_MS_DEFAULT = 10;
public static final String APPEND_LINGER_MS_DOC = "The duration in
milliseconds that the share coordinator will wait for writes to accumulate
before flushing them to disk.";
+ public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG =
"share.coordinator.state.topic.prune.interval.ms";
+ public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 *
1000; // 5 minutes
+ public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The
duration in milliseconds that the share coordinator will wait between pruning
eligible records in share-group state topic.";
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT,
STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH,
STATE_TOPIC_NUM_PARTITIONS_DOC)
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT,
STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH,
STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -81,7 +86,8 @@ public class ShareCoordinatorConfig {
.define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT,
atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
.define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int)
STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH,
STATE_TOPIC_COMPRESSION_CODEC_DOC)
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT,
atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
- .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT,
atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC);
+ .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT,
atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
+ .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT,
STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW,
STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);
private final int stateTopicNumPartitions;
private final short stateTopicReplicationFactor;
@@ -93,6 +99,7 @@ public class ShareCoordinatorConfig {
private final int loadBufferSize;
private final CompressionType compressionType;
private final int appendLingerMs;
+ private final int pruneIntervalMs;
public ShareCoordinatorConfig(AbstractConfig config) {
@@ -108,6 +115,7 @@ public class ShareCoordinatorConfig {
.map(CompressionType::forId)
.orElse(null);
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
+ pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
validate();
}
@@ -151,6 +159,10 @@ public class ShareCoordinatorConfig {
return compressionType;
}
+ public int shareCoordinatorTopicPruneIntervalMs() {
+ return pruneIntervalMs;
+ }
+
private void validate() {
Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 &&
snapshotUpdateRecordsPerSnapshot <= 500,
String.format("%s must be between [0, 500]",
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
new file mode 100644
index 00000000000..69070f65e93
--- /dev/null
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineLong;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Util class to track the offsets written into the internal topic
+ * per share partition key.
+ * It calculates the minimum offset globally up to which the records
+ * in the internal partition are redundant i.e. they have been overridden
+ * by newer records.
+ */
+public class ShareCoordinatorOffsetsManager {
+
+ // Map to store share partition key => current partition offset
+ // being written.
+ private final TimelineHashMap<SharePartitionKey, Long> offsets;
+
+ // Minimum offset representing the smallest necessary offset
+ // across the internal partition (offsets below this are redundant).
+ // We are using timeline object here because the offsets which are passed
into
+ // updateState might not be committed yet. In case of retry, these offsets
would
+ // be invalidated via the snapshot registry. Hence, using timeline object
+ // the values would automatically revert in accordance with the last
committed offset.
+ private final TimelineLong lastRedundantOffset;
+
+ public ShareCoordinatorOffsetsManager(SnapshotRegistry snapshotRegistry) {
+ Objects.requireNonNull(snapshotRegistry);
+ offsets = new TimelineHashMap<>(snapshotRegistry, 0);
+ lastRedundantOffset = new TimelineLong(snapshotRegistry);
+ lastRedundantOffset.set(Long.MAX_VALUE); // For easy application of
Math.min.
+ }
+
+ /**
+ * Method updates internal state with the supplied offset for the provided
+ * share partition key. It then calculates the minimum offset, if possible,
+ * below which all offsets are redundant.
+ *
+ * @param key - represents {@link SharePartitionKey} whose offset needs
updating
+ * @param offset - represents the latest partition offset for provided key
+ */
+ public void updateState(SharePartitionKey key, long offset) {
+ lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset));
+ offsets.put(key, offset);
+
+ Optional<Long> redundantOffset = findRedundantOffset();
+ redundantOffset.ifPresent(lastRedundantOffset::set);
+ }
+
+ private Optional<Long> findRedundantOffset() {
+ if (offsets.isEmpty()) {
+ return Optional.empty();
+ }
+
+ long soFar = Long.MAX_VALUE;
+
+ for (long offset : offsets.values()) {
+ // Get min offset among latest offsets
+ // for all share keys in the internal partition.
+ soFar = Math.min(soFar, offset);
+
+ // lastRedundantOffset represents the smallest necessary offset
+ // and if soFar equals it, we cannot proceed. This can happen
+ // if a share partition key hasn't had records written for a while.
+ // For example,
+ // <p>
+ // key1:1
+ // key2:2 4 6
+ // key3:3 5 7
+ // <p>
+ // We can see in above that offsets 2, 4, 3, 5 are redundant,
+ // but we do not have a contiguous prefix starting at
lastRedundantOffset
+ // and we cannot proceed.
+ if (soFar == lastRedundantOffset.get()) {
+ return Optional.of(soFar);
+ }
+ }
+
+ return Optional.of(soFar);
+ }
+
+ /**
+ * Most recent last redundant offset. This method is to be used
+ * when the caller wants to query the value of such offset.
+ * @return Optional of type Long representing the offset or empty for
invalid offset values
+ */
+ public Optional<Long> lastRedundantOffset() {
+ long value = lastRedundantOffset.get();
+ if (value <= 0 || value == Long.MAX_VALUE) {
+ return Optional.empty();
+ }
+
+ return Optional.of(value);
+ }
+
+ // visible for testing
+ TimelineHashMap<SharePartitionKey, Long> curState() {
+ return offsets;
+ }
+}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index ced4e532b03..71dd2d88056 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -48,6 +48,7 @@ import org.apache.kafka.server.config.ShareCoordinatorConfig;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;
@@ -60,6 +61,7 @@ import java.util.Map;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
@@ -75,6 +77,9 @@ public class ShareCoordinatorService implements
ShareCoordinator {
private final ShareCoordinatorMetrics shareCoordinatorMetrics;
private volatile int numPartitions = -1; // Number of partitions for
__share_group_state. Provided when component is started.
private final Time time;
+ private final Timer timer;
+ private final PartitionWriter writer;
+ private final Map<TopicPartition, Long> lastPrunedOffsets;
public static class Builder {
private final int nodeId;
@@ -184,7 +189,9 @@ public class ShareCoordinatorService implements
ShareCoordinator {
config,
runtime,
coordinatorMetrics,
- time
+ time,
+ timer,
+ writer
);
}
}
@@ -194,12 +201,18 @@ public class ShareCoordinatorService implements
ShareCoordinator {
ShareCoordinatorConfig config,
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime,
ShareCoordinatorMetrics shareCoordinatorMetrics,
- Time time) {
+ Time time,
+ Timer timer,
+ PartitionWriter writer
+ ) {
this.log = logContext.logger(ShareCoordinatorService.class);
this.config = config;
this.runtime = runtime;
this.shareCoordinatorMetrics = shareCoordinatorMetrics;
this.time = time;
+ this.timer = timer;
+ this.writer = writer;
+ this.lastPrunedOffsets = new ConcurrentHashMap<>();
}
@Override
@@ -240,9 +253,82 @@ public class ShareCoordinatorService implements
ShareCoordinator {
log.info("Starting up.");
numPartitions = shareGroupTopicPartitionCount.getAsInt();
+ setupRecordPruning();
log.info("Startup complete.");
}
+ private void setupRecordPruning() {
+ log.info("Scheduling share-group state topic prune job.");
+ timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs())
{
+ @Override
+ public void run() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ runtime.activeTopicPartitions().forEach(tp ->
futures.add(performRecordPruning(tp)));
+
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[]{}))
+ .whenComplete((res, exp) -> {
+ if (exp != null) {
+ log.error("Received error in share-group state
topic prune.", exp);
+ }
+ // Perpetual recursion, failure or not.
+ setupRecordPruning();
+ });
+ }
+ });
+ }
+
+ private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
+ // This future is always completed, either normally or exceptionally.
+ CompletableFuture<Void> fut = new CompletableFuture<>();
+
+ runtime.scheduleWriteOperation(
+ "write-state-record-prune",
+ tp,
+ Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+ ShareCoordinatorShard::lastRedundantOffset
+ ).whenComplete((result, exception) -> {
+ if (exception != null) {
+ log.debug("Last redundant offset for tp {} lookup threw an
error.", tp, exception);
+ Errors error = Errors.forException(exception);
+ // These errors might result from partition metadata not loaded
+ // or shard re-election. Will cause unnecessary noise, hence
not logging
+ if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) ||
error.equals(Errors.NOT_COORDINATOR))) {
+ log.error("Last redundant offset lookup for tp {} threw an
error.", tp, exception);
+ fut.completeExceptionally(exception);
+ return;
+ }
+ fut.complete(null);
+ return;
+ }
+ if (result.isPresent()) {
+ Long off = result.get();
+ Long lastPrunedOffset = lastPrunedOffsets.get(tp);
+ if (lastPrunedOffset != null && lastPrunedOffset.longValue()
== off) {
+ log.debug("{} already pruned till offset {}", tp, off);
+ fut.complete(null);
+ return;
+ }
+
+ log.info("Pruning records in {} till offset {}.", tp, off);
+ writer.deleteRecords(tp, off)
+ .whenComplete((res, exp) -> {
+ if (exp != null) {
+ log.debug("Exception while deleting records in {}
till offset {}.", tp, off, exp);
+ fut.completeExceptionally(exp);
+ return;
+ }
+ fut.complete(null);
+ // Best effort prevention of issuing duplicate delete
calls.
+ lastPrunedOffsets.put(tp, off);
+ });
+ } else {
+ log.debug("No offset value for tp {} found.", tp);
+ fut.complete(null);
+ }
+ });
+ return fut;
+ }
+
@Override
public void shutdown() {
if (!isActive.compareAndSet(true, false)) {
@@ -543,8 +629,10 @@ public class ShareCoordinatorService implements
ShareCoordinator {
@Override
public void onResignation(int partitionIndex, OptionalInt
partitionLeaderEpoch) {
throwIfNotActive();
+ TopicPartition tp = new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex);
+ lastPrunedOffsets.remove(tp);
runtime.scheduleUnloadOperation(
- new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME,
partitionIndex),
+ tp,
partitionLeaderEpoch
);
}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index e8ca9ce6b8d..6f2a9c27b74 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -73,6 +73,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
private final TimelineHashMap<SharePartitionKey, Integer>
snapshotUpdateCount;
private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
private MetadataImage metadataImage;
+ private final ShareCoordinatorOffsetsManager offsetsManager;
public static final Exception NULL_TOPIC_ID = new Exception("The topic id
cannot be null.");
public static final Exception NEGATIVE_PARTITION_ID = new Exception("The
partition id cannot be a negative number.");
@@ -162,6 +163,17 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
CoordinatorMetrics coordinatorMetrics,
CoordinatorMetricsShard metricsShard,
SnapshotRegistry snapshotRegistry
+ ) {
+ this(logContext, config, coordinatorMetrics, metricsShard,
snapshotRegistry, new ShareCoordinatorOffsetsManager(snapshotRegistry));
+ }
+
+ ShareCoordinatorShard(
+ LogContext logContext,
+ ShareCoordinatorConfig config,
+ CoordinatorMetrics coordinatorMetrics,
+ CoordinatorMetricsShard metricsShard,
+ SnapshotRegistry snapshotRegistry,
+ ShareCoordinatorOffsetsManager offsetsManager
) {
this.log = logContext.logger(ShareCoordinatorShard.class);
this.config = config;
@@ -171,6 +183,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
this.leaderEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0);
this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.offsetsManager = offsetsManager;
}
@Override
@@ -195,7 +208,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
switch (key.version()) {
case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION: //
ShareSnapshot
- handleShareSnapshot((ShareSnapshotKey) key.message(),
(ShareSnapshotValue) messageOrNull(value));
+ handleShareSnapshot((ShareSnapshotKey) key.message(),
(ShareSnapshotValue) messageOrNull(value), offset);
break;
case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION: //
ShareUpdate
handleShareUpdate((ShareUpdateKey) key.message(),
(ShareUpdateValue) messageOrNull(value));
@@ -205,7 +218,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
}
}
- private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue
value) {
+ private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue
value, long offset) {
SharePartitionKey mapKey =
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
@@ -219,6 +232,8 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
snapshotUpdateCount.put(mapKey, 0);
}
}
+
+ offsetsManager.updateState(mapKey, offset);
}
private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value)
{
@@ -378,10 +393,22 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
return new CoordinatorResult<>(Collections.singletonList(record),
responseData);
}
+    /**
+     * Reports the last redundant offset currently known to this shard's offsets manager
+     * for the partition led by this shard. No records are produced by this operation.
+     *
+     * @return CoordinatorResult holding an empty record list and, as its response,
+     *         an {@code Optional<Long>} representing the offset.
+     */
+    public CoordinatorResult<Optional<Long>, CoordinatorRecord> lastRedundantOffset() {
+        Optional<Long> redundantOffset = offsetsManager.lastRedundantOffset();
+        return new CoordinatorResult<>(Collections.emptyList(), redundantOffset);
+    }
+
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for
a key, based on various conditions.
* <p>
- * if no snapshot has been created for the key => create a new
ShareSnapshot record
+ * If no snapshot has been created for the key => create a new
ShareSnapshot record
* else if number of ShareUpdate records for key >= max allowed per
snapshot per key => create a new ShareSnapshot record
* else create a new ShareUpdate record
*
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
new file mode 100644
index 00000000000..262f166be19
--- /dev/null
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ShareCoordinatorOffsetsManagerTest {
+
+ private ShareCoordinatorOffsetsManager manager;
+ private static final SharePartitionKey KEY1 =
SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 0);
+ private static final SharePartitionKey KEY2 =
SharePartitionKey.getInstance("gs2", Uuid.randomUuid(), 0);
+ private static final SharePartitionKey KEY3 =
SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 1);
+ private static final SharePartitionKey KEY4 =
SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 7);
+
+ @BeforeEach
+ public void setUp() {
+ manager = new ShareCoordinatorOffsetsManager(new SnapshotRegistry(new
LogContext()));
+ }
+
+ @Test
+ public void testUpdateStateAddsToInternalState() {
+ manager.updateState(KEY1, 0L);
+ assertEquals(Optional.empty(), manager.lastRedundantOffset());
+
+ manager.updateState(KEY1, 10L);
+ assertEquals(Optional.of(10L), manager.lastRedundantOffset()); //
[0-9] offsets are redundant.
+
+ manager.updateState(KEY2, 15L);
+ assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // No
update to last redundant after adding 15L so, still 10L.
+
+ assertEquals(10L, manager.curState().get(KEY1));
+ assertEquals(15L, manager.curState().get(KEY2));
+ }
+
+ private static class ShareOffsetTestHolder {
+ static class TestTuple {
+ final SharePartitionKey key;
+ final long offset;
+ final Optional<Long> expectedOffset;
+
+ private TestTuple(SharePartitionKey key, long offset,
Optional<Long> expectedOffset) {
+ this.key = key;
+ this.offset = offset;
+ this.expectedOffset = expectedOffset;
+ }
+
+ static TestTuple instance(SharePartitionKey key, long offset,
Optional<Long> expectedOffset) {
+ return new TestTuple(key, offset, expectedOffset);
+ }
+ }
+
+ private final String testName;
+ private final List<TestTuple> tuples;
+ private final boolean shouldRun;
+
+ ShareOffsetTestHolder(String testName, List<TestTuple> tuples) {
+ this(testName, tuples, true);
+ }
+
+ ShareOffsetTestHolder(String testName, List<TestTuple> tuples, boolean
shouldRun) {
+ this.testName = testName;
+ this.tuples = tuples;
+ this.shouldRun = shouldRun;
+ }
+ }
+
+ static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() {
+ return Stream.of(
+ new ShareOffsetTestHolder(
+ "no redundant state single key",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L))
+ )
+ ),
+
+ new ShareOffsetTestHolder(
+ "no redundant state multiple keys",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L,
Optional.of(10L))
+ )
+ )
+ );
+ }
+
+ static Stream<ShareOffsetTestHolder> generateRedundantStateCases() {
+ return Stream.of(
+ new ShareOffsetTestHolder(
+ "redundant state single key",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L,
Optional.of(11L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 15L,
Optional.of(15L))
+ )
+ ),
+
+ new ShareOffsetTestHolder(
+ "redundant state multiple keys",
+ // KEY1: 10 17
+ // KEY2: 11 16
+ // KEY3: 15
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L,
Optional.of(10L)), // KEY2 11 redundant but should not be returned
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L,
Optional.of(15L))
+ )
+ )
+ );
+
+ }
+
+ static Stream<ShareOffsetTestHolder> generateComplexCases() {
+ return Stream.of(
+ new ShareOffsetTestHolder(
+ "redundant state reverse key order",
+ // Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1.
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L,
Optional.of(18L))
+ )
+ ),
+
+ new ShareOffsetTestHolder(
+ "redundant state infrequently written partition.",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 25L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L,
Optional.of(27L))
+ )
+ )
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("generateNoRedundantStateCases")
+ public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) {
+ if (holder.shouldRun) {
+ holder.tuples.forEach(tuple -> {
+ manager.updateState(tuple.key, tuple.offset);
+ assertEquals(tuple.expectedOffset,
manager.lastRedundantOffset(), holder.testName);
+ });
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("generateRedundantStateCases")
+ public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) {
+ if (holder.shouldRun) {
+ holder.tuples.forEach(tuple -> {
+ manager.updateState(tuple.key, tuple.offset);
+ assertEquals(tuple.expectedOffset,
manager.lastRedundantOffset(), holder.testName);
+ });
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("generateComplexCases")
+ public void testUpdateStateComplexCases(ShareOffsetTestHolder holder) {
+ if (holder.shouldRun) {
+ holder.tuples.forEach(tuple -> {
+ manager.updateState(tuple.key, tuple.offset);
+ assertEquals(tuple.expectedOffset,
manager.lastRedundantOffset(), holder.testName);
+ });
+ }
+ }
+}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index fab60d22f75..c725c824031 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -35,9 +35,13 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.FutureUtils;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.server.util.timer.Timer;
import org.junit.jupiter.api.Test;
@@ -45,6 +49,8 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -56,8 +62,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -66,7 +74,10 @@ class ShareCoordinatorServiceTest {
@SuppressWarnings("unchecked")
private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>
mockRuntime() {
- return (CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>)
mock(CoordinatorRuntime.class);
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mock(CoordinatorRuntime.class);
+ when(runtime.activeTopicPartitions())
+ .thenReturn(Collections.singletonList(new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
+ return runtime;
}
@Test
@@ -74,10 +85,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ new MockTimer(),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -95,10 +108,12 @@ class ShareCoordinatorServiceTest {
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L).thenReturn(150L);
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
coordinatorMetrics,
- time
+ time,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -203,10 +218,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -273,7 +290,7 @@ class ShareCoordinatorServiceTest {
.setDeliveryState((byte) 0)
)))
);
-
+
when(runtime.scheduleWriteOperation(
eq("read-update-leader-epoch-state"),
eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
@@ -303,10 +320,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -348,10 +367,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -393,10 +414,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -470,10 +493,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -531,10 +556,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -579,10 +606,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -620,10 +649,12 @@ class ShareCoordinatorServiceTest {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
runtime,
new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -646,11 +677,13 @@ class ShareCoordinatorServiceTest {
public void testPartitionFor() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
ShareCoordinatorService service = new ShareCoordinatorService(
- new LogContext(),
-
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
- runtime,
- new ShareCoordinatorMetrics(),
- Time.SYSTEM
+ new LogContext(),
+
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -673,4 +706,422 @@ class ShareCoordinatorServiceTest {
// asCoordinatorKey does not discriminate on topic name.
assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey());
}
+
+    /**
+     * Two timer expirations must trigger two offset lookups and two successful
+     * deleteRecords calls (offsets 10 and 11), i.e. the prune job re-arms itself after success.
+     */
+    @Test
+    public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        // Stub every offset the lookups below return. An unstubbed deleteRecords call returns
+        // null from the mock, which would turn the second pass into a hidden failure.
+        when(writer.deleteRecords(
+            any(),
+            eq(10L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(writer.deleteRecords(
+            any(),
+            eq(11L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(11L))
+        );
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(2))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        // One delete per pass, each with the offset returned by the matching lookup.
+        verify(writer, times(1))
+            .deleteRecords(any(), eq(10L));
+        verify(writer, times(1))
+            .deleteRecords(any(), eq(11L));
+        verify(writer, times(2))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    /**
+     * With two active partitions, a failed deleteRecords on one of them must not stop the
+     * periodic job: the next timer expiration triggers lookups and deletes for both partitions again.
+     */
+    @Test
+    public void testRecordPruningTaskPeriodicityWithSomeFailures() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+        TopicPartition tp1 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
+
+        when(runtime.activeTopicPartitions())
+            .thenReturn(List.of(tp1, tp2));
+
+        // First pass: offset 10 succeeds, offset 20 fails.
+        when(writer.deleteRecords(
+            any(),
+            eq(10L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(writer.deleteRecords(
+            any(),
+            eq(20L)
+        )).thenReturn(
+            CompletableFuture.failedFuture(new Exception("bad stuff"))
+        );
+
+        // Second pass: stub offsets 11 and 21 too, otherwise the mock returns null
+        // instead of a future for those deleteRecords calls.
+        when(writer.deleteRecords(
+            any(),
+            eq(11L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(writer.deleteRecords(
+            any(),
+            eq(21L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            eq(tp1),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(11L))
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            eq(tp2),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(20L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(21L))
+        );
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 2);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // Prune should be called.
+        verify(runtime, times(2)) // For 2 topic partitions.
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // Prune should be called as future completes exceptionally.
+        verify(runtime, times(4)) // Second prune with 2 topic partitions.
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(4))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    /**
+     * When the offset lookup fails with UNKNOWN_SERVER_ERROR, the lookup is still attempted
+     * once per timer expiration but no deleteRecords call reaches the writer.
+     */
+    @Test
+    public void testRecordPruningTaskException() throws Exception {
+        String pruneOperation = "write-state-record-prune";
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> mockedRuntime = mockRuntime();
+        MockTime mockTime = new MockTime();
+        MockTimer mockTimer = new MockTimer(mockTime);
+        PartitionWriter partitionWriter = mock(PartitionWriter.class);
+
+        // Every lookup of the last redundant offset fails.
+        when(mockedRuntime.scheduleWriteOperation(eq(pruneOperation), any(), any(), any()))
+            .thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            mockedRuntime,
+            new ShareCoordinatorMetrics(),
+            mockTime,
+            mockTimer,
+            partitionWriter
+        ));
+
+        // Nothing is scheduled on the runtime before the timer fires.
+        service.startup(() -> 1);
+        verify(mockedRuntime, times(0)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        // Prune should be attempted after the clock advances.
+        mockTimer.advanceClock(30005L);
+        verify(mockedRuntime, times(1)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        verify(partitionWriter, times(0)).deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    /**
+     * A single timer expiration with a valid last redundant offset (20) must result in exactly
+     * one deleteRecords call for that offset.
+     */
+    @Test
+    public void testRecordPruningTaskSuccess() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        // Without this stub the mock returns null from deleteRecords, so the
+        // "success" scenario would really exercise a failure inside the service.
+        when(writer.deleteRecords(
+            any(),
+            eq(20L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(CompletableFuture.completedFuture(Optional.of(20L)));
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(1))
+            .deleteRecords(any(), eq(20L));
+        service.shutdown();
+    }
+
+    /**
+     * When the lookup yields an empty Optional there is nothing to prune,
+     * so the writer must never be asked to delete records.
+     */
+    @Test
+    public void testRecordPruningTaskEmptyOffsetReturned() throws Exception {
+        String pruneOperation = "write-state-record-prune";
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> mockedRuntime = mockRuntime();
+        MockTime mockTime = new MockTime();
+        MockTimer mockTimer = new MockTimer(mockTime);
+        PartitionWriter partitionWriter = mock(PartitionWriter.class);
+
+        // The shard reports no redundant offset.
+        when(mockedRuntime.scheduleWriteOperation(eq(pruneOperation), any(), any(), any()))
+            .thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            mockedRuntime,
+            new ShareCoordinatorMetrics(),
+            mockTime,
+            mockTimer,
+            partitionWriter
+        ));
+
+        // Nothing is scheduled on the runtime before the timer fires.
+        service.startup(() -> 1);
+        verify(mockedRuntime, times(0)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        // Prune should be attempted after the clock advances.
+        mockTimer.advanceClock(30005L);
+        verify(mockedRuntime, times(1)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        verify(partitionWriter, times(0)).deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    /**
+     * If two consecutive lookups return the same offset and the first delete succeeded,
+     * the second pass must not issue another deleteRecords call.
+     */
+    @Test
+    public void testRecordPruningTaskRepeatedSameOffsetForTopic() throws Exception {
+        String pruneOperation = "write-state-record-prune";
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> mockedRuntime = mockRuntime();
+        MockTime mockTime = new MockTime();
+        MockTimer mockTimer = new MockTimer(mockTime);
+        PartitionWriter partitionWriter = mock(PartitionWriter.class);
+
+        // Deleting up to offset 10 succeeds.
+        when(partitionWriter.deleteRecords(any(), eq(10L)))
+            .thenReturn(CompletableFuture.completedFuture(null));
+
+        // Both lookups report the same last redundant offset.
+        when(mockedRuntime.scheduleWriteOperation(eq(pruneOperation), any(), any(), any()))
+            .thenReturn(CompletableFuture.completedFuture(Optional.of(10L)))
+            .thenReturn(CompletableFuture.completedFuture(Optional.of(10L)));
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            mockedRuntime,
+            new ShareCoordinatorMetrics(),
+            mockTime,
+            mockTimer,
+            partitionWriter
+        ));
+
+        // Nothing is scheduled on the runtime before the timer fires.
+        service.startup(() -> 1);
+        verify(mockedRuntime, times(0)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        // First pass: prune should be called.
+        mockTimer.advanceClock(30005L);
+        verify(mockedRuntime, times(1)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        // Second pass: prune should be called again.
+        mockTimer.advanceClock(30005L);
+        verify(mockedRuntime, times(2)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        // Only the first pass reaches the writer.
+        verify(partitionWriter, times(1)).deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    /**
+     * If the first delete for an offset fails, a later lookup returning the same offset
+     * must issue the deleteRecords call again.
+     */
+    @Test
+    public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() throws Exception {
+        String pruneOperation = "write-state-record-prune";
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> mockedRuntime = mockRuntime();
+        MockTime mockTime = new MockTime();
+        MockTimer mockTimer = new MockTimer(mockTime);
+        PartitionWriter partitionWriter = mock(PartitionWriter.class);
+        CompletableFuture<Void> failedDelete = CompletableFuture.failedFuture(new Exception("bad stuff"));
+
+        // The first delete up to offset 10 fails, the retry succeeds.
+        when(partitionWriter.deleteRecords(any(), eq(10L)))
+            .thenReturn(failedDelete)
+            .thenReturn(CompletableFuture.completedFuture(null));
+
+        // Both lookups report the same last redundant offset.
+        when(mockedRuntime.scheduleWriteOperation(eq(pruneOperation), any(), any(), any()))
+            .thenReturn(CompletableFuture.completedFuture(Optional.of(10L)))
+            .thenReturn(CompletableFuture.completedFuture(Optional.of(10L)));
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            mockedRuntime,
+            new ShareCoordinatorMetrics(),
+            mockTime,
+            mockTimer,
+            partitionWriter
+        ));
+
+        // Nothing is scheduled on the runtime before the timer fires.
+        service.startup(() -> 1);
+        verify(mockedRuntime, times(0)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        // First pass: prune should be called.
+        mockTimer.advanceClock(30005L);
+        verify(mockedRuntime, times(1)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        // Second pass: prune should be called again.
+        mockTimer.advanceClock(30005L);
+        verify(mockedRuntime, times(2)).scheduleWriteOperation(eq(pruneOperation), any(), any(), any());
+
+        // Both passes reach the writer because the first delete did not succeed.
+        verify(partitionWriter, times(2)).deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index d7ddc2e0f44..37e331dceec 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -79,11 +80,12 @@ class ShareCoordinatorShardTest {
private final SnapshotRegistry snapshotRegistry = new
SnapshotRegistry(logContext);
private MetadataImage metadataImage = null;
private Map<String, String> configOverrides = new HashMap<>();
+ ShareCoordinatorOffsetsManager offsetsManager =
mock(ShareCoordinatorOffsetsManager.class);
ShareCoordinatorShard build() {
if (metadataImage == null) metadataImage =
mock(MetadataImage.class, RETURNS_DEEP_STUBS);
if (config == null) {
- config =
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap(configOverrides));
+ config =
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap(configOverrides));
}
ShareCoordinatorShard shard = new ShareCoordinatorShard(
@@ -91,7 +93,8 @@ class ShareCoordinatorShardTest {
config,
coordinatorMetrics,
metricsShard,
- snapshotRegistry
+ snapshotRegistry,
+ offsetsManager
);
when(metadataImage.topics().getTopic((Uuid)
any())).thenReturn(mock(TopicImage.class));
when(metadataImage.topics().getPartition(any(),
anyInt())).thenReturn(mock(PartitionRegistration.class));
@@ -103,6 +106,11 @@ class ShareCoordinatorShardTest {
this.configOverrides = configOverrides;
return this;
}
+
+ public ShareCoordinatorShardBuilder
setOffsetsManager(ShareCoordinatorOffsetsManager offsetsManager) { // builder setter: replaces the offsets manager that build() hands to the ShareCoordinatorShard constructor
+ this.offsetsManager = offsetsManager; // overrides the default mock created by the field initializer
+ return this; // return the builder so calls can be chained
+ }
}
private void writeAndReplayDefaultRecord(ShareCoordinatorShard shard) {
@@ -796,6 +804,17 @@ class ShareCoordinatorShardTest {
verify(shard.getMetricsShard(),
times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}
+ @Test
+ public void testLastRedundantOffset() { // checks that shard.lastRedundantOffset() reports the value supplied by its offsets manager
+ ShareCoordinatorOffsetsManager manager =
mock(ShareCoordinatorOffsetsManager.class); // mocked so the test controls the reported offset
+ ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder()
+ .setOffsetsManager(manager)
+ .build(); // shard under test is built around the mocked manager
+
+ when(manager.lastRedundantOffset()).thenReturn(Optional.of(10L)); // stub: the manager reports 10 as the last redundant offset
+ assertEquals(new CoordinatorResult<>(Collections.emptyList(),
Optional.of(10L)), shard.lastRedundantOffset()); // expects a CoordinatorResult built from an empty list and the stubbed Optional.of(10L)
+ }
+
@Test
public void testReadStateLeaderEpochUpdateSuccess() {
ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
similarity index 95%
rename from
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
rename to
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
index 5f8c37fc1e6..31b5bd88bdb 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
@@ -28,7 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ShareCoordinatorConfigTest {
+public class ShareCoordinatorTestConfig {
private static final List<ConfigDef> CONFIG_DEF_LIST =
Collections.singletonList(
ShareCoordinatorConfig.CONFIG_DEF
@@ -50,6 +50,7 @@ public class ShareCoordinatorConfigTest {
configs.put(ShareCoordinatorConfig.LOAD_BUFFER_SIZE_CONFIG, "555");
configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG,
String.valueOf(CompressionType.NONE.id));
+
configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG,
"30000"); // 30 seconds
return configs;
}