This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4c5ea05ec85 KAFKA-18058: Share group state record pruning impl. (#18014)
4c5ea05ec85 is described below

commit 4c5ea05ec85aba18abae1308f78345ff949659a5
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Dec 12 13:08:03 2024 +0530

    KAFKA-18058: Share group state record pruning impl. (#18014)
    
    This PR adds ShareCoordinatorOffsetsManager, a class which tracks the last
    redundant offset for each share-group state topic partition. It also adds a
    periodic timer job in ShareCoordinatorService which queries for the redundant
    offset at regular intervals and, if a valid value is found, issues a
    deleteRecords call to the ReplicaManager via the PartitionWriter. This keeps
    the size of the share-group state partitions manageable.
    
    Reviewers: Jun Rao <[email protected]>, David Jacot <[email protected]>, Andrew Schofield <[email protected]>
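
    In outline, the pruning flow added below works roughly as follows (a
    simplified, illustrative sketch of ShareCoordinatorService#setupRecordPruning
    and #performRecordPruning from this patch; error handling and the
    de-duplication of already-pruned offsets are omitted):

        // Re-scheduled after every run, whether it succeeds or fails.
        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
            @Override
            public void run() {
                for (TopicPartition tp : runtime.activeTopicPartitions()) {
                    runtime.scheduleWriteOperation(
                        "write-state-record-prune", tp,
                        Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
                        ShareCoordinatorShard::lastRedundantOffset
                    ).thenAccept(offset ->
                        // Ask the PartitionWriter to delete records below the last redundant offset, if known.
                        offset.ifPresent(off -> writer.deleteRecords(tp, off)));
                }
            }
        });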
---
 .../common/runtime/CoordinatorRuntime.java         |  20 +
 .../common/runtime/PartitionWriter.java            |  11 +
 .../common/runtime/InMemoryPartitionWriter.java    |   8 +
 .../group/CoordinatorPartitionWriter.scala         |  21 +
 .../main/scala/kafka/server/ReplicaManager.scala   |  11 +-
 .../group/CoordinatorPartitionWriterTest.scala     |  82 +++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  10 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  58 ++-
 .../server/config/ShareCoordinatorConfig.java      |  14 +-
 .../share/ShareCoordinatorOffsetsManager.java      | 122 +++++
 .../coordinator/share/ShareCoordinatorService.java |  94 +++-
 .../coordinator/share/ShareCoordinatorShard.java   |  33 +-
 .../share/ShareCoordinatorOffsetsManagerTest.java  | 209 +++++++++
 .../share/ShareCoordinatorServiceTest.java         | 505 +++++++++++++++++++--
 .../share/ShareCoordinatorShardTest.java           |  23 +-
 ...igTest.java => ShareCoordinatorTestConfig.java} |   3 +-
 16 files changed, 1175 insertions(+), 49 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1c21038e66a..ee0cf18212a 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -52,6 +52,7 @@ import org.slf4j.Logger;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -2468,4 +2469,23 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         Utils.closeQuietly(runtimeMetrics, "runtime metrics");
         log.info("Coordinator runtime closed.");
     }
+
+    /**
+     * Utility method which returns all topic partitions whose coordinator
+     * state machines are in the ACTIVE state.
+     * <p>
+     * This is useful when the caller does not have a specific
+     * target internal topic partition.
+     * @return List of {@link TopicPartition} whose coordinators are active.
+     */
+    public List<TopicPartition> activeTopicPartitions() {
+        if (coordinators == null || coordinators.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        return coordinators.entrySet().stream()
+            .filter(entry -> entry.getValue().state.equals(CoordinatorState.ACTIVE))
+            .map(Map.Entry::getKey)
+            .toList();
+    }
 }
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
index 47df8bcae34..cb8bec3f71c 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
@@ -107,4 +107,15 @@ public interface PartitionWriter {
         short producerEpoch,
         short apiVersion
     ) throws KafkaException;
+
+    /**
+     * Delete records from a topic partition up to, but not including, the specified offset.
+     * @param tp                    The partition to delete records from.
+     * @param deleteBeforeOffset    Offset to delete up to, starting from the beginning.
+     * @throws KafkaException       Any KafkaException caught during the operation.
+     */
+    CompletableFuture<Void> deleteRecords(
+        TopicPartition tp,
+        long deleteBeforeOffset
+    ) throws KafkaException;
 }
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
index 1f676ad550f..a8551f0734b 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
@@ -115,6 +115,14 @@ public class InMemoryPartitionWriter implements 
PartitionWriter {
         }
     }
 
+    @Override
+    public CompletableFuture<Void> deleteRecords(
+        TopicPartition tp,
+        long deleteBeforeOffset
+    ) throws KafkaException {
+        throw new RuntimeException("method not implemented");
+    }
+
     @Override
     public CompletableFuture<VerificationGuard> 
maybeStartTransactionVerification(
         TopicPartition tp,
diff --git 
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala 
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index f70819ef438..211be799a7e 100644
--- 
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ 
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -165,4 +165,25 @@ class CoordinatorPartitionWriter(
     // Required offset.
     partitionResult.lastOffset + 1
   }
+
+  override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
+    val responseFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
+
+    replicaManager.deleteRecords(
+      timeout = 30000L, // 30 seconds.
+      offsetPerPartition = Map(tp -> deleteBeforeOffset),
+      responseCallback = results => {
+        val result = results.get(tp)
+        if (result.isEmpty) {
+          responseFuture.completeExceptionally(new IllegalStateException(s"Delete status $result should have partition $tp."))
+        } else if (result.get.errorCode != Errors.NONE.code) {
+          responseFuture.completeExceptionally(Errors.forCode(result.get.errorCode).exception)
+        } else {
+          responseFuture.complete(null)
+        }
+      },
+      allowInternalTopicDeletion = true
+    )
+    responseFuture
+  }
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 67cc1a84c56..cb7005f4020 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig,
   * Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas;
   * the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset
   */
-  private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
+  private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition, LogDeleteRecordsResult] = {
     trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
     offsetPerPartition.map { case (topicPartition, requestedOffset) =>
-      // reject delete records operation on internal topics
-      if (Topic.isInternal(topicPartition.topic)) {
+      // reject delete records operation for internal topics unless allowInternalTopicDeletion is true
+      if (Topic.isInternal(topicPartition.topic) && !allowInternalTopicDeletion) {
         (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
       } else {
         try {
@@ -1369,9 +1369,10 @@ class ReplicaManager(val config: KafkaConfig,
 
   def deleteRecords(timeout: Long,
                     offsetPerPartition: Map[TopicPartition, Long],
-                    responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit): Unit = {
+                    responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit,
+                    allowInternalTopicDeletion: Boolean = false): Unit = {
     val timeBeforeLocalDeleteRecords = time.milliseconds
-    val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
+    val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition, allowInternalTopicDeletion)
     debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))

     val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index f12d21019a7..9b192e851e9 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -20,13 +20,14 @@ import kafka.server.ReplicaManager
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
VerificationGuard}
 import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, 
assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
@@ -238,4 +239,83 @@ class CoordinatorPartitionWriterTest {
       batch
     ))
   }
+
+  @Test
+  def testDeleteRecordsResponseContainsError(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, 
DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, 
DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer { _ =>
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code
+          )))
+    }
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete { (_, exp) =>
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception, exp)
+    }
+
+    // Empty response
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer { _ =>
+      callbackCapture.getValue.apply(Map[TopicPartition, 
DeleteRecordsPartitionResult]())
+    }
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete { (_, exp) =>
+      assertTrue(exp.isInstanceOf[IllegalStateException])
+    }
+  }
+
+  @Test
+  def testDeleteRecordsSuccess(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, 
DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, 
DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains no error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer { _ =>
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NONE.code)
+      ))
+    }
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete { (_, exp) =>
+      assertNull(exp)
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a36598a5eeb..27cd6644bd9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,7 @@ import 
org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
 import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, 
GroupCoordinatorConfig}
-import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorConfigTest}
+import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorTestConfig}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
@@ -11702,7 +11702,7 @@ class KafkaApisTest extends Logging {
 
     val response = getReadShareGroupResponse(
       readRequestData,
-      config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+      config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
       verifyNoErr = true,
       null,
       readStateResultData
@@ -11757,7 +11757,7 @@ class KafkaApisTest extends Logging {
 
     val response = getReadShareGroupResponse(
       readRequestData,
-      config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+      config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
       verifyNoErr = false,
       authorizer,
       readStateResultData
@@ -11812,7 +11812,7 @@ class KafkaApisTest extends Logging {
 
     val response = getWriteShareGroupResponse(
       writeRequestData,
-      config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+      config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
       verifyNoErr = true,
       null,
       writeStateResultData
@@ -11867,7 +11867,7 @@ class KafkaApisTest extends Logging {
 
     val response = getWriteShareGroupResponse(
       writeRequestData,
-      config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
+      config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
       verifyNoErr = false,
       authorizer,
       writeStateResultData
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ce45fcb6093..044a7d490d7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -34,7 +34,8 @@ import org.apache.kafka.common.{DirectoryId, IsolationLevel, 
Node, TopicIdPartit
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.{InvalidPidMappingException, 
KafkaStorageException}
-import org.apache.kafka.common.message.LeaderAndIsrRequestData
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.{DeleteRecordsResponseData, 
LeaderAndIsrRequestData}
 import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import 
org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
@@ -6660,6 +6661,61 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testDeleteRecordsInternalTopicDeleteDisallowed(): Unit = {
+    val localId = 1
+    val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, 
Topic.GROUP_METADATA_TOPIC_NAME)
+    val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+    val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), 
localId, setupLogDirMetaProperties = true, directoryEventHandler = 
directoryEventHandler)
+    val directoryIds = rm.logManager.directoryIdsSet.toList
+    assertEquals(directoryIds.size, 2)
+    val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, 
isStartIdLeader = true, directoryIds = directoryIds)
+    val (partition: Partition, _) = 
rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, 
FOO_UUID).get
+    
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(),
 1, localId, Seq(1, 2)),
+      new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
+      None)
+
+    def callback(responseStatus: Map[TopicPartition, 
DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
+      assert(responseStatus.values.head.errorCode == 
Errors.INVALID_TOPIC_EXCEPTION.code)
+    }
+
+    // default internal topics delete disabled
+    rm.deleteRecords(
+      timeout = 0L,
+      Map[TopicPartition, Long](topicPartition0.topicPartition() -> 10L),
+      responseCallback = callback
+    )
+  }
+
+  @Test
+  def testDeleteRecordsInternalTopicDeleteAllowed(): Unit = {
+    val localId = 1
+    val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, 
Topic.GROUP_METADATA_TOPIC_NAME)
+    val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+    val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), 
localId, setupLogDirMetaProperties = true, directoryEventHandler = 
directoryEventHandler)
+      val directoryIds = rm.logManager.directoryIdsSet.toList
+      assertEquals(directoryIds.size, 2)
+      val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, 
isStartIdLeader = true, directoryIds = directoryIds)
+      val (partition: Partition, _) = 
rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, 
FOO_UUID).get
+      
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(),
 1, localId, Seq(1, 2)),
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
+        None)
+
+    def callback(responseStatus: Map[TopicPartition, 
DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
+      assert(responseStatus.values.head.errorCode == Errors.NONE.code)
+    }
+
+    // internal topics delete allowed
+    rm.deleteRecords(
+      timeout = 0L,
+      Map[TopicPartition, Long](topicPartition0.topicPartition() -> 0L),
+      responseCallback = callback,
+      allowInternalTopicDeletion = true
+    )
+  }
+
   private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): 
Seq[(TopicIdPartition, LogReadResult)] = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true)
     try {
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
 
b/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
index a27a19914c9..58bc774fe20 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/ShareCoordinatorConfig.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.util.Optional;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
     public static final int APPEND_LINGER_MS_DEFAULT = 10;
     public static final String APPEND_LINGER_MS_DOC = "The duration in 
milliseconds that the share coordinator will wait for writes to accumulate 
before flushing them to disk.";
 
+    public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms";
+    public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
+    public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in the share-group state topic.";
+
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, 
STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
STATE_TOPIC_NUM_PARTITIONS_DOC)
         .define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, 
STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, 
STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -81,7 +86,8 @@ public class ShareCoordinatorConfig {
         .define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, 
atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
         .define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) 
STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, 
STATE_TOPIC_COMPRESSION_CODEC_DOC)
         .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, 
atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
-        .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC);
+        .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
+        .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);
 
     private final int stateTopicNumPartitions;
     private final short stateTopicReplicationFactor;
@@ -93,6 +99,7 @@ public class ShareCoordinatorConfig {
     private final int loadBufferSize;
     private final CompressionType compressionType;
     private final int appendLingerMs;
+    private final int pruneIntervalMs;
 
 
     public ShareCoordinatorConfig(AbstractConfig config) {
@@ -108,6 +115,7 @@ public class ShareCoordinatorConfig {
             .map(CompressionType::forId)
             .orElse(null);
         appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
+        pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
         validate();
     }
 
@@ -151,6 +159,10 @@ public class ShareCoordinatorConfig {
         return compressionType;
     }
 
+    public int shareCoordinatorTopicPruneIntervalMs() {
+        return pruneIntervalMs;
+    }
+
     private void validate() {
         Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && 
snapshotUpdateRecordsPerSnapshot <= 500,
             String.format("%s must be between [0, 500]", 
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
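
The prune interval above is registered with defineInternal, so it is an internal broker
config rather than a documented public one. As a minimal, illustrative sketch of reading
it back (assuming the ShareCoordinatorTestConfig helper renamed in this patch is
available, as in the tests further below):

    // Build a ShareCoordinatorConfig from the test helper's default property map
    // and read the pruning interval. The broker-level default is 5 minutes
    // (300000 ms); the test map may override it.
    ShareCoordinatorConfig config =
        ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap());
    int pruneIntervalMs = config.shareCoordinatorTopicPruneIntervalMs();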
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
new file mode 100644
index 00000000000..69070f65e93
--- /dev/null
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineLong;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Utility class to track the offsets written into the internal topic
+ * per share partition key.
+ * It calculates the minimum offset globally up to which the records
+ * in the internal partition are redundant, i.e. they have been overridden
+ * by newer records.
+ */
+public class ShareCoordinatorOffsetsManager {
+
+    // Map to store share partition key => current partition offset
+    // being written.
+    private final TimelineHashMap<SharePartitionKey, Long> offsets;
+
+    // Minimum offset representing the smallest necessary offset
+    // across the internal partition (offsets below this are redundant).
+    // We use a timeline object here because the offsets which are passed into
+    // updateState might not be committed yet. In case of a retry, these offsets would
+    // be invalidated via the snapshot registry; with a timeline object,
+    // the values automatically revert in accordance with the last committed offset.
+    private final TimelineLong lastRedundantOffset;
+
+    public ShareCoordinatorOffsetsManager(SnapshotRegistry snapshotRegistry) {
+        Objects.requireNonNull(snapshotRegistry);
+        offsets = new TimelineHashMap<>(snapshotRegistry, 0);
+        lastRedundantOffset = new TimelineLong(snapshotRegistry);
+        lastRedundantOffset.set(Long.MAX_VALUE);  // For easy application of Math.min.
+    }
+
+    /**
+     * Updates the internal state with the supplied offset for the provided
+     * share partition key. It then calculates the minimum offset, if possible,
+     * below which all offsets are redundant.
+     *
+     * @param key    - the {@link SharePartitionKey} whose offset needs updating
+     * @param offset - the latest partition offset for the provided key
+     */
+    public void updateState(SharePartitionKey key, long offset) {
+        lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset));
+        offsets.put(key, offset);
+
+        Optional<Long> redundantOffset = findRedundantOffset();
+        redundantOffset.ifPresent(lastRedundantOffset::set);
+    }
+
+    private Optional<Long> findRedundantOffset() {
+        if (offsets.isEmpty()) {
+            return Optional.empty();
+        }
+
+        long soFar = Long.MAX_VALUE;
+
+        for (long offset : offsets.values()) {
+            // Get min offset among latest offsets
+            // for all share keys in the internal partition.
+            soFar = Math.min(soFar, offset);
+
+            // lastRedundantOffset represents the smallest necessary offset
+            // and if soFar equals it, we cannot proceed. This can happen
+            // if a share partition key hasn't had records written for a while.
+            // For example,
+            // <p>
+            // key1:1
+            // key2:2 4 6
+            // key3:3 5 7
+            // <p>
+            // We can see above that offsets 2, 4, 3, 5 are redundant,
+            // but we do not have a contiguous prefix starting at lastRedundantOffset
+            // and we cannot proceed.
+            if (soFar == lastRedundantOffset.get()) {
+                return Optional.of(soFar);
+            }
+        }
+
+        return Optional.of(soFar);
+    }
+
+    /**
+     * Returns the most recent last redundant offset. This method is to be used
+     * when the caller wants to query the value of that offset.
+     * @return Optional of type Long representing the offset, or empty for invalid offset values
+     */
+    public Optional<Long> lastRedundantOffset() {
+        long value = lastRedundantOffset.get();
+        if (value <= 0 || value == Long.MAX_VALUE) {
+            return Optional.empty();
+        }
+
+        return Optional.of(value);
+    }
+
+    // visible for testing
+    TimelineHashMap<SharePartitionKey, Long> curState() {
+        return offsets;
+    }
+}
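
For reference, a minimal sketch of how the offsets manager above is driven
(mirroring the usage in ShareCoordinatorOffsetsManagerTest later in this patch;
the group name and offsets are illustrative):

    // Types as imported in ShareCoordinatorOffsetsManagerTest (see below).
    ShareCoordinatorOffsetsManager manager =
        new ShareCoordinatorOffsetsManager(new SnapshotRegistry(new LogContext()));
    SharePartitionKey key1 = SharePartitionKey.getInstance("group", Uuid.randomUuid(), 0);
    SharePartitionKey key2 = SharePartitionKey.getInstance("group", Uuid.randomUuid(), 1);

    manager.updateState(key1, 10L);  // latest write for key1 is at offset 10
    manager.updateState(key2, 11L);  // latest write for key2 is at offset 11
    manager.updateState(key1, 17L);  // key1 is overwritten at offset 17

    // Offsets below the smallest "latest" offset (11) are now redundant,
    // so records before offset 11 can be pruned.
    Optional<Long> redundant = manager.lastRedundantOffset();  // Optional.of(11L)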
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index ced4e532b03..71dd2d88056 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -48,6 +48,7 @@ import org.apache.kafka.server.config.ShareCoordinatorConfig;
 import org.apache.kafka.server.record.BrokerCompressionType;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
 
 import org.slf4j.Logger;
 
@@ -60,6 +61,7 @@ import java.util.Map;
 import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntSupplier;
@@ -75,6 +77,9 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
     private final ShareCoordinatorMetrics shareCoordinatorMetrics;
     private volatile int numPartitions = -1; // Number of partitions for 
__share_group_state. Provided when component is started.
     private final Time time;
+    private final Timer timer;
+    private final PartitionWriter writer;
+    private final Map<TopicPartition, Long> lastPrunedOffsets;
 
     public static class Builder {
         private final int nodeId;
@@ -184,7 +189,9 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
                 config,
                 runtime,
                 coordinatorMetrics,
-                time
+                time,
+                timer,
+                writer
             );
         }
     }
@@ -194,12 +201,18 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         ShareCoordinatorConfig config,
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime,
         ShareCoordinatorMetrics shareCoordinatorMetrics,
-        Time time) {
+        Time time,
+        Timer timer,
+        PartitionWriter writer
+    ) {
         this.log = logContext.logger(ShareCoordinatorService.class);
         this.config = config;
         this.runtime = runtime;
         this.shareCoordinatorMetrics = shareCoordinatorMetrics;
         this.time = time;
+        this.timer = timer;
+        this.writer = writer;
+        this.lastPrunedOffsets = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -240,9 +253,82 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning() {
+        log.info("Scheduling share-group state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share-group state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning();
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset for tp {} lookup threw an 
error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence 
not logging
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || 
error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an 
error.", tp, exception);
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+                Long lastPrunedOffset = lastPrunedOffsets.get(tp);
+                if (lastPrunedOffset != null && lastPrunedOffset.longValue() 
== off) {
+                    log.debug("{} already pruned till offset {}", tp, off);
+                    fut.complete(null);
+                    return;
+                }
+
+                log.info("Pruning records in {} till offset {}.", tp, off);
+                writer.deleteRecords(tp, off)
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.debug("Exception while deleting records in {} 
till offset {}.", tp, off, exp);
+                            fut.completeExceptionally(exp);
+                            return;
+                        }
+                        fut.complete(null);
+                        // Best effort prevention of issuing duplicate delete 
calls.
+                        lastPrunedOffsets.put(tp, off);
+                    });
+            } else {
+                log.debug("No offset value for tp {} found.", tp);
+                fut.complete(null);
+            }
+        });
+        return fut;
+    }
+
     @Override
     public void shutdown() {
         if (!isActive.compareAndSet(true, false)) {
@@ -543,8 +629,10 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
     @Override
     public void onResignation(int partitionIndex, OptionalInt 
partitionLeaderEpoch) {
         throwIfNotActive();
+        TopicPartition tp = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex);
+        lastPrunedOffsets.remove(tp);
         runtime.scheduleUnloadOperation(
-            new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex),
+            tp,
             partitionLeaderEpoch
         );
     }
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index e8ca9ce6b8d..6f2a9c27b74 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -73,6 +73,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
     private final TimelineHashMap<SharePartitionKey, Integer> 
snapshotUpdateCount;
     private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
     private MetadataImage metadataImage;
+    private final ShareCoordinatorOffsetsManager offsetsManager;
 
     public static final Exception NULL_TOPIC_ID = new Exception("The topic id 
cannot be null.");
     public static final Exception NEGATIVE_PARTITION_ID = new Exception("The 
partition id cannot be a negative number.");
@@ -162,6 +163,17 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         CoordinatorMetrics coordinatorMetrics,
         CoordinatorMetricsShard metricsShard,
         SnapshotRegistry snapshotRegistry
+    ) {
+        this(logContext, config, coordinatorMetrics, metricsShard, snapshotRegistry, new ShareCoordinatorOffsetsManager(snapshotRegistry));
+    }
+
+    ShareCoordinatorShard(
+        LogContext logContext,
+        ShareCoordinatorConfig config,
+        CoordinatorMetrics coordinatorMetrics,
+        CoordinatorMetricsShard metricsShard,
+        SnapshotRegistry snapshotRegistry,
+        ShareCoordinatorOffsetsManager offsetsManager
     ) {
         this.log = logContext.logger(ShareCoordinatorShard.class);
         this.config = config;
@@ -171,6 +183,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         this.leaderEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
         this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0);
         this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.offsetsManager = offsetsManager;
     }
 
     @Override
@@ -195,7 +208,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
 
         switch (key.version()) {
             case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION: // 
ShareSnapshot
-                handleShareSnapshot((ShareSnapshotKey) key.message(), (ShareSnapshotValue) messageOrNull(value));
+                handleShareSnapshot((ShareSnapshotKey) key.message(), (ShareSnapshotValue) messageOrNull(value), offset);
                 break;
             case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION: // 
ShareUpdate
                 handleShareUpdate((ShareUpdateKey) key.message(), 
(ShareUpdateValue) messageOrNull(value));
@@ -205,7 +218,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         }
     }
 
-    private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue value) {
+    private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue value, long offset) {
         SharePartitionKey mapKey = 
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
         maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
         maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
@@ -219,6 +232,8 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 snapshotUpdateCount.put(mapKey, 0);
             }
         }
+
+        offsetsManager.updateState(mapKey, offset);
     }
 
     private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value) 
{
@@ -378,10 +393,22 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return new CoordinatorResult<>(Collections.singletonList(record), 
responseData);
     }
 
+    /**
+     * Returns the last known redundant offset from the partition
+     * led by this shard.
+     * @return CoordinatorResult containing an empty record list and an Optional<Long> representing the offset.
+     */
+    public CoordinatorResult<Optional<Long>, CoordinatorRecord> lastRedundantOffset() {
+        return new CoordinatorResult<>(
+            Collections.emptyList(),
+            this.offsetsManager.lastRedundantOffset()
+        );
+    }
+
     /**
      * Util method to generate a ShareSnapshot or ShareUpdate type record for 
a key, based on various conditions.
      * <p>
-     * if no snapshot has been created for the key => create a new 
ShareSnapshot record
+     * If no snapshot has been created for the key => create a new 
ShareSnapshot record
      * else if number of ShareUpdate records for key >= max allowed per 
snapshot per key => create a new ShareSnapshot record
      * else create a new ShareUpdate record
      *
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
new file mode 100644
index 00000000000..262f166be19
--- /dev/null
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ShareCoordinatorOffsetsManagerTest {
+
+    private ShareCoordinatorOffsetsManager manager;
+    private static final SharePartitionKey KEY1 = 
SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 0);
+    private static final SharePartitionKey KEY2 = 
SharePartitionKey.getInstance("gs2", Uuid.randomUuid(), 0);
+    private static final SharePartitionKey KEY3 = 
SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 1);
+    private static final SharePartitionKey KEY4 = 
SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 7);
+
+    @BeforeEach
+    public void setUp() {
+        manager = new ShareCoordinatorOffsetsManager(new SnapshotRegistry(new 
LogContext()));
+    }
+
+    @Test
+    public void testUpdateStateAddsToInternalState() {
+        manager.updateState(KEY1, 0L);
+        assertEquals(Optional.empty(), manager.lastRedundantOffset());
+
+        manager.updateState(KEY1, 10L);
+        assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // 
[0-9] offsets are redundant.
+
+        manager.updateState(KEY2, 15L);
+        assertEquals(Optional.of(10L), manager.lastRedundantOffset());  // No 
update to last redundant after adding 15L so, still 10L.
+
+        assertEquals(10L, manager.curState().get(KEY1));
+        assertEquals(15L, manager.curState().get(KEY2));
+    }
+
+    private static class ShareOffsetTestHolder {
+        static class TestTuple {
+            final SharePartitionKey key;
+            final long offset;
+            final Optional<Long> expectedOffset;
+
+            private TestTuple(SharePartitionKey key, long offset, 
Optional<Long> expectedOffset) {
+                this.key = key;
+                this.offset = offset;
+                this.expectedOffset = expectedOffset;
+            }
+
+            static TestTuple instance(SharePartitionKey key, long offset, 
Optional<Long> expectedOffset) {
+                return new TestTuple(key, offset, expectedOffset);
+            }
+        }
+
+        private final String testName;
+        private final List<TestTuple> tuples;
+        private final boolean shouldRun;
+
+        ShareOffsetTestHolder(String testName, List<TestTuple> tuples) {
+            this(testName, tuples, true);
+        }
+
+        ShareOffsetTestHolder(String testName, List<TestTuple> tuples, boolean 
shouldRun) {
+            this.testName = testName;
+            this.tuples = tuples;
+            this.shouldRun = shouldRun;
+        }
+    }
+
+    static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() {
+        return Stream.of(
+            new ShareOffsetTestHolder(
+                "no redundant state single key",
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L))
+                )
+            ),
+
+            new ShareOffsetTestHolder(
+                "no redundant state multiple keys",
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, 
Optional.of(10L))
+                )
+            )
+        );
+    }
+
+    static Stream<ShareOffsetTestHolder> generateRedundantStateCases() {
+        return Stream.of(
+            new ShareOffsetTestHolder(
+                "redundant state single key",
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L, 
Optional.of(11L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 15L, 
Optional.of(15L))
+                )
+            ),
+
+            new ShareOffsetTestHolder(
+                "redundant state multiple keys",
+                // KEY1: 10 17
+                // KEY2: 11 16
+                // KEY3: 15
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, 
Optional.of(10L)),  // KEY2 11 redundant but should not be returned
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, 
Optional.of(15L))
+                )
+            )
+        );
+
+    }
+
+    static Stream<ShareOffsetTestHolder> generateComplexCases() {
+        return Stream.of(
+            new ShareOffsetTestHolder(
+                "redundant state reverse key order",
+                // Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1.
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L, 
Optional.of(18L))
+                )
+            ),
+
+            new ShareOffsetTestHolder(
+                "redundant state infrequently written partition.",
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 25L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, 
Optional.of(27L))
+                )
+            )
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("generateNoRedundantStateCases")
+    public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) {
+        if (holder.shouldRun) {
+            holder.tuples.forEach(tuple -> {
+                manager.updateState(tuple.key, tuple.offset);
+                assertEquals(tuple.expectedOffset, 
manager.lastRedundantOffset(), holder.testName);
+            });
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("generateRedundantStateCases")
+    public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) {
+        if (holder.shouldRun) {
+            holder.tuples.forEach(tuple -> {
+                manager.updateState(tuple.key, tuple.offset);
+                assertEquals(tuple.expectedOffset, 
manager.lastRedundantOffset(), holder.testName);
+            });
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("generateComplexCases")
+    public void testUpdateStateComplexCases(ShareOffsetTestHolder holder) {
+        if (holder.shouldRun) {
+            holder.tuples.forEach(tuple -> {
+                manager.updateState(tuple.key, tuple.offset);
+                assertEquals(tuple.expectedOffset, 
manager.lastRedundantOffset(), holder.testName);
+            });
+        }
+    }
+}
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index fab60d22f75..c725c824031 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -35,9 +35,13 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
 import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.util.FutureUtils;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.server.util.timer.Timer;
 
 import org.junit.jupiter.api.Test;
 
@@ -45,6 +49,8 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -56,8 +62,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -66,7 +74,10 @@ class ShareCoordinatorServiceTest {
 
     @SuppressWarnings("unchecked")
     private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> 
mockRuntime() {
-        return (CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>) 
mock(CoordinatorRuntime.class);
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
 mock(CoordinatorRuntime.class);
+        when(runtime.activeTopicPartitions())
+            .thenReturn(Collections.singletonList(new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
+        return runtime;
     }
 
     @Test
@@ -74,10 +85,12 @@ class ShareCoordinatorServiceTest {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             new ShareCoordinatorMetrics(),
-            Time.SYSTEM
+            Time.SYSTEM,
+            new MockTimer(),
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -95,10 +108,12 @@ class ShareCoordinatorServiceTest {
         
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L).thenReturn(150L);
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             coordinatorMetrics,
-            time
+            time,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -203,10 +218,12 @@ class ShareCoordinatorServiceTest {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             new ShareCoordinatorMetrics(),
-            Time.SYSTEM
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -273,7 +290,7 @@ class ShareCoordinatorServiceTest {
                         .setDeliveryState((byte) 0)
                 )))
             );
-
+        
         when(runtime.scheduleWriteOperation(
             eq("read-update-leader-epoch-state"),
             eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
@@ -303,10 +320,12 @@ class ShareCoordinatorServiceTest {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             new ShareCoordinatorMetrics(),
-            Time.SYSTEM
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -348,10 +367,12 @@ class ShareCoordinatorServiceTest {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             new ShareCoordinatorMetrics(),
-            Time.SYSTEM
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -393,10 +414,12 @@ class ShareCoordinatorServiceTest {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             new ShareCoordinatorMetrics(),
-            Time.SYSTEM
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         String groupId = "group1";
@@ -470,10 +493,12 @@ class ShareCoordinatorServiceTest {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             new ShareCoordinatorMetrics(),
-            Time.SYSTEM
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         String groupId = "group1";
@@ -531,10 +556,12 @@ class ShareCoordinatorServiceTest {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             new ShareCoordinatorMetrics(),
-            Time.SYSTEM
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -579,10 +606,12 @@ class ShareCoordinatorServiceTest {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             new ShareCoordinatorMetrics(),
-            Time.SYSTEM
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -620,10 +649,12 @@ class ShareCoordinatorServiceTest {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
             new LogContext(),
-            
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
             runtime,
             new ShareCoordinatorMetrics(),
-            Time.SYSTEM
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -646,11 +677,13 @@ class ShareCoordinatorServiceTest {
     public void testPartitionFor() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         ShareCoordinatorService service = new ShareCoordinatorService(
-                new LogContext(),
-                
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
-                runtime,
-                new ShareCoordinatorMetrics(),
-                Time.SYSTEM
+            new LogContext(),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
         );
 
         String groupId = "group1";
@@ -673,4 +706,422 @@ class ShareCoordinatorServiceTest {
         // asCoordinatorKey does not discriminate on topic name.
         assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey());
     }
+
+    @Test
+    public void testRecordPruningTaskPeriodicityWithAllSuccess() throws 
Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        when(writer.deleteRecords(
+            any(),
+            eq(10L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(11L))
+        );
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(2))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(2))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    @Test
+    public void testRecordPruningTaskPeriodicityWithSomeFailures() throws 
Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+        TopicPartition tp1 = new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
+        TopicPartition tp2 = new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
+
+        when(runtime.activeTopicPartitions())
+            .thenReturn(List.of(tp1, tp2));
+
+        when(writer.deleteRecords(
+            any(),
+            eq(10L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(writer.deleteRecords(
+            any(),
+            eq(20L)
+        )).thenReturn(
+            CompletableFuture.failedFuture(new Exception("bad stuff"))
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            eq(tp1),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(11L))
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            eq(tp2),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(20L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(21L))
+        );
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 2);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // Prune should be called.
+        verify(runtime, times(2))   // For 2 topic partitions.
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // Prune should be called as future 
completes exceptionally.
+        verify(runtime, times(4))   // Second prune with 2 topic partitions.
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(4))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    @Test
+    public void testRecordPruningTaskException() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(0))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    @Test
+    public void testRecordPruningTaskSuccess() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(CompletableFuture.completedFuture(Optional.of(20L)));
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(1))
+            .deleteRecords(any(), eq(20L));
+        service.shutdown();
+    }
+
+    @Test
+    public void testRecordPruningTaskEmptyOffsetReturned() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(0))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    @Test
+    public void testRecordPruningTaskRepeatedSameOffsetForTopic() throws 
Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        when(writer.deleteRecords(
+            any(),
+            eq(10L)
+        )).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        );
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(2))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(1))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
+
+    @Test
+    public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() 
throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+        CompletableFuture<Void> fut1 = new CompletableFuture<>();
+        fut1.completeExceptionally(new Exception("bad stuff"));
+
+        when(writer.deleteRecords(
+            any(),
+            eq(10L)
+        )).thenReturn(
+            fut1
+        ).thenReturn(
+            CompletableFuture.completedFuture(null)
+        );
+
+        when(runtime.scheduleWriteOperation(
+            eq("write-state-record-prune"),
+            any(),
+            any(),
+            any()
+        )).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        ).thenReturn(
+            CompletableFuture.completedFuture(Optional.of(10L))
+        );
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(1))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(30005L); // prune should be called
+        verify(runtime, times(2))
+            .scheduleWriteOperation(
+                eq("write-state-record-prune"),
+                any(),
+                any(),
+                any());
+
+        verify(writer, times(2))
+            .deleteRecords(any(), anyLong());
+        service.shutdown();
+    }
 }
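
The pruning tests added above drive a MockTimer just past the 30000 ms prune interval and assert that each tick schedules one "write-state-record-prune" write operation per active share-state partition, followed by a PartitionWriter.deleteRecords call only when a new redundant offset comes back. The sketch below is a hypothetical, simplified stand-in for that loop, not the ShareCoordinatorService code: ScheduledExecutorService replaces the coordinator Timer, and the two small functional interfaces replace the runtime write operation and PartitionWriter.deleteRecords. The skip-on-unchanged-offset and retry-after-failed-delete behaviour mirrors what the tests assert.

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BiFunction;
    import java.util.function.Function;

    final class PruneLoopSketch {
        // Stand-in for the "write-state-record-prune" runtime operation: yields the
        // last redundant offset for one share-state partition, if any.
        interface RedundantOffsetQuery extends Function<String, CompletableFuture<Optional<Long>>> { }
        // Stand-in for PartitionWriter.deleteRecords(partition, offset).
        interface DeleteRecords extends BiFunction<String, Long, CompletableFuture<Void>> { }

        private final Map<String, Long> lastPrunedOffset = new ConcurrentHashMap<>();

        void start(ScheduledExecutorService executor,
                   long pruneIntervalMs,
                   List<String> activePartitions,
                   RedundantOffsetQuery queryRedundantOffset,
                   DeleteRecords deleteRecords) {
            executor.scheduleAtFixedRate(() -> {
                for (String partition : activePartitions) {
                    queryRedundantOffset.apply(partition).thenCompose(offsetOpt -> {
                        // No redundant offset yet, or the same offset as last time: nothing to delete.
                        if (!offsetOpt.isPresent() || offsetOpt.get().equals(lastPrunedOffset.get(partition))) {
                            return CompletableFuture.<Void>completedFuture(null);
                        }
                        long offset = offsetOpt.get();
                        return deleteRecords.apply(partition, offset)
                            // Remember the offset only after a successful delete, so a failure is retried next tick.
                            .thenRun(() -> lastPrunedOffset.put(partition, offset));
                    }).exceptionally(e -> null); // Errors are swallowed here; the next tick tries again.
                }
            }, pruneIntervalMs, pruneIntervalMs, TimeUnit.MILLISECONDS);
        }
    }
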
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index d7ddc2e0f44..37e331dceec 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -52,6 +52,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -79,11 +80,12 @@ class ShareCoordinatorShardTest {
         private final SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
         private MetadataImage metadataImage = null;
         private Map<String, String> configOverrides = new HashMap<>();
+        ShareCoordinatorOffsetsManager offsetsManager = 
mock(ShareCoordinatorOffsetsManager.class);
 
         ShareCoordinatorShard build() {
             if (metadataImage == null) metadataImage = 
mock(MetadataImage.class, RETURNS_DEEP_STUBS);
             if (config == null) {
-                config = 
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap(configOverrides));
+                config = 
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap(configOverrides));
             }
 
             ShareCoordinatorShard shard = new ShareCoordinatorShard(
@@ -91,7 +93,8 @@ class ShareCoordinatorShardTest {
                 config,
                 coordinatorMetrics,
                 metricsShard,
-                snapshotRegistry
+                snapshotRegistry,
+                offsetsManager
             );
             when(metadataImage.topics().getTopic((Uuid) 
any())).thenReturn(mock(TopicImage.class));
             when(metadataImage.topics().getPartition(any(), 
anyInt())).thenReturn(mock(PartitionRegistration.class));
@@ -103,6 +106,11 @@ class ShareCoordinatorShardTest {
             this.configOverrides = configOverrides;
             return this;
         }
+
+        public ShareCoordinatorShardBuilder 
setOffsetsManager(ShareCoordinatorOffsetsManager offsetsManager) {
+            this.offsetsManager = offsetsManager;
+            return this;
+        }
     }
 
     private void writeAndReplayDefaultRecord(ShareCoordinatorShard shard) {
@@ -796,6 +804,17 @@ class ShareCoordinatorShardTest {
         verify(shard.getMetricsShard(), 
times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testLastRedundantOffset() {
+        ShareCoordinatorOffsetsManager manager = 
mock(ShareCoordinatorOffsetsManager.class);
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder()
+            .setOffsetsManager(manager)
+            .build();
+
+        when(manager.lastRedundantOffset()).thenReturn(Optional.of(10L));
+        assertEquals(new CoordinatorResult<>(Collections.emptyList(), 
Optional.of(10L)), shard.lastRedundantOffset());
+    }
+
     @Test
     public void testReadStateLeaderEpochUpdateSuccess() {
         ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
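
The new testLastRedundantOffset case above checks that the shard simply exposes the offsets manager's value as a CoordinatorResult carrying no records. A tiny, hypothetical stand-alone illustration of that shape follows; the interface and record are local stand-ins, not the real ShareCoordinatorOffsetsManager or CoordinatorResult classes.

    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;

    final class LastRedundantOffsetSketch {
        // Local stand-ins for ShareCoordinatorOffsetsManager and CoordinatorResult.
        interface OffsetsManager { Optional<Long> lastRedundantOffset(); }
        record Result<T, R>(List<T> records, R response) { }

        private final OffsetsManager offsetsManager;

        LastRedundantOffsetSketch(OffsetsManager offsetsManager) {
            this.offsetsManager = offsetsManager;
        }

        // Matches the asserted shape: an empty record list plus the manager's offset as the response.
        Result<Object, Optional<Long>> lastRedundantOffset() {
            return new Result<>(Collections.emptyList(), offsetsManager.lastRedundantOffset());
        }
    }
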
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
similarity index 95%
rename from 
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
rename to 
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
index 5f8c37fc1e6..31b5bd88bdb 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
@@ -28,7 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ShareCoordinatorConfigTest {
+public class ShareCoordinatorTestConfig {
 
     private static final List<ConfigDef> CONFIG_DEF_LIST = 
Collections.singletonList(
         ShareCoordinatorConfig.CONFIG_DEF
@@ -50,6 +50,7 @@ public class ShareCoordinatorConfigTest {
         configs.put(ShareCoordinatorConfig.LOAD_BUFFER_SIZE_CONFIG, "555");
         configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
         
configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, 
String.valueOf(CompressionType.NONE.id));
+        
configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, 
"30000");  // 30 seconds
         return configs;
     }
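
The 30000 ms STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG value added here is what the periodicity tests rely on: each timer.advanceClock(30005L) step moves just past one interval, so exactly one prune pass per active partition is expected per step. Shortening the interval in an individual test could look roughly like the snippet below; it assumes, as the shard test's builder usage suggests, that testConfigMap(Map) merges overrides into the defaults and that createConfig returns a ShareCoordinatorConfig.

    // Hypothetical override; testConfigMap(Map) and createConfig are used as in the shard test above.
    Map<String, String> overrides = new HashMap<>();
    overrides.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "5000"); // prune every 5s
    ShareCoordinatorConfig config =
        ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap(overrides));
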
 
