This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f2f31100c4b KAFKA-20611: Stitch share group DLQ manager with the main
code. (#22372)
f2f31100c4b is described below
commit f2f31100c4b7aef96a86e77d5ecf47ef7f9f3468
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri May 29 03:44:48 2026 +0530
KAFKA-20611: Stitch share group DLQ manager with the main code. (#22372)
* Add ShareGroupDLQManager instance creation code in BrokerServer and
pass along the instance to SharePartitionManager to be handed over to
SharePartition.
NOTE: Merge after https://github.com/apache/kafka/pull/22368
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 22 +++++++------
.../kafka/server/share/SharePartitionManager.java | 26 ++++++++++++----
.../src/main/scala/kafka/server/BrokerServer.scala | 36 +++++++++++++++++++++-
.../server/share/SharePartitionManagerTest.java | 11 ++++++-
.../kafka/server/share/SharePartitionTest.java | 12 ++++++--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
.../group/modern/share/ShareGroupConfig.java | 14 ++++++++-
7 files changed, 102 insertions(+), 20 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 02d0cb14413..bfa209b4819 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -42,7 +42,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
-import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
@@ -330,15 +329,16 @@ public class SharePartition {
private long fetchLockIdleDurationMs;
/**
- * Reference to the dlq manager implementation.
+ * Supplier to toggle DLQ support.
*/
- private final ShareGroupDLQManager shareGroupDLQ = new
NoOpShareGroupDLQManager();
+ private final Supplier<Boolean> shareGroupDlqEnableSupplier;
/**
- * Supplier to toggle dlq support.
+ * Reference to the DLQ manager implementation.
*/
- private final Supplier<Boolean> shareGroupDlqEnableSupplier;
+ private final ShareGroupDLQManager shareGroupDLQManager;
+ @SuppressWarnings("ParameterNumber")
SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
@@ -352,11 +352,13 @@ public class SharePartition {
ReplicaManager replicaManager,
ShareGroupConfigProvider configProvider,
SharePartitionListener listener,
- Supplier<Boolean> shareGroupDlqEnableSupplier
+ Supplier<Boolean> shareGroupDlqEnableSupplier,
+ ShareGroupDLQManager shareGroupDLQManager
) {
this(groupId, topicIdPartition, leaderEpoch,
defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
timer, time, persister, replicaManager, configProvider,
SharePartitionState.EMPTY, listener,
- new SharePartitionMetrics(groupId, topicIdPartition.topic(),
topicIdPartition.partition()), shareGroupDlqEnableSupplier);
+ new SharePartitionMetrics(groupId, topicIdPartition.topic(),
topicIdPartition.partition()), shareGroupDlqEnableSupplier,
+ shareGroupDLQManager);
}
// Visible for testing
@@ -376,7 +378,8 @@ public class SharePartition {
SharePartitionState sharePartitionState,
SharePartitionListener listener,
SharePartitionMetrics sharePartitionMetrics,
- Supplier<Boolean> shareGroupDlqEnableSupplier
+ Supplier<Boolean> shareGroupDlqEnableSupplier,
+ ShareGroupDLQManager shareGroupDLQManager
) {
this.groupId = groupId;
this.topicIdPartition = topicIdPartition;
@@ -403,6 +406,7 @@ public class SharePartition {
this.registerGaugeMetrics();
this.deliveryCompleteCount = new AtomicInteger(0);
this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
+ this.shareGroupDLQManager = shareGroupDLQManager;
}
/**
@@ -3337,7 +3341,7 @@ public class SharePartition {
void initiateDLQAndArchive(InFlightState updatedState, long firstOffset,
long lastOffset, short deliveryCount, Throwable
dlqCause) {
// Step 1: Enqueue to DLQ
- shareGroupDLQ.enqueue(new ShareGroupDLQRecordParameter(
+ shareGroupDLQManager.enqueue(new ShareGroupDLQRecordParameter(
groupId, topicIdPartition, firstOffset, lastOffset,
Optional.of(deliveryCount), Optional.ofNullable(dlqCause), false
)).whenComplete((v1, dlqException) -> {
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 3b4db2162ee..531f5278bc5 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -41,6 +41,7 @@ import
org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
@@ -154,6 +155,11 @@ public class SharePartitionManager implements
AutoCloseable {
*/
private final Supplier<Boolean> shareGroupDlqEnableSupplier;
+ /**
+ * Reference to the DLQ manager implementation.
+ */
+ private final ShareGroupDLQManager shareGroupDLQManager;
+
public SharePartitionManager(
ReplicaManager replicaManager,
Time time,
@@ -165,7 +171,8 @@ public class SharePartitionManager implements AutoCloseable
{
Persister persister,
ShareGroupConfigProvider configProvider,
BrokerTopicStats brokerTopicStats,
- Supplier<Boolean> shareGroupDlqEnableSupplier
+ Supplier<Boolean> shareGroupDlqEnableSupplier,
+ ShareGroupDLQManager shareGroupDLQManager
) {
this(replicaManager,
time,
@@ -179,10 +186,12 @@ public class SharePartitionManager implements
AutoCloseable {
configProvider,
new ShareGroupMetrics(time),
brokerTopicStats,
- shareGroupDlqEnableSupplier
+ shareGroupDlqEnableSupplier,
+ shareGroupDLQManager
);
}
+ @SuppressWarnings("ParameterNumber")
private SharePartitionManager(
ReplicaManager replicaManager,
Time time,
@@ -196,7 +205,8 @@ public class SharePartitionManager implements AutoCloseable
{
ShareGroupConfigProvider configProvider,
ShareGroupMetrics shareGroupMetrics,
BrokerTopicStats brokerTopicStats,
- Supplier<Boolean> shareGroupDlqEnableSupplier
+ Supplier<Boolean> shareGroupDlqEnableSupplier,
+ ShareGroupDLQManager shareGroupDLQManager
) {
this(replicaManager,
time,
@@ -212,7 +222,8 @@ public class SharePartitionManager implements AutoCloseable
{
configProvider,
shareGroupMetrics,
brokerTopicStats,
- shareGroupDlqEnableSupplier
+ shareGroupDlqEnableSupplier,
+ shareGroupDLQManager
);
}
@@ -232,7 +243,8 @@ public class SharePartitionManager implements AutoCloseable
{
ShareGroupConfigProvider configProvider,
ShareGroupMetrics shareGroupMetrics,
BrokerTopicStats brokerTopicStats,
- Supplier<Boolean> shareGroupDlqEnableSupplier
+ Supplier<Boolean> shareGroupDlqEnableSupplier,
+ ShareGroupDLQManager shareGroupDLQManager
) {
this.replicaManager = replicaManager;
this.time = time;
@@ -249,6 +261,7 @@ public class SharePartitionManager implements AutoCloseable
{
this.brokerTopicStats = brokerTopicStats;
this.cache.registerShareGroupListener(new ShareGroupListenerImpl());
this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
+ this.shareGroupDLQManager = shareGroupDLQManager;
}
/**
@@ -733,7 +746,8 @@ public class SharePartitionManager implements AutoCloseable
{
replicaManager,
configProvider,
listener,
- shareGroupDlqEnableSupplier
+ shareGroupDlqEnableSupplier,
+ shareGroupDLQManager
);
});
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index f7a9db7a724..04a1ee7672c 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -62,6 +62,7 @@ import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.{LogDirFailureChannel,
LogManager => JLogManager}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.apache.kafka.server.partition.{AlterPartitionManager,
DefaultAlterPartitionManager}
+import org.apache.kafka.server.share.dlq.{DefaultShareGroupDLQManager,
NoOpShareGroupDLQManager, ShareGroupDLQManager}
import java.time.Duration
import java.util
@@ -170,6 +171,8 @@ class BrokerServer(
private var shareGroupTimer: Timer = _
+ private var shareGroupDLQManager: ShareGroupDLQManager = _
+
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus):
Boolean = {
lock.lock()
try {
@@ -386,6 +389,9 @@ class BrokerServer(
/* create persister */
persister = createShareStatePersister()
+ /* create share group DLQ manager */
+ shareGroupDLQManager = createShareGroupDLQManager()
+
partitionMetadataClient = createPartitionMetadataClient(metadataCache)
groupCoordinator = createGroupCoordinator()
@@ -463,7 +469,8 @@ class BrokerServer(
persister,
new ShareGroupConfigProvider(groupConfigManager),
brokerTopicStats,
- () =>
ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)).supportsShareGroupDLQ()
+ () =>
ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)).supportsShareGroupDLQ(),
+ shareGroupDLQManager
)
dataPlaneRequestProcessor = new KafkaApis(
@@ -750,6 +757,30 @@ class BrokerServer(
}
}
+ private def createShareGroupDLQManager(): ShareGroupDLQManager = {
+ if (config.shareGroupConfig.shareGroupDLQManagerClassName.nonEmpty) {
+ val klass =
Utils.loadClass(config.shareGroupConfig.shareGroupDLQManagerClassName,
classOf[Object]).asInstanceOf[Class[ShareGroupDLQManager]]
+ if (klass.getName.equals(classOf[DefaultShareGroupDLQManager].getName)) {
+ DefaultShareGroupDLQManager.instance(
+ NetworkUtils.buildNetworkClient("ShareGroupDLQManager", config,
metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager
broker=${config.brokerId}]")),
+ new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key =>
shareCoordinator.partitionFor(key), config.interBrokerListenerName,
groupConfigManager),
+ Time.SYSTEM,
+ shareGroupTimer
+ )
+ } else if
(klass.getName.equals(classOf[NoOpShareGroupDLQManager].getName)) {
+ info("Using no-op share group DLQ manager")
+ new NoOpShareGroupDLQManager()
+ } else {
+ error("Unknown share group DLQ manager specialization specified.
ShareGroupDLQManager is only factory-pluggable!")
+ throw new IllegalArgumentException("Unknown share group DLQ manager
specified " + config.shareGroupConfig.shareGroupDLQManagerClassName)
+ }
+ } else {
+ // in case share group DLQ manager class name deliberately empty (key=)
+ info("Using no-op share group DLQ manager")
+ new NoOpShareGroupDLQManager()
+ }
+ }
+
protected def createRemoteLogManager(listenerInfo: ListenerInfo):
Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled) {
val listenerName =
config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
@@ -883,6 +914,9 @@ class BrokerServer(
if (persister != null)
Utils.swallow(this.logger.underlying, () => persister.stop())
+ if (shareGroupDLQManager != null)
+ Utils.swallow(this.logger.underlying, () =>
shareGroupDLQManager.stop())
+
Utils.closeQuietly(shareGroupTimer, "share group timer")
if (lifecycleManager != null)
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 45d29fd79b3..d1a0d98eb65 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -64,6 +64,8 @@ import
org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
@@ -3249,6 +3251,7 @@ public class SharePartitionManagerTest {
private ShareGroupMetrics shareGroupMetrics = new
ShareGroupMetrics(time);
private BrokerTopicStats brokerTopicStats;
private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
+ private ShareGroupDLQManager shareGroupDLQManager = new
NoOpShareGroupDLQManager();
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager
replicaManager) {
this.replicaManager = replicaManager;
@@ -3290,6 +3293,11 @@ public class SharePartitionManagerTest {
return this;
}
+ private SharePartitionManagerBuilder
withShareGroupDlqManager(ShareGroupDLQManager shareGroupDLQManager) {
+ this.shareGroupDLQManager = shareGroupDLQManager;
+ return this;
+ }
+
public static SharePartitionManagerBuilder builder() {
return new SharePartitionManagerBuilder();
}
@@ -3308,7 +3316,8 @@ public class SharePartitionManagerTest {
new ShareGroupConfigProvider(mock(GroupConfigManager.class)),
shareGroupMetrics,
brokerTopicStats,
- shareGroupDlqEnableSupplier
+ shareGroupDlqEnableSupplier,
+ shareGroupDLQManager
);
}
}
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 06d09556492..61480cd73bb 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -13318,6 +13319,7 @@ public class SharePartitionTest {
private Time time = MOCK_TIME;
private SharePartitionMetrics sharePartitionMetrics =
Mockito.mock(SharePartitionMetrics.class);
private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
+ private ShareGroupDLQManager shareGroupDLQManager = new
NoOpShareGroupDLQManager();
private SharePartitionBuilder withMaxInflightRecords(int
defaultMaxInflightRecords) {
this.defaultMaxInflightRecords = defaultMaxInflightRecords;
@@ -13369,14 +13371,20 @@ public class SharePartitionTest {
return this;
}
+ private SharePartitionBuilder
withShareGroupDlqManager(ShareGroupDLQManager shareGroupDLQManager) {
+ this.shareGroupDLQManager = shareGroupDLQManager;
+ return this;
+ }
+
public static SharePartitionBuilder builder() {
return new SharePartitionBuilder();
}
public SharePartition build() {
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0,
defaultMaxInflightRecords, defaultMaxDeliveryCount,
- defaultAcquisitionLockTimeoutMs, mockTimer, time,
persister, replicaManager, configProvider,
- state, Mockito.mock(SharePartitionListener.class),
sharePartitionMetrics, shareGroupDlqEnableSupplier);
+ defaultAcquisitionLockTimeoutMs, mockTimer, time, persister,
replicaManager, configProvider,
+ state, Mockito.mock(SharePartitionListener.class),
sharePartitionMetrics, shareGroupDlqEnableSupplier,
+ shareGroupDLQManager);
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2c726371853..a409f8f7cb1 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1092,6 +1092,7 @@ class KafkaConfigTest {
case
GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG => //
ignore string
case GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG => // ignore
string
case GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+ case ShareGroupConfig.SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG =>
//ignore string
/** Streams groups configs */
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index cc72a8b48e3..2ce1cf1f81f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -82,6 +82,11 @@ public class ShareGroupConfig {
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The
fully qualified name of a class which implements " +
"the <code>org.apache.kafka.server.share.Persister</code> interface.";
+ public static final String SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG =
"group.share.dlq.manager.class.name";
+ public static final String SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DEFAULT =
"org.apache.kafka.server.share.dlq.DefaultShareGroupDLQManager";
+ public static final String SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DOC = "The
fully qualified name of a class which implements " +
+ "the
<code>org.apache.kafka.server.share.dlq.ShareGroupDLQManager</code> interface.";
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT,
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM,
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, INT,
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT, between(5, 25), MEDIUM,
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC)
@@ -94,7 +99,8 @@ public class ShareGroupConfig {
.define(SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT,
SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 2000), MEDIUM,
SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT,
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM,
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT,
SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
- .defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING,
SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM,
SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
+ .defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING,
SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM,
SHARE_GROUP_PERSISTER_CLASS_NAME_DOC)
+ .defineInternal(SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG, STRING,
SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DEFAULT, null, MEDIUM,
SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DOC);
private final int shareGroupPartitionMaxRecordLocks;
private final int shareGroupMaxPartitionMaxRecordLocks;
@@ -108,6 +114,7 @@ public class ShareGroupConfig {
private final int shareFetchPurgatoryPurgeIntervalRequests;
private final int shareGroupMaxShareSessions;
private final String shareGroupPersisterClassName;
+ private final String shareGroupDLQManagerClassName;
private final AbstractConfig config;
public ShareGroupConfig(AbstractConfig config) {
@@ -124,6 +131,7 @@ public class ShareGroupConfig {
shareFetchPurgatoryPurgeIntervalRequests =
config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
shareGroupMaxShareSessions =
config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG);
shareGroupPersisterClassName =
config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
+ shareGroupDLQManagerClassName =
config.getString(ShareGroupConfig.SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG);
validate();
}
@@ -186,6 +194,10 @@ public class ShareGroupConfig {
return shareGroupPersisterClassName;
}
+ public String shareGroupDLQManagerClassName() {
+ return shareGroupDLQManagerClassName;
+ }
+
private void validate() {
Utils.require(shareGroupMaxDeliveryCountLimit >=
shareGroupDeliveryCountLimit,
String.format("%s must be greater than or equal to %s",