This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d0ef66bf79e KAFKA-19829: Implement group-level initial rebalance delay
(#20755)
d0ef66bf79e is described below
commit d0ef66bf79e3412a0db52597288b1c79bbd909bf
Author: Jinhe Zhang <[email protected]>
AuthorDate: Fri Nov 14 07:41:32 2025 -0500
KAFKA-19829: Implement group-level initial rebalance delay (#20755)
During testing, an artifact of the new rebalance protocol showed up. In
some cases, the first joining member gets all active tasks assigned, and
is slow to revoke the tasks after more members have joined the group. This
affects in particular cases where the first member is slow (possibly
overloaded in the case of cloudlimits benchmarks) and there are a lot of
tasks to be assigned.
To help with this situation, we want to introduce a new group-specific
configuration to delay the initial rebalance.
Main changes:
- add 3s as default delay to `GroupConfig` & `GroupCoordinatorConfig`
- add `isScheduled` method to the timer
- add rebalance delay logic
- check if it's initial rebalance by using `bumpGroup` & `groupEpoch
== 0`
- if it's the initial rebalance, then trigger the timer and bump the group
by two
- when the timer ends, compute a new target assignment
- add related tests
- ignore epoch check in `DescribeStreamsGroupTest` since it's flaky now
- add `setGroupStreamsInitialRebalanceDelay` to `EmbeddedKafkaCluster` so
we can manually set the config.
- Set delay to 0 in most of the tests
Reviewers: Lucas Brutschy <[email protected]>, Sean Quah
<[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 5 +
.../common/runtime/CoordinatorTimer.java | 8 +
.../common/runtime/CoordinatorRuntimeTest.java | 51 +++++
.../common/runtime/MockCoordinatorTimer.java | 8 +
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../server/StreamsGroupHeartbeatRequestTest.scala | 23 +-
.../kafka/coordinator/group/GroupConfig.java | 20 +-
.../coordinator/group/GroupCoordinatorConfig.java | 16 +-
.../coordinator/group/GroupMetadataManager.java | 134 ++++++++++--
.../group/streams/StreamsGroupMember.java | 1 +
.../kafka/coordinator/group/GroupConfigTest.java | 5 +
.../group/GroupCoordinatorConfigTest.java | 24 +++
.../group/GroupMetadataManagerTest.java | 233 +++++++++++++++++----
.../integration/RestoreIntegrationTest.java | 1 +
.../integration/utils/EmbeddedKafkaCluster.java | 13 ++
.../tools/streams/DescribeStreamsGroupTest.java | 32 ++-
16 files changed, 490 insertions(+), 87 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 06017417233..911a167ebf5 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -441,6 +441,11 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
if (prevTask != null) prevTask.cancel();
}
+ @Override
+ public boolean isScheduled(String key) {
+ return tasks.containsKey(key);
+ }
+
public void cancelAll() {
Iterator<Map.Entry<String, TimerTask>> iterator =
tasks.entrySet().iterator();
while (iterator.hasNext()) {
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
index d10e38a7d82..2f288df3026 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
@@ -84,4 +84,12 @@ public interface CoordinatorTimer<T, U> {
* @param key The key.
*/
void cancel(String key);
+
+ /**
+ * Check if an operation with the given key is scheduled.
+ *
+ * @param key The key.
+ * @return True if an operation with the key is scheduled, false otherwise.
+ */
+ boolean isScheduled(String key);
}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index de1aa21f3f1..494b8f34d51 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -2351,6 +2351,57 @@ public class CoordinatorRuntimeTest {
assertEquals(0, ctx.timer.size());
}
+ @Test
+ public void testTimerIsScheduled() throws InterruptedException {
+ MockTimer timer = new MockTimer();
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(new MockPartitionWriter())
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ runtime.scheduleLoadOperation(TP, 10);
+
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(0, ctx.timer.size());
+
+ assertFalse(ctx.timer.isScheduled("timer-1"));
+
+ ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false,
+ () -> new CoordinatorResult<>(List.of("record1"), null));
+
+ assertTrue(ctx.timer.isScheduled("timer-1"));
+ assertFalse(ctx.timer.isScheduled("timer-2"));
+ assertEquals(1, ctx.timer.size());
+
+ ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, false,
+ () -> new CoordinatorResult<>(List.of("record2"), null));
+
+ assertTrue(ctx.timer.isScheduled("timer-1"));
+ assertTrue(ctx.timer.isScheduled("timer-2"));
+ assertEquals(2, ctx.timer.size());
+
+ ctx.timer.cancel("timer-1");
+
+ assertFalse(ctx.timer.isScheduled("timer-1"));
+ assertTrue(ctx.timer.isScheduled("timer-2"));
+ assertEquals(1, ctx.timer.size());
+
+ timer.advanceClock(21);
+
+ assertFalse(ctx.timer.isScheduled("timer-2"));
+ assertEquals(0, ctx.timer.size());
+ }
+
@Test
public void testStateChanges() throws Exception {
MockTimer timer = new MockTimer();
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
index 78e14ac576b..69e3954a0a6 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
@@ -110,6 +110,14 @@ public class MockCoordinatorTimer<T, U> implements
CoordinatorTimer<T, U> {
}
}
+ /**
+ * Checks if a timeout with the given key is scheduled.
+ */
+ @Override
+ public boolean isScheduled(String key) {
+ return timeoutMap.containsKey(key);
+ }
+
/**
* @return True if a timeout with the key exists; false otherwise.
*/
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6fe6e0fb44f..284f92b6136 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType,
Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG,
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG,
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -364,6 +364,7 @@ class KafkaApisTest extends Logging {
cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
+ cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
diff --git
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index d3b0e3f2341..2d7f0649333 100644
---
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -36,7 +36,8 @@ import scala.jdk.CollectionConverters._
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
- new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer,streams")
+ new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer,streams"),
+ new ClusterConfigProperty(key =
"group.streams.initial.rebalance.delay.ms", value = "0")
)
)
class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@@ -171,7 +172,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
// Verify the response
assertNotNull(streamsGroupHeartbeatResponse,
"StreamsGroupHeartbeatResponse should not be null")
assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
- assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+ assertEquals(2, streamsGroupHeartbeatResponse.memberEpoch())
val expectedStatus = new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(1)
.setStatusDetail(s"Source topics $topicName are missing.")
@@ -207,7 +208,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
// Active task assignment should be available
assertNotNull(streamsGroupHeartbeatResponse,
"StreamsGroupHeartbeatResponse should not be null")
assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
- assertEquals(2, streamsGroupHeartbeatResponse.memberEpoch())
+ assertEquals(3, streamsGroupHeartbeatResponse.memberEpoch())
assertEquals(null, streamsGroupHeartbeatResponse.status())
val expectedActiveTasks = List(
new StreamsGroupHeartbeatResponseData.TaskIds()
@@ -275,7 +276,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
// Verify first member gets all tasks initially
assertNotNull(streamsGroupHeartbeatResponse1,
"StreamsGroupHeartbeatResponse should not be null")
assertEquals(memberId1, streamsGroupHeartbeatResponse1.memberId())
- assertEquals(1, streamsGroupHeartbeatResponse1.memberEpoch())
+ assertEquals(2, streamsGroupHeartbeatResponse1.memberEpoch())
assertEquals(1, streamsGroupHeartbeatResponse1.activeTasks().size())
assertEquals(3,
streamsGroupHeartbeatResponse1.activeTasks().get(0).partitions().size())
@@ -303,7 +304,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
// Verify second member gets assigned
assertNotNull(streamsGroupHeartbeatResponse2,
"StreamsGroupHeartbeatResponse should not be null")
assertEquals(memberId2, streamsGroupHeartbeatResponse2.memberId())
- assertEquals(2, streamsGroupHeartbeatResponse2.memberEpoch())
+ assertEquals(3, streamsGroupHeartbeatResponse2.memberEpoch())
// Wait for both members to get their task assignments by sending
heartbeats
// until they both have non-null activeTasks
@@ -431,7 +432,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
// Verify the member joined successfully
assertNotNull(streamsGroupHeartbeatResponse,
"StreamsGroupHeartbeatResponse should not be null")
assertEquals("test-member", streamsGroupHeartbeatResponse.memberId())
- assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+ assertEquals(2, streamsGroupHeartbeatResponse.memberEpoch())
// Send a leave request
streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
@@ -508,7 +509,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
val expectedResponse = new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.FENCED_MEMBER_EPOCH.code())
- .setErrorMessage("The streams group member has a greater member epoch
(999) than the one known by the group coordinator (1). The member must abandon
all its partitions and rejoin.")
+ .setErrorMessage("The streams group member has a greater member epoch
(999) than the one known by the group coordinator (2). The member must abandon
all its partitions and rejoin.")
assertEquals(expectedResponse, streamsGroupHeartbeatResponse)
} finally {
admin.close()
@@ -571,7 +572,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
// Verify the heartbeat was successful
assert(streamsGroupHeartbeatResponse != null,
"StreamsGroupHeartbeatResponse should not be null")
assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
- assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+ assertEquals(2, streamsGroupHeartbeatResponse.memberEpoch())
// Wait for internal topics to be created
val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
@@ -859,7 +860,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
}, "StreamsGroupHeartbeatRequest did not succeed within the timeout
period.")
val memberEpoch = streamsGroupHeartbeatResponse.memberEpoch()
- assertEquals(1, memberEpoch)
+ assertEquals(2, memberEpoch)
// Blocking the thread for 1 sec so that the session times out and the
member needs to rejoin
Thread.sleep(1000)
@@ -972,7 +973,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
// Verify the response for member
assert(streamsGroupHeartbeatResponse != null,
"StreamsGroupHeartbeatResponse should not be null")
assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
- assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+ assertEquals(2, streamsGroupHeartbeatResponse.memberEpoch())
assertNotNull(streamsGroupHeartbeatResponse.activeTasks())
// Restart the only running broker
@@ -997,7 +998,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
// Verify the response. Epoch should not have changed and null
assignments determine that no
// change in old assignment
assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
- assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+ assertEquals(2, streamsGroupHeartbeatResponse.memberEpoch())
assertNull(streamsGroupHeartbeatResponse.activeTasks())
assertNull(streamsGroupHeartbeatResponse.standbyTasks())
assertNull(streamsGroupHeartbeatResponse.warmupTasks())
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 565d492507b..bdb0cbb9f1c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -75,6 +75,8 @@ public final class GroupConfig extends AbstractConfig {
public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG =
"streams.num.standby.replicas";
+ public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG =
"streams.initial.rebalance.delay.ms";
+
public final int consumerSessionTimeoutMs;
public final int consumerHeartbeatIntervalMs;
@@ -93,6 +95,8 @@ public final class GroupConfig extends AbstractConfig {
public final int streamsNumStandbyReplicas;
+ public final int streamsInitialRebalanceDelayMs;
+
public final String shareIsolationLevel;
private static final ConfigDef CONFIG = new ConfigDef()
@@ -155,7 +159,13 @@ public final class GroupConfig extends AbstractConfig {
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT,
atLeast(0),
MEDIUM,
- GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC);
+ GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
+ .define(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
+ INT,
+
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT,
+ atLeast(0),
+ MEDIUM,
+
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);
public GroupConfig(Map<?, ?> props) {
super(CONFIG, props, false);
@@ -168,6 +178,7 @@ public final class GroupConfig extends AbstractConfig {
this.streamsSessionTimeoutMs =
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
this.streamsHeartbeatIntervalMs =
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsNumStandbyReplicas =
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
+ this.streamsInitialRebalanceDelayMs =
getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
}
@@ -379,6 +390,13 @@ public final class GroupConfig extends AbstractConfig {
return streamsNumStandbyReplicas;
}
+ /**
+ * The initial rebalance delay for streams groups.
+ */
+ public int streamsInitialRebalanceDelayMs() {
+ return streamsInitialRebalanceDelayMs;
+ }
+
/**
* The share group isolation level.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 2653be28b3e..9405a32f967 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -289,6 +289,10 @@ public class GroupCoordinatorConfig {
public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2;
public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The
maximum allowed value for the group-level configuration of " +
GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
+ public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG
= "group.streams.initial.rebalance.delay.ms";
+ public static final int STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT =
3000;
+ public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC =
"The amount of time the group coordinator will wait for more streams clients to
join a new group before performing the first rebalance. A longer delay means
potentially fewer rebalances.";
+
public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG
= "group.share.initialize.retry.interval.ms";
// Because persister retries with exp backoff 5 times and upper cap of 30
secs.
public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT =
30_000;
@@ -351,7 +355,8 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT,
STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
.define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT,
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
- .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
+ .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC)
+ .define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT,
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);
/**
@@ -404,6 +409,7 @@ public class GroupCoordinatorConfig {
private final int streamsGroupMaxSize;
private final int streamsGroupNumStandbyReplicas;
private final int streamsGroupMaxStandbyReplicas;
+ private final int streamsGroupInitialRebalanceDelayMs;
@SuppressWarnings("this-escape")
public GroupCoordinatorConfig(AbstractConfig config) {
@@ -456,6 +462,7 @@ public class GroupCoordinatorConfig {
this.streamsGroupMaxSize =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG);
this.streamsGroupNumStandbyReplicas =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsGroupMaxStandbyReplicas =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
+ this.streamsGroupInitialRebalanceDelayMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >=
consumerGroupMinHeartbeatIntervalMs,
@@ -960,4 +967,11 @@ public class GroupCoordinatorConfig {
public int streamsGroupMaxNumStandbyReplicas() {
return streamsGroupMaxStandbyReplicas;
}
+
+ /**
+ * The initial rebalance delay for streams groups.
+ */
+ public int streamsGroupInitialRebalanceDelayMs() {
+ return streamsGroupInitialRebalanceDelayMs;
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 81b4feb2f75..f31d2c9a06d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2031,29 +2031,55 @@ public class GroupMetadataManager {
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
- groupEpoch += 1;
+ if (groupEpoch == 0) {
+ groupEpoch = 2;
+ } else {
+ groupEpoch += 1;
+ }
records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch,
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId,
groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs +
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}
+ // Schedule initial rebalance delay for new streams groups to coalesce
joins.
+ boolean isInitialRebalance = (group.groupEpoch() == 0);
+ if (isInitialRebalance) {
+ int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
+ if (initialDelayMs > 0) {
+ timer.scheduleIfAbsent(
+ streamsInitialRebalanceKey(groupId),
+ initialDelayMs,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> computeDelayedTargetAssignment(groupId)
+ );
+ }
+ }
+
// 4. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
// replaces an existing static member.
// The delta between the existing and the new target assignment is
persisted to the partition.
int targetAssignmentEpoch;
TasksTuple targetAssignment;
if (groupEpoch > group.assignmentEpoch()) {
- targetAssignment = updateStreamsTargetAssignment(
- group,
- groupEpoch,
- updatedMember,
- updatedConfiguredTopology,
- metadataImage,
- records,
- currentAssignmentConfigs
- );
- targetAssignmentEpoch = groupEpoch;
+ boolean initialDelayActive =
timer.isScheduled(streamsInitialRebalanceKey(groupId));
+ if (initialDelayActive && group.assignmentEpoch() == 0) {
+ // During initial rebalance delay, return empty assignment to
first joining members.
+ targetAssignmentEpoch = 1;
+ targetAssignment = TasksTuple.EMPTY;
+ } else {
+ targetAssignment = updateStreamsTargetAssignment(
+ group,
+ groupEpoch,
+ Optional.of(updatedMember),
+ updatedConfiguredTopology,
+ metadataImage,
+ records,
+ currentAssignmentConfigs
+ );
+ targetAssignmentEpoch = groupEpoch;
+ }
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment =
group.targetAssignment(updatedMember.memberId());
@@ -4010,15 +4036,15 @@ public class GroupMetadataManager {
*
* @param group The StreamsGroup.
* @param groupEpoch The group epoch.
- * @param updatedMember The updated member.
+ * @param updatedMember The updated member (optional).
* @param metadataImage The metadata image.
* @param records The list to accumulate any new records.
- * @return The new target assignment.
+ * @return The new target assignment for the updated member, or EMPTY if
no member specified.
*/
private TasksTuple updateStreamsTargetAssignment(
StreamsGroup group,
int groupEpoch,
- StreamsGroupMember updatedMember,
+ Optional<StreamsGroupMember> updatedMember,
ConfiguredTopology configuredTopology,
CoordinatorMetadataImage metadataImage,
List<CoordinatorRecord> records,
@@ -4037,8 +4063,11 @@ public class GroupMetadataManager {
.withTopology(configuredTopology)
.withStaticMembers(group.staticMembers())
.withMetadataImage(metadataImage)
- .withTargetAssignment(group.targetAssignment())
- .addOrUpdateMember(updatedMember.memberId(), updatedMember);
+ .withTargetAssignment(group.targetAssignment());
+
+ updatedMember.ifPresent(member ->
+ assignmentResultBuilder.addOrUpdateMember(member.memberId(),
member)
+ );
long startTimeMs = time.milliseconds();
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
assignmentResult =
@@ -4055,7 +4084,8 @@ public class GroupMetadataManager {
records.addAll(assignmentResult.records());
- return
assignmentResult.targetAssignment().get(updatedMember.memberId());
+ return updatedMember.map(member ->
assignmentResult.targetAssignment().get(member.memberId()))
+ .orElse(TasksTuple.EMPTY);
} catch (TaskAssignorException ex) {
String msg = String.format("Failed to compute a new target
assignment for epoch %d: %s",
groupEpoch, ex.getMessage());
@@ -4064,6 +4094,51 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Fires the initial rebalance for a streams group when the delay timer
expires.
+ * Computes and persists target assignment for all members if conditions
are met.
+ *
+ * @param groupId The group id.
+ * @return A CoordinatorResult with records to persist the target
assignment, or EMPTY_RESULT.
+ */
+ private CoordinatorResult<Void, CoordinatorRecord>
computeDelayedTargetAssignment(
+ String groupId
+ ) {
+ try {
+ StreamsGroup group = streamsGroup(groupId);
+
+ if (group.isEmpty()) {
+ log.info("[GroupId {}] Skipping delayed target assignment:
group is empty", groupId);
+ return EMPTY_RESULT;
+ }
+
+ if (group.groupEpoch() <= group.assignmentEpoch()) {
+ throw new IllegalStateException("Group epoch should be always
larger to assignment epoch");
+ }
+
+ if (!group.configuredTopology().isPresent()) {
+ log.warn("[GroupId {}] Cannot compute delayed target
assignment: configured topology is not present", groupId);
+ return EMPTY_RESULT;
+ }
+
+ List<CoordinatorRecord> records = new ArrayList<>();
+ updateStreamsTargetAssignment(
+ group,
+ group.groupEpoch(),
+ Optional.empty(),
+ group.configuredTopology().get(),
+ metadataImage,
+ records,
+ group.lastAssignmentConfigs()
+ );
+
+ return new CoordinatorResult<>(records, null);
+ } catch (GroupIdNotFoundException ex) {
+ log.warn("[GroupId {}] Group not found during initial rebalance.",
groupId);
+ return EMPTY_RESULT;
+ }
+ }
+
/**
* Handles leave request from a consumer group member.
* @param groupId The group id from the request.
@@ -8158,6 +8233,7 @@ public class GroupMetadataManager {
* Populates the record list passed in with record to update the state
machine.
* Validations are done in {@link
GroupCoordinatorShard#deleteGroups(AuthorizableRequestContext, List)} by
* calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+ * Cancels the Streams initial rebalance delay timer if it is scheduled.
*
* @param groupId The id of the group to be deleted. It has been checked
in {@link GroupMetadataManager#validateDeleteGroup}.
* @param records The record list to populate.
@@ -8172,6 +8248,7 @@ public class GroupMetadataManager {
/**
* Populates the record list passed in with record to update the state
machine.
+ * Cancels the Streams initial rebalance delay timer if it is scheduled.
*
* @param group The group to be deleted.
* @param records The record list to populate.
@@ -8181,6 +8258,7 @@ public class GroupMetadataManager {
List<CoordinatorRecord> records
) {
group.createGroupTombstoneRecords(records);
+ timer.cancel(streamsInitialRebalanceKey(group.groupId()));
}
/**
@@ -8741,6 +8819,15 @@ public class GroupMetadataManager {
.orElse(config.streamsGroupHeartbeatIntervalMs());
}
+ /**
+ * Get the initial rebalance delay of the provided streams group.
+ */
+ private int streamsGroupInitialRebalanceDelayMs(String groupId) {
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.map(GroupConfig::streamsInitialRebalanceDelayMs)
+ .orElse(config.streamsGroupInitialRebalanceDelayMs());
+ }
+
/**
* Get the assignor of the provided streams group.
*/
@@ -8798,6 +8885,19 @@ public class GroupMetadataManager {
return "sync-" + groupId;
}
+ /**
+ * Generate a streams group initial rebalance key for the timer.
+ *
+ * Package private for testing.
+ *
+ * @param groupId The group id.
+ *
+ * @return the initial rebalance key.
+ */
+ static String streamsInitialRebalanceKey(String groupId) {
+ return "initial-rebalance-timeout-" + groupId;
+ }
+
/**
* Generate a consumer group join key for the timer.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index ba368cc15b7..47e05e72688 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -284,6 +284,7 @@ public record StreamsGroupMember(String memberId,
.setClientTags(Collections.emptyMap())
.setState(MemberState.STABLE)
.setMemberEpoch(0)
+ .setPreviousMemberEpoch(0)
.setAssignedTasks(TasksTupleWithEpochs.EMPTY)
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
.setUserEndpoint(null);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 77014de5bf1..a6837926818 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -68,6 +68,8 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if
(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
+ } else if
(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_number", "-1", "1.0");
} else {
assertPropertyInvalid(name, "not_a_number", "-0.1");
}
@@ -237,6 +239,7 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
"10");
defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
"2000");
defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
+
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
Properties props = new Properties();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
@@ -252,6 +255,7 @@ public class GroupConfigTest {
assertEquals(10,
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(2000,
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(1,
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
+ assertEquals(3000,
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
}
@Test
@@ -274,6 +278,7 @@ public class GroupConfigTest {
props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
+ props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"3000");
return props;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 491df993e09..c447aec5374 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -198,6 +198,7 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
111);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
222);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
15 * 60 * 1000);
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
5000);
GroupCoordinatorConfig config = createConfig(configs);
@@ -227,6 +228,7 @@ public class GroupCoordinatorConfigTest {
assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
assertEquals(15 * 60 * 1000,
config.consumerGroupRegexRefreshIntervalMs());
+ assertEquals(5000, config.streamsGroupInitialRebalanceDelayMs());
}
@Test
@@ -323,6 +325,11 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
50000);
assertEquals("group.streams.heartbeat.interval.ms must be less than
group.streams.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
-1);
+ assertEquals("Invalid value -1 for configuration
group.streams.initial.rebalance.delay.ms: Value must be at least 0",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
}
public static GroupCoordinatorConfig createGroupCoordinatorConfig(
@@ -359,6 +366,23 @@ public class GroupCoordinatorConfigTest {
return createConfig(configs);
}
+ @Test
+ public void testStreamsGroupInitialRebalanceDelayDefaultValue() {
+ Map<String, Object> configs = new HashMap<>();
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(3000, config.streamsGroupInitialRebalanceDelayMs());
+
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT,
+ config.streamsGroupInitialRebalanceDelayMs());
+ }
+
+ @Test
+ public void testStreamsGroupInitialRebalanceDelayCustomValue() {
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
7000);
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(7000, config.streamsGroupInitialRebalanceDelayMs());
+ }
+
public static GroupCoordinatorConfig createConfig(Map<String, Object>
configs) {
return new GroupCoordinatorConfig(new AbstractConfig(
GroupCoordinatorConfig.CONFIG_DEF,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6ac1587accd..6f0a4048de1 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -10502,6 +10502,59 @@ public class GroupMetadataManagerTest {
assertEquals(expectedRecords, records);
}
+ @Test
+ public void testStreamsGroupDeleteCancelsInitialRebalanceTimer() {
+ String groupId = "streams-group-id";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
1000)
+ .build();
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+ context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ String timerKey =
GroupMetadataManager.streamsInitialRebalanceKey(groupId);
+ assertTrue(context.timer.isScheduled(timerKey), "Timer should be
scheduled after first member joins");
+
+ List<CoordinatorRecord> records = new ArrayList<>();
+ context.groupMetadataManager.createGroupTombstoneRecords(groupId,
records);
+
+ assertFalse(context.timer.isScheduled(timerKey), "Timer should be
cancelled after group deletion");
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(groupId),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(groupId),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId)
+ );
+ assertEquals(expectedRecords, records);
+ }
+
@Test
public void testConsumerGroupMaybeDelete() {
String groupId = "group-id";
@@ -16236,6 +16289,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@@ -16261,7 +16315,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
@@ -16278,12 +16332,12 @@ public class GroupMetadataManagerTest {
StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(1500)
- .setAssignedTasks(mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE, 1,
+ .setAssignedTasks(mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE, 2,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)))
@@ -16292,13 +16346,13 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
)),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
2),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -16324,6 +16378,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@@ -16348,7 +16403,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
@@ -16362,7 +16417,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
@@ -16377,7 +16432,7 @@ public class GroupMetadataManagerTest {
List<StreamsGroupDescribeResponseData.DescribedGroup>
actualDescribedGroups =
context.groupMetadataManager.streamsGroupDescribe(List.of(groupId),
context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup expectedDescribedGroup
= new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
- .setAssignmentEpoch(1)
+ .setAssignmentEpoch(2)
.setTopology(
new StreamsGroupDescribeResponseData.Topology()
.setEpoch(0)
@@ -16392,7 +16447,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)))
))
.setGroupState(StreamsGroupState.STABLE.toString())
- .setGroupEpoch(1);
+ .setGroupEpoch(2);
assertEquals(1, actualDescribedGroups.size());
assertEquals(expectedDescribedGroup, actualDescribedGroups.get(0));
}
@@ -16419,6 +16474,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
// Member joins the streams group.
@@ -16441,7 +16497,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
@@ -16454,7 +16510,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
@@ -16464,11 +16520,11 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
computeGroupHash(Map.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
)), -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
2),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -16499,6 +16555,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
// Member joins the streams group.
@@ -16526,7 +16583,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
@@ -16539,7 +16596,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
@@ -16549,11 +16606,11 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
computeGroupHash(Map.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
)), -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
2),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -16586,6 +16643,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
// Member joins the streams group.
@@ -16608,7 +16666,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
@@ -16621,7 +16679,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
@@ -16631,12 +16689,12 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
computeGroupHash(Map.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
)), -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
2),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -17352,6 +17410,7 @@ public class GroupMetadataManagerTest {
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 2)
.buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
// Prepare new assignment for the group.
@@ -17375,7 +17434,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
@@ -17396,12 +17455,87 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000),
result.response().data()
);
}
+ @Test
+ public void
testStreamsInitialRebalanceDelayEmptyDuringDelayAssignsAfterTimer() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
1000)
+ .build();
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result;
+
+ result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setPartitionsByUserEndpoint(null)
+ .setEndpointInformationEpoch(0),
+ result.response().data()
+ );
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+ context.sleep(10000);
+
+ result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1))))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ result.response().data()
+ );
+ }
+
@Test
public void testStreamsReconciliationProcess() {
String groupId = "fooup";
@@ -17957,6 +18091,7 @@ public class GroupMetadataManagerTest {
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
// Member 1 joins the streams group. The request fails because the
@@ -17972,7 +18107,7 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())));
- assertEquals("Failed to compute a new target assignment for epoch 1:
Assignment failed.", e.getMessage());
+ assertEquals("Failed to compute a new target assignment for epoch 2:
Assignment failed.", e.getMessage());
}
@Test
@@ -18223,6 +18358,7 @@ public class GroupMetadataManagerTest {
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@@ -18241,7 +18377,7 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of()));
- assertEquals(1, result.response().data().memberEpoch());
+ assertEquals(2, result.response().data().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
@@ -18258,7 +18394,7 @@ public class GroupMetadataManagerTest {
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(result.response().data().memberEpoch()));
- assertEquals(1, result.response().data().memberEpoch());
+ assertEquals(2, result.response().data().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
@@ -18303,6 +18439,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@@ -18321,7 +18458,7 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of()));
- assertEquals(1, result.response().data().memberEpoch());
+ assertEquals(2, result.response().data().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
@@ -18338,7 +18475,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
)
)
)),
@@ -18368,6 +18505,7 @@ public class GroupMetadataManagerTest {
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 3)
.buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
assignor.prepareGroupAssignment(Map.of(memberId1,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@@ -18390,7 +18528,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
@@ -18431,7 +18569,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId2)
- .setMemberEpoch(2)
+ .setMemberEpoch(3)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
@@ -18450,13 +18588,13 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setRebalanceTimeoutMs(12000));
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
@@ -18482,7 +18620,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setActiveTasks(List.of(new
StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(subtopology1)
.setPartitions(List.of(0, 1))))
@@ -18492,7 +18630,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(2)
+ .setMemberEpoch(3)
.setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(0),
result.response().data()
@@ -18529,6 +18667,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
assignor.prepareGroupAssignment(
@@ -18550,7 +18689,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
@@ -18591,7 +18730,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId2)
- .setMemberEpoch(2)
+ .setMemberEpoch(3)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
@@ -18610,12 +18749,12 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
- .setMemberEpoch(1));
+ .setMemberEpoch(2));
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
@@ -18638,7 +18777,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId1),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 4,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
)
)
)),
@@ -18765,6 +18904,7 @@ public class GroupMetadataManagerTest {
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 2)
.buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
// Prepare new assignment for the group.
@@ -18795,7 +18935,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
@@ -18836,6 +18976,7 @@ public class GroupMetadataManagerTest {
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 4)
.buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
// Prepare assignment for first member
@@ -18856,7 +18997,7 @@ public class GroupMetadataManagerTest {
.setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint().setHost("host1").setPort(9092))
.setEndpointInformationEpoch(0));
- assertEquals(1, result.response().data().memberEpoch());
+ assertEquals(2, result.response().data().memberEpoch());
// Prepare assignment for both members
assignor.prepareGroupAssignment(
@@ -19059,6 +19200,7 @@ public class GroupMetadataManagerTest {
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
ClassicGroup classicGroup = new ClassicGroup(
new LogContext(),
@@ -19081,7 +19223,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember expectedMember =
StreamsGroupMember.Builder.withDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setClientId(DEFAULT_CLIENT_ID)
@@ -19098,9 +19240,9 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId,
1, 0, -1, Map.of("num.standby.replicas", "0")),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId,
2, 0, -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
memberId, TasksTuple.EMPTY),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
2),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
expectedMember)
),
result.records()
@@ -19398,6 +19540,7 @@ public class GroupMetadataManagerTest {
.addTopic(fooTopicId, fooTopicName, 6)
.addRacks()
.buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
.build();
assignor.prepareGroupAssignment(
@@ -19416,7 +19559,7 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of()));
- assertEquals(1, result.response().data().memberEpoch());
+ assertEquals(2, result.response().data().memberEpoch());
assertEquals(Map.of("num.standby.replicas", "0"),
assignor.lastPassedAssignmentConfigs());
// Verify heartbeat interval
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 7ffeb6ac035..32df2b6f653 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -160,6 +160,7 @@ public class RestoreIntegrationTest {
appId = safeUniqueTestName(testInfo);
inputStream = appId + "-input-stream";
CLUSTER.createTopic(inputStream, 2, 1);
+ CLUSTER.setGroupStreamsInitialRebalanceDelay(appId, 0);
}
private Properties props() {
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 6de262e72f9..ba3451589f0 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -480,6 +480,19 @@ public class EmbeddedKafkaCluster {
}
}
+ public void setGroupStreamsInitialRebalanceDelay(final String groupId,
final int initialRebalanceDelayMs) {
+ try (final Admin adminClient = createAdminClient()) {
+ adminClient.incrementalAlterConfigs(
+ Map.of(
+ new ConfigResource(ConfigResource.Type.GROUP, groupId),
+ List.of(new AlterConfigOp(new
ConfigEntry(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
String.valueOf(initialRebalanceDelayMs)), AlterConfigOp.OpType.SET))
+ )
+ ).all().get();
+ } catch (final InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private final class TopicsRemainingCondition implements TestCondition {
final Set<String> remainingTopics = new HashSet<>();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index 723e928e770..3915f2e73b7 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -167,9 +167,10 @@ public class DescribeStreamsGroupTest {
@Test
public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws
Exception {
final List<String> expectedHeader = List.of("GROUP", "COORDINATOR",
"(ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
- final Set<List<String>> expectedRows = Set.of(List.of(APP_ID, "", "",
"Stable", "3", "3", "2"));
+ final Set<List<String>> expectedRows = Set.of(List.of(APP_ID, "", "",
"Stable", "", "", "2"));
// The coordinator is not deterministic, so we don't care about it.
- final List<Integer> dontCares = List.of(1, 2);
+ // The GROUP-EPOCH and TARGET-ASSIGNMENT-EPOCH can vary due to
rebalance timing, so we don't care about them either.
+ final List<Integer> dontCares = List.of(1, 2, 4, 5);
validateDescribeOutput(
List.of("--bootstrap-server", bootstrapServers, "--describe",
"--state", "--verbose", "--group", APP_ID), expectedHeader, expectedRows,
dontCares);
@@ -194,10 +195,11 @@ public class DescribeStreamsGroupTest {
public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws
Exception {
final List<String> expectedHeader = List.of("GROUP",
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL",
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
final Set<List<String>> expectedRows = Set.of(
- List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
- List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
+ List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+ List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
// The member and process names as well as client-id are not
deterministic, so we don't care about them.
- final List<Integer> dontCares = List.of(3, 6, 7);
+ // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to
rebalance timing, so we don't care about them either.
+ final List<Integer> dontCares = List.of(1, 3, 5, 6, 7);
validateDescribeOutput(
List.of("--bootstrap-server", bootstrapServers, "--describe",
"--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows,
dontCares);
@@ -208,22 +210,28 @@ public class DescribeStreamsGroupTest {
@Test
public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions()
throws Exception {
cluster.createTopic(INPUT_TOPIC_2, 1, 1);
+ TestUtils.waitForCondition(
+ () -> cluster.getAllTopicsInCluster().contains(INPUT_TOPIC_2),
+ 30000,
+ "Topic " + INPUT_TOPIC_2 + " not created"
+ );
KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2,
OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
startApplicationAndWaitUntilRunning(streams2);
final List<String> expectedHeader = List.of("GROUP",
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL",
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
final Set<List<String>> expectedRows1 = Set.of(
- List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
- List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
+ List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+ List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
final Set<List<String>> expectedRows2 = Set.of(
- List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"1:[0];", "TARGET-ACTIVE:", "1:[0];"),
- List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[0];", "TARGET-ACTIVE:", "0:[0];"));
+ List.of(APP_ID_2, "", "0", "", "streams", "", "", "", "ACTIVE:",
"1:[0];", "TARGET-ACTIVE:", "1:[0];"),
+ List.of(APP_ID_2, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[0];", "TARGET-ACTIVE:", "0:[0];"));
final Map<String, Set<List<String>>> expectedRowsMap = new HashMap<>();
expectedRowsMap.put(APP_ID, expectedRows1);
expectedRowsMap.put(APP_ID_2, expectedRows2);
// The member and process names as well as client-id are not
deterministic, so we don't care about them.
- final List<Integer> dontCares = List.of(3, 6, 7);
+ // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to
rebalance timing, so we don't care about them either.
+ final List<Integer> dontCares = List.of(1, 3, 5, 6, 7);
validateDescribeOutput(
List.of("--bootstrap-server", bootstrapServers, "--describe",
"--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2),
@@ -254,7 +262,9 @@ public class DescribeStreamsGroupTest {
@Test
public void testDescribeStreamsGroupWithShortTimeout() {
- List<String> args = List.of("--bootstrap-server", bootstrapServers,
"--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1");
// Note: a 1ms timeout may not always expire on fast machines
with warm groups.
+ // Use 0ms to guarantee the request times out.
+ List<String> args = List.of("--bootstrap-server", bootstrapServers,
"--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "0");
Throwable e = assertThrows(ExecutionException.class, () ->
getStreamsGroupService(args.toArray(new String[0])).describeGroups());
assertEquals(TimeoutException.class, e.getCause().getClass());
}