This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c70b7c4b9e6 KAFKA-18323: Add StreamsGroup class (#18729)
c70b7c4b9e6 is described below
commit c70b7c4b9e6fbd805d9a229b4ac13c1e10016d76
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Feb 12 11:01:53 2025 +0100
KAFKA-18323: Add StreamsGroup class (#18729)
Implements a memory model for representing streams groups in the group
coordinator, as well as group count and rebalance metrics.
Reviewers: Bill Bejeck <[email protected]>, Bruno Cadonna
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/coordinator/group/Group.java | 1 +
.../group/metrics/GroupCoordinatorMetrics.java | 128 ++-
.../metrics/GroupCoordinatorMetricsShard.java | 142 +++
.../streams/StreamsCoordinatorRecordHelpers.java | 12 -
.../coordinator/group/streams/StreamsGroup.java | 1012 ++++++++++++++++++
.../coordinator/group/streams/TopicMetadata.java | 23 +-
.../StreamsGroupPartitionMetadataValue.json | 9 +-
.../group/metrics/GroupCoordinatorMetricsTest.java | 60 +-
.../StreamsCoordinatorRecordHelpersTest.java | 14 +-
.../group/streams/StreamsGroupTest.java | 1099 ++++++++++++++++++++
.../group/streams/TargetAssignmentBuilderTest.java | 35 +-
.../group/streams/TopicMetadataTest.java | 45 +-
.../streams/topics/InternalTopicManagerTest.java | 8 +-
14 files changed, 2472 insertions(+), 118 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 4b5fad5d6e1..8d753be0e99 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -335,7 +335,7 @@
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
-
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde).java"/>
+
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
<suppress checks="JavaNCSS"
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index b5d63499751..79c1b72237b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -37,6 +37,7 @@ public interface Group {
CONSUMER("consumer"),
CLASSIC("classic"),
SHARE("share"),
+ STREAMS("streams"),
UNKNOWN("unknown");
private final String name;
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
index 23606593a6b..1cd9e4b0bd9 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
@@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -73,6 +74,8 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
public static final String SHARE_GROUP_COUNT_METRIC_NAME = "group-count";
public static final String CONSUMER_GROUP_COUNT_STATE_TAG = "state";
public static final String SHARE_GROUP_COUNT_STATE_TAG =
CONSUMER_GROUP_COUNT_STATE_TAG;
+ public static final String STREAMS_GROUP_COUNT_METRIC_NAME =
"streams-group-count";
+ public static final String STREAMS_GROUP_COUNT_STATE_TAG = "state";
public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits";
public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
@@ -80,6 +83,7 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME
= "CompletedRebalances";
public static final String CONSUMER_GROUP_REBALANCES_SENSOR_NAME =
"ConsumerGroupRebalances";
public static final String SHARE_GROUP_REBALANCES_SENSOR_NAME =
"ShareGroupRebalances";
+ public static final String STREAMS_GROUP_REBALANCES_SENSOR_NAME =
"StreamsGroupRebalances";
private final MetricName classicGroupCountMetricName;
private final MetricName consumerGroupCountMetricName;
@@ -92,6 +96,13 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
private final MetricName shareGroupCountEmptyMetricName;
private final MetricName shareGroupCountStableMetricName;
private final MetricName shareGroupCountDeadMetricName;
+ private final MetricName streamsGroupCountMetricName;
+ private final MetricName streamsGroupCountEmptyMetricName;
+ private final MetricName streamsGroupCountAssigningMetricName;
+ private final MetricName streamsGroupCountReconcilingMetricName;
+ private final MetricName streamsGroupCountStableMetricName;
+ private final MetricName streamsGroupCountDeadMetricName;
+ private final MetricName streamsGroupCountNotReadyMetricName;
private final MetricsRegistry registry;
private final Metrics metrics;
@@ -106,6 +117,7 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
this(KafkaYammerMetrics.defaultRegistry(), new Metrics());
}
+ @SuppressWarnings("MethodLength")
public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) {
this.registry = Objects.requireNonNull(registry);
this.metrics = Objects.requireNonNull(metrics);
@@ -190,6 +202,55 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
SHARE_GROUP_COUNT_STATE_TAG,
ShareGroup.ShareGroupState.DEAD.toString()
);
+ streamsGroupCountMetricName = metrics.metricName(
+ GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The total number of groups using the streams rebalance protocol.",
+ Collections.singletonMap(GROUP_COUNT_PROTOCOL_TAG,
Group.GroupType.STREAMS.toString())
+ );
+
+ streamsGroupCountEmptyMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in empty state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG,
StreamsGroupState.EMPTY.toString())
+ );
+
+ streamsGroupCountAssigningMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in assigning state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG,
StreamsGroupState.ASSIGNING.toString())
+ );
+
+ streamsGroupCountReconcilingMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in reconciling state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG,
StreamsGroupState.RECONCILING.toString())
+ );
+
+ streamsGroupCountStableMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in stable state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG,
StreamsGroupState.STABLE.toString())
+ );
+
+ streamsGroupCountDeadMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in dead state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG,
StreamsGroupState.DEAD.toString())
+ );
+
+ streamsGroupCountNotReadyMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in not ready state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG,
StreamsGroupState.NOT_READY.toString())
+ );
+
registerGauges();
Sensor offsetCommitsSensor =
metrics.sensor(OFFSET_COMMITS_SENSOR_NAME);
@@ -247,6 +308,15 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
METRICS_GROUP,
"The total number of share group rebalances",
SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString())));
+
+ Sensor streamsGroupRebalanceSensor =
metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
+ streamsGroupRebalanceSensor.add(new Meter(
+ metrics.metricName("streams-group-rebalance-rate",
+ METRICS_GROUP,
+ "The rate of streams group rebalances"),
+ metrics.metricName("streams-group-rebalance-count",
+ METRICS_GROUP,
+ "The total number of streams group rebalances")));
globalSensors = Collections.unmodifiableMap(Utils.mkMap(
Utils.mkEntry(OFFSET_COMMITS_SENSOR_NAME, offsetCommitsSensor),
@@ -254,7 +324,8 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, offsetDeletionsSensor),
Utils.mkEntry(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME,
classicGroupCompletedRebalancesSensor),
Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME,
consumerGroupRebalanceSensor),
- Utils.mkEntry(SHARE_GROUP_REBALANCES_SENSOR_NAME,
shareGroupRebalanceSensor)
+ Utils.mkEntry(SHARE_GROUP_REBALANCES_SENSOR_NAME,
shareGroupRebalanceSensor),
+ Utils.mkEntry(STREAMS_GROUP_REBALANCES_SENSOR_NAME,
streamsGroupRebalanceSensor)
));
}
@@ -278,6 +349,14 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
return shards.values().stream().mapToLong(shard ->
shard.numConsumerGroups(state)).sum();
}
    /**
     * @return The total number of streams groups, summed across all metrics shards.
     */
    private long numStreamsGroups() {
        return shards.values().stream().mapToLong(GroupCoordinatorMetricsShard::numStreamsGroups).sum();
    }
+
    /**
     * @param state The streams group state to count.
     *
     * @return The number of streams groups in the given state, summed across all metrics shards.
     */
    private long numStreamsGroups(StreamsGroupState state) {
        return shards.values().stream().mapToLong(shard -> shard.numStreamsGroups(state)).sum();
    }
+
private long numShareGroups() {
return
shards.values().stream().mapToLong(GroupCoordinatorMetricsShard::numShareGroups).sum();
}
@@ -309,7 +388,14 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
shareGroupCountMetricName,
shareGroupCountEmptyMetricName,
shareGroupCountStableMetricName,
- shareGroupCountDeadMetricName
+ shareGroupCountDeadMetricName,
+ streamsGroupCountMetricName,
+ streamsGroupCountEmptyMetricName,
+ streamsGroupCountAssigningMetricName,
+ streamsGroupCountReconcilingMetricName,
+ streamsGroupCountStableMetricName,
+ streamsGroupCountDeadMetricName,
+ streamsGroupCountNotReadyMetricName
).forEach(metrics::removeMetric);
Arrays.asList(
@@ -318,7 +404,8 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
OFFSET_DELETIONS_SENSOR_NAME,
CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME,
CONSUMER_GROUP_REBALANCES_SENSOR_NAME,
- SHARE_GROUP_REBALANCES_SENSOR_NAME
+ SHARE_GROUP_REBALANCES_SENSOR_NAME,
+ STREAMS_GROUP_REBALANCES_SENSOR_NAME
).forEach(metrics::removeSensor);
}
@@ -461,5 +548,40 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
shareGroupCountDeadMetricName,
(Gauge<Long>) (config, now) ->
numShareGroups(ShareGroup.ShareGroupState.DEAD)
);
+
+ metrics.addMetric(
+ streamsGroupCountMetricName,
+ (Gauge<Long>) (config, now) -> numStreamsGroups()
+ );
+
+ metrics.addMetric(
+ streamsGroupCountEmptyMetricName,
+ (Gauge<Long>) (config, now) ->
numStreamsGroups(StreamsGroupState.EMPTY)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountAssigningMetricName,
+ (Gauge<Long>) (config, now) ->
numStreamsGroups(StreamsGroupState.ASSIGNING)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountReconcilingMetricName,
+ (Gauge<Long>) (config, now) ->
numStreamsGroups(StreamsGroupState.RECONCILING)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountStableMetricName,
+ (Gauge<Long>) (config, now) ->
numStreamsGroups(StreamsGroupState.STABLE)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountDeadMetricName,
+ (Gauge<Long>) (config, now) ->
numStreamsGroups(StreamsGroupState.DEAD)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountNotReadyMetricName,
+ (Gauge<Long>) (config, now) ->
numStreamsGroups(StreamsGroupState.NOT_READY)
+ );
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
index 1ed75229f58..8b814bb0b23 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
@@ -23,6 +23,7 @@ import
org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;
@@ -78,6 +79,11 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
*/
private final Map<ShareGroup.ShareGroupState, TimelineGaugeCounter>
shareGroupGauges;
+ /**
+ * Streams group size gauge counters keyed by the metric name.
+ */
+ private final Map<StreamsGroupState, TimelineGaugeCounter>
streamsGroupGauges;
+
/**
* All sensors keyed by the sensor name. A Sensor object is shared across
all metrics shards.
*/
@@ -119,6 +125,21 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0)))
);
+ this.streamsGroupGauges = Utils.mkMap(
+ Utils.mkEntry(StreamsGroupState.EMPTY,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.ASSIGNING,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.RECONCILING,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.STABLE,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.DEAD,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.NOT_READY,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0)))
+ );
+
this.globalSensors = Objects.requireNonNull(globalSensors);
this.topicPartition = Objects.requireNonNull(topicPartition);
}
@@ -144,6 +165,20 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
this.consumerGroupGauges = consumerGroupGauges;
}
    /**
     * Increment the number of streams groups in the given state.
     *
     * @param state the streams group state.
     */
    public void incrementNumStreamsGroups(StreamsGroupState state) {
        // States without a registered gauge are silently ignored.
        TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
        if (gaugeCounter != null) {
            // The timeline value is also read under this same lock when the
            // gauges are snapshotted into their atomic counterparts.
            synchronized (gaugeCounter.timelineLong) {
                gaugeCounter.timelineLong.increment();
            }
        }
    }
+
/**
* Decrement the number of offsets.
*/
@@ -153,6 +188,20 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
}
}
    /**
     * Decrement the number of streams groups in the given state.
     *
     * @param state the streams group state.
     */
    public void decrementNumStreamsGroups(StreamsGroupState state) {
        // States without a registered gauge are silently ignored.
        TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
        if (gaugeCounter != null) {
            // The timeline value is also read under this same lock when the
            // gauges are snapshotted into their atomic counterparts.
            synchronized (gaugeCounter.timelineLong) {
                gaugeCounter.timelineLong.decrement();
            }
        }
    }
+
/**
* @return The number of offsets.
*/
@@ -205,6 +254,29 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
return consumerGroupGauges.values().stream()
.mapToLong(Long::longValue).sum();
}
+
+ /**
+ * Obtain the number of streams groups in the specified state.
+ *
+ * @param state the streams group state.
+ *
+ * @return The number of streams groups in `state`.
+ */
+ public long numStreamsGroups(StreamsGroupState state) {
+ TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
+ if (gaugeCounter != null) {
+ return gaugeCounter.atomicLong.get();
+ }
+ return 0L;
+ }
+
+ /**
+ * @return The total number of streams groups.
+ */
+ public long numStreamsGroups() {
+ return streamsGroupGauges.values().stream()
+ .mapToLong(timelineGaugeCounter ->
timelineGaugeCounter.atomicLong.get()).sum();
+ }
@Override
public void record(String sensorName) {
@@ -246,6 +318,14 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
}
gaugeCounter.atomicLong.set(value);
});
+
+ this.streamsGroupGauges.forEach((__, gaugeCounter) -> {
+ long value;
+ synchronized (gaugeCounter.timelineLong) {
+ value = gaugeCounter.timelineLong.get(offset);
+ }
+ gaugeCounter.atomicLong.set(value);
+ });
}
/**
@@ -330,4 +410,66 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
}
}
}
+
+ /**
+ * Called when a streams group's state has changed. Increment/decrement
+ * the counter accordingly.
+ *
+ * @param oldState The previous state. null value means that it's a new
group.
+ * @param newState The next state. null value means that the group has
been removed.
+ */
+ public void onStreamsGroupStateTransition(
+ StreamsGroupState oldState,
+ StreamsGroupState newState
+ ) {
+ if (newState != null) {
+ switch (newState) {
+ case EMPTY:
+ incrementNumStreamsGroups(StreamsGroupState.EMPTY);
+ break;
+ case NOT_READY:
+ incrementNumStreamsGroups(StreamsGroupState.NOT_READY);
+ break;
+ case ASSIGNING:
+ incrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
+ break;
+ case RECONCILING:
+ incrementNumStreamsGroups(StreamsGroupState.RECONCILING);
+ break;
+ case STABLE:
+ incrementNumStreamsGroups(StreamsGroupState.STABLE);
+ break;
+ case DEAD:
+ incrementNumStreamsGroups(StreamsGroupState.DEAD);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown new state for
streams group: " + newState);
+ }
+ }
+
+ if (oldState != null) {
+ switch (oldState) {
+ case EMPTY:
+ decrementNumStreamsGroups(StreamsGroupState.EMPTY);
+ break;
+ case NOT_READY:
+ decrementNumStreamsGroups(StreamsGroupState.NOT_READY);
+ break;
+ case ASSIGNING:
+ decrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
+ break;
+ case RECONCILING:
+ decrementNumStreamsGroups(StreamsGroupState.RECONCILING);
+ break;
+ case STABLE:
+ decrementNumStreamsGroups(StreamsGroupState.STABLE);
+ break;
+ case DEAD:
+ decrementNumStreamsGroups(StreamsGroupState.DEAD);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown old state for
streams group: " + newState);
+ }
+ }
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index c44e5d89713..881f930d8a5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -26,7 +26,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -116,21 +115,10 @@ public class StreamsCoordinatorRecordHelpers {
StreamsGroupPartitionMetadataValue value = new
StreamsGroupPartitionMetadataValue();
newPartitionMetadata.forEach((topicName, topicMetadata) -> {
- List<StreamsGroupPartitionMetadataValue.PartitionMetadata>
partitionMetadata = new ArrayList<>();
- if (!topicMetadata.partitionRacks().isEmpty()) {
- topicMetadata.partitionRacks().forEach((partition, racks) ->
- partitionMetadata.add(new
StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(partition)
- .setRacks(racks.stream().sorted().toList())
- )
- );
- }
-
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
value.topics().add(new
StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
- .setPartitionMetadata(partitionMetadata)
);
});
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
new file mode 100644
index 00000000000..d161e64f599
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -0,0 +1,1012 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+ /**
+ * The protocol type for streams groups. There is only one protocol type,
"streams".
+ */
+ private static final String PROTOCOL_TYPE = "streams";
+
+ public enum StreamsGroupState {
+ EMPTY("Empty"),
+ NOT_READY("NotReady"),
+ ASSIGNING("Assigning"),
+ RECONCILING("Reconciling"),
+ STABLE("Stable"),
+ DEAD("Dead");
+
+ private final String name;
+
+ private final String lowerCaseName;
+
+ StreamsGroupState(String name) {
+ this.name = name;
+ if (Objects.equals(name, "NotReady")) {
+ this.lowerCaseName = "not_ready";
+ } else {
+ this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ public String toLowerCaseString() {
+ return lowerCaseName;
+ }
+ }
+
    /**
     * A deadline timestamp in milliseconds paired with the group epoch at the
     * time the deadline was set. The epoch allows invalidating the deadline
     * when the group epoch has moved on since it was recorded.
     */
    public static class DeadlineAndEpoch {

        // Sentinel meaning "no deadline set".
        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);

        public final long deadlineMs;
        public final int epoch;

        DeadlineAndEpoch(long deadlineMs, int epoch) {
            this.deadlineMs = deadlineMs;
            this.epoch = epoch;
        }
    }
+
+ private final LogContext logContext;
+ private final Logger log;
+
+ /**
+ * The snapshot registry.
+ */
+ private final SnapshotRegistry snapshotRegistry;
+
+ /**
+ * The group ID.
+ */
+ private final String groupId;
+
+ /**
+ * The group state.
+ */
+ private final TimelineObject<StreamsGroupState> state;
+
+ /**
+ * The group epoch. The epoch is incremented whenever the topology, topic
metadata or the set of members changes and it will trigger
+ * the computation of a new assignment for the group.
+ */
+ private final TimelineInteger groupEpoch;
+
+ /**
+ * The group members.
+ */
+ private final TimelineHashMap<String, StreamsGroupMember> members;
+
+ /**
+ * The static group members.
+ */
+ private final TimelineHashMap<String, String> staticMembers;
+
+ /**
+ * The metadata associated with each subscribed topic name.
+ */
+ private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+ /**
+ * The target assignment epoch. An assignment epoch smaller than the group
epoch means that a new assignment is required. The assignment
+ * epoch is updated when a new assignment is installed.
+ */
+ private final TimelineInteger targetAssignmentEpoch;
+
+ /**
+ * The target assignment per member ID.
+ */
+ private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+ /**
+ * These maps map each active/standby/warmup task to the process ID(s) of
their current owner.
+ * The mapping is of the form <code>subtopology -> partition ->
memberId</code>.
+ * When a member revokes a partition, it removes its process ID from this
map.
+ * When a member gets a partition, it adds its process ID to this map.
+ */
+ private final TimelineHashMap<String, TimelineHashMap<Integer, String>>
currentActiveTaskToProcessId;
+ private final TimelineHashMap<String, TimelineHashMap<Integer,
Set<String>>> currentStandbyTaskToProcessIds;
+ private final TimelineHashMap<String, TimelineHashMap<Integer,
Set<String>>> currentWarmupTaskToProcessIds;
+
+ /**
+ * The coordinator metrics.
+ */
+ private final GroupCoordinatorMetricsShard metrics;
+
+ /**
+ * The Streams topology.
+ */
+ private final TimelineObject<Optional<StreamsTopology>> topology;
+
+ /**
+ * The configured topology including resolved regular expressions.
+ */
+ private final TimelineObject<Optional<ConfiguredTopology>>
configuredTopology;
+
+ /**
+ * The metadata refresh deadline. It consists of a timestamp in
milliseconds together with the group epoch at the time of setting it.
+ * The metadata refresh time is considered as a soft state (read that it
is not stored in a timeline data structure). It is like this
+ * because it is not persisted to the log. The group epoch is here to
ensure that the metadata refresh deadline is invalidated if the
+ * group epoch does not correspond to the current group epoch. This can
happen if the metadata refresh deadline is updated after having
+ * refreshed the metadata but the write operation failed. In this case,
the time is not automatically rolled back.
+ */
+ private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
+ public StreamsGroup(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry,
+ String groupId,
+ GroupCoordinatorMetricsShard metrics
+ ) {
+ this.log = logContext.logger(StreamsGroup.class);
+ this.logContext = logContext;
+ this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+ this.groupId = Objects.requireNonNull(groupId);
+ this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+ this.groupEpoch = new TimelineInteger(snapshotRegistry);
+ this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+ this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.currentActiveTaskToProcessId = new
TimelineHashMap<>(snapshotRegistry, 0);
+ this.currentStandbyTaskToProcessIds = new
TimelineHashMap<>(snapshotRegistry, 0);
+ this.currentWarmupTaskToProcessIds = new
TimelineHashMap<>(snapshotRegistry, 0);
+ this.metrics = Objects.requireNonNull(metrics);
+ this.topology = new TimelineObject<>(snapshotRegistry,
Optional.empty());
+ this.configuredTopology = new TimelineObject<>(snapshotRegistry,
Optional.empty());
+ }
+
+ /**
+ * @return The group type (Streams).
+ */
+ @Override
+ public GroupType type() {
+ return GroupType.STREAMS;
+ }
+
+ /**
+ * @return The current state as a String.
+ */
+ @Override
+ public String stateAsString() {
+ return state.get().toString();
+ }
+
+ /**
+ * @return The current state as a String with given committedOffset.
+ */
+ public String stateAsString(long committedOffset) {
+ return state.get(committedOffset).toString();
+ }
+
+ /**
+ * @return the group formatted as a list group response based on the
committed offset.
+ */
+ public ListGroupsResponseData.ListedGroup asListedGroup(long
committedOffset) {
+ return new ListGroupsResponseData.ListedGroup()
+ .setGroupId(groupId)
+ .setProtocolType(PROTOCOL_TYPE)
+ .setGroupState(state.get(committedOffset).toString())
+ .setGroupType(type().toString());
+ }
+
+ public Optional<ConfiguredTopology> configuredTopology() {
+ return configuredTopology.get();
+ }
+
+ public Optional<StreamsTopology> topology() {
+ return topology.get();
+ }
+
+ public void setTopology(StreamsTopology topology) {
+ this.topology.set(Optional.of(topology));
+ maybeUpdateConfiguredTopology();
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * @return The group ID.
+ */
+ @Override
+ public String groupId() {
+ return groupId;
+ }
+
+ /**
+ * @return The current state.
+ */
+ public StreamsGroupState state() {
+ return state.get();
+ }
+
+ /**
+ * @return The group epoch.
+ */
+ public int groupEpoch() {
+ return groupEpoch.get();
+ }
+
+ /**
+ * Sets the group epoch.
+ *
+ * @param groupEpoch The new group epoch.
+ */
+ public void setGroupEpoch(int groupEpoch) {
+ this.groupEpoch.set(groupEpoch);
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * @return The target assignment epoch.
+ */
+ public int assignmentEpoch() {
+ return targetAssignmentEpoch.get();
+ }
+
+ /**
+ * Sets the assignment epoch.
+ *
+ * @param targetAssignmentEpoch The new assignment epoch.
+ */
+ public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
+ this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * Get member ID of a static member that matches the given group instance
ID.
+ *
+ * @param groupInstanceId The group instance ID.
+ * @return The member ID corresponding to the given instance ID or null if
it does not exist
+ */
+ public String staticMemberId(String groupInstanceId) {
+ return staticMembers.get(groupInstanceId);
+ }
+
+ /**
+ * Gets or creates a new member but without adding it to the group. Adding
a member is done via the
+ * {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
+ *
+ * @param memberId The member ID.
+ * @param createIfNotExists Booleans indicating whether the member must be
created if it does not exist.
+ * @return A StreamsGroupMember.
+ */
+ public StreamsGroupMember getOrMaybeCreateMember(
+ String memberId,
+ boolean createIfNotExists
+ ) {
+ StreamsGroupMember member = members.get(memberId);
+ if (member != null) {
+ return member;
+ }
+
+ if (!createIfNotExists) {
+ throw new UnknownMemberIdException(
+ String.format("Member %s is not a member of group %s.",
memberId, groupId)
+ );
+ }
+
+ return new StreamsGroupMember.Builder(memberId).build();
+ }
+
+ /**
+ * Gets a static member.
+ *
+ * @param instanceId The group instance ID.
+ * @return The member corresponding to the given instance ID or null if it
does not exist
+ */
+ public StreamsGroupMember staticMember(String instanceId) {
+ String existingMemberId = staticMemberId(instanceId);
+ return existingMemberId == null ? null :
getOrMaybeCreateMember(existingMemberId, false);
+ }
+
+ /**
+ * Adds or updates the member.
+ *
+ * @param newMember The new member state.
+ */
+ public void updateMember(StreamsGroupMember newMember) {
+ if (newMember == null) {
+ throw new IllegalArgumentException("newMember cannot be null.");
+ }
+ StreamsGroupMember oldMember = members.put(newMember.memberId(),
newMember);
+ maybeUpdateTaskProcessId(oldMember, newMember);
+ updateStaticMember(newMember);
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * Updates the member ID stored against the instance ID if the member is a
static member.
+ *
+ * @param newMember The new member state.
+ */
+ private void updateStaticMember(StreamsGroupMember newMember) {
+ if (newMember.instanceId() != null &&
newMember.instanceId().isPresent()) {
+ staticMembers.put(newMember.instanceId().get(),
newMember.memberId());
+ }
+ }
+
    /**
     * Remove the member from the group.
     *
     * @param memberId The member ID to remove.
     */
    public void removeMember(String memberId) {
        // May be null if the member ID is unknown to the group.
        StreamsGroupMember oldMember = members.remove(memberId);
        // Release the process IDs held by the removed member's tasks (null-safe).
        maybeRemoveTaskProcessId(oldMember);
        // NOTE(review): removeStaticMember dereferences oldMember without a null check,
        // so removing an unknown member ID would NPE here — confirm callers only pass
        // existing member IDs.
        removeStaticMember(oldMember);
        maybeUpdateGroupState();
    }
+
+ /**
+ * Remove the static member mapping if the removed member is static.
+ *
+ * @param oldMember The member to remove.
+ */
+ private void removeStaticMember(StreamsGroupMember oldMember) {
+ if (oldMember.instanceId() != null &&
oldMember.instanceId().isPresent()) {
+ staticMembers.remove(oldMember.instanceId().get());
+ }
+ }
+
+ /**
+ * Returns true if the member exists.
+ *
+ * @param memberId The member ID.
+ * @return A boolean indicating whether the member exists or not.
+ */
+ public boolean hasMember(String memberId) {
+ return members.containsKey(memberId);
+ }
+
+ /**
+ * @return The number of members.
+ */
+ public int numMembers() {
+ return members.size();
+ }
+
+ /**
+ * @return An immutable map containing all the members keyed by their ID.
+ */
+ public Map<String, StreamsGroupMember> members() {
+ return Collections.unmodifiableMap(members);
+ }
+
+ /**
+ * @return An immutable map containing all the static members keyed by
instance ID.
+ */
+ public Map<String, String> staticMembers() {
+ return Collections.unmodifiableMap(staticMembers);
+ }
+
+ /**
+ * Returns the target assignment of the member.
+ *
+ * @return The StreamsGroupMemberAssignment or an EMPTY one if it does not
exist.
+ */
+ public TasksTuple targetAssignment(String memberId) {
+ return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
+ }
+
+ /**
+ * Updates the target assignment of a member.
+ *
+ * @param memberId The member ID.
+ * @param newTargetAssignment The new target assignment.
+ */
+ public void updateTargetAssignment(String memberId, TasksTuple
newTargetAssignment) {
+ targetAssignment.put(memberId, newTargetAssignment);
+ }
+
+ /**
+ * @return An immutable map containing all the target assignment keyed by
member ID.
+ */
+ public Map<String, TasksTuple> targetAssignment() {
+ return Collections.unmodifiableMap(targetAssignment);
+ }
+
+ /**
+ * Returns the current process ID of a task or null if the task does not
have one.
+ *
+ * @param subtopologyId The topic ID.
+ * @param taskId The task ID.
+ * @return The process ID or null.
+ */
+ public String currentActiveTaskProcessId(
+ String subtopologyId, int taskId
+ ) {
+ Map<Integer, String> tasks =
currentActiveTaskToProcessId.get(subtopologyId);
+ if (tasks == null) {
+ return null;
+ } else {
+ return tasks.getOrDefault(taskId, null);
+ }
+ }
+
+ /**
+ * Returns the current process IDs of a task or empty set if the task does
not have one.
+ *
+ * @param subtopologyId The topic ID.
+ * @param taskId The task ID.
+ * @return The process IDs or empty set.
+ */
+ public Set<String> currentStandbyTaskProcessIds(
+ String subtopologyId, int taskId
+ ) {
+ Map<Integer, Set<String>> tasks =
currentStandbyTaskToProcessIds.get(subtopologyId);
+ if (tasks == null) {
+ return Collections.emptySet();
+ } else {
+ return tasks.getOrDefault(taskId, Collections.emptySet());
+ }
+ }
+
+ /**
+ * Returns the current process ID of a task or empty set if the task does
not have one.
+ *
+ * @param subtopologyId The topic ID.
+ * @param taskId The process ID.
+ * @return The member IDs or empty set.
+ */
+ public Set<String> currentWarmupTaskProcessIds(
+ String subtopologyId, int taskId
+ ) {
+ Map<Integer, Set<String>> tasks =
currentWarmupTaskToProcessIds.get(subtopologyId);
+ if (tasks == null) {
+ return Collections.emptySet();
+ } else {
+ return tasks.getOrDefault(taskId, Collections.emptySet());
+ }
+ }
+
+ /**
+ * @return An immutable map of partition metadata for each topic that are
inputs for this streams group.
+ */
+ public Map<String, TopicMetadata> partitionMetadata() {
+ return Collections.unmodifiableMap(partitionMetadata);
+ }
+
    /**
     * Updates the partition metadata. This replaces the previous one.
     *
     * @param partitionMetadata The new partition metadata.
     */
    public void setPartitionMetadata(
        Map<String, TopicMetadata> partitionMetadata
    ) {
        this.partitionMetadata.clear();
        this.partitionMetadata.putAll(partitionMetadata);
        // Changed partition metadata can alter topic partition counts, so the
        // configured topology must be re-derived and the group state re-evaluated.
        maybeUpdateConfiguredTopology();
        maybeUpdateGroupState();
    }
+
+ /**
+ * Computes the partition metadata based on the current topology and the
current topics image.
+ *
+ * @param topicsImage The current metadata for all available topics.
+ * @param topology The current metadata for the Streams topology
+ * @return An immutable map of partition metadata for each topic that the
Streams topology is using (besides non-repartition sink topics)
+ */
+ public Map<String, TopicMetadata> computePartitionMetadata(
+ TopicsImage topicsImage,
+ StreamsTopology topology
+ ) {
+ Set<String> requiredTopicNames = topology.requiredTopics();
+
+ // Create the topic metadata for each subscribed topic.
+ Map<String, TopicMetadata> newPartitionMetadata = new
HashMap<>(requiredTopicNames.size());
+
+ requiredTopicNames.forEach(topicName -> {
+ TopicImage topicImage = topicsImage.getTopic(topicName);
+ if (topicImage != null) {
+ newPartitionMetadata.put(topicName, new TopicMetadata(
+ topicImage.id(),
+ topicImage.name(),
+ topicImage.partitions().size())
+ );
+ }
+ });
+
+ return Collections.unmodifiableMap(newPartitionMetadata);
+ }
+
+ /**
+ * Updates the metadata refresh deadline.
+ *
+ * @param deadlineMs The deadline in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+ public void setMetadataRefreshDeadline(
+ long deadlineMs,
+ int groupEpoch
+ ) {
+ this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs,
groupEpoch);
+ }
+
+ /**
+ * Requests a metadata refresh.
+ */
+ public void requestMetadataRefresh() {
+ this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+ }
+
+ /**
+ * Checks if a metadata refresh is required. A refresh is required in two
cases: 1) The deadline is smaller or equal to the current
+ * time; 2) The group epoch associated with the deadline is larger than
the current group epoch. This means that the operations which
+ * updated the deadline failed.
+ *
+ * @param currentTimeMs The current time in milliseconds.
+ * @return A boolean indicating whether a refresh is required or not.
+ */
+ public boolean hasMetadataExpired(long currentTimeMs) {
+ return currentTimeMs >= metadataRefreshDeadline.deadlineMs ||
groupEpoch() < metadataRefreshDeadline.epoch;
+ }
+
+ /**
+ * @return The metadata refresh deadline.
+ */
+ public DeadlineAndEpoch metadataRefreshDeadline() {
+ return metadataRefreshDeadline;
+ }
+
    /**
     * Validates the OffsetCommit request.
     *
     * @param memberId        The member ID.
     * @param groupInstanceId The group instance ID.
     * @param memberEpoch     The member epoch.
     * @param isTransactional Whether the offset commit is transactional or not.
     * @param apiVersion      The api version.
     * @throws UnknownMemberIdException    If the member is not found.
     * @throws StaleMemberEpochException   If the provided member epoch doesn't match
     *                                     the actual member epoch.
     * @throws UnsupportedVersionException If a non-transactional commit uses an
     *                                     OffsetCommit version below 9.
     */
    @Override
    public void validateOffsetCommit(
        String memberId,
        String groupInstanceId,
        int memberEpoch,
        boolean isTransactional,
        short apiVersion
    ) throws UnknownMemberIdException, StaleMemberEpochException {
        // When the member epoch is -1, the request comes from either the admin client
        // or a consumer which does not use the group management facility. In this case,
        // the request can commit offsets if the group is empty.
        if (memberEpoch < 0 && members().isEmpty()) return;

        // The TxnOffsetCommit API does not require the member ID, the generation ID and
        // the group instance ID fields. Hence, they are only validated if any of them
        // is provided (i.e. the request is skipped only when all three are absent).
        if (isTransactional && memberEpoch == JoinGroupRequest.UNKNOWN_GENERATION_ID &&
            memberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) && groupInstanceId == null)
            return;

        // Throws UnknownMemberIdException when the member is not part of the group.
        final StreamsGroupMember member = getOrMaybeCreateMember(memberId, false);

        // If the commit is not transactional and the member uses the new streams
        // protocol (KIP-1071), the member should be using OffsetCommit API version >= 9.
        if (!isTransactional && apiVersion < 9) {
            throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
                "by members using the streams group protocol");
        }

        validateMemberEpoch(memberEpoch, member.memberEpoch());
    }
+
    /**
     * Validates the OffsetFetch request.
     *
     * @param memberId            The member ID for streams groups.
     * @param memberEpoch         The member epoch for streams groups.
     * @param lastCommittedOffset The last committed offsets in the timeline.
     * @throws UnknownMemberIdException  If the member is not found.
     * @throws StaleMemberEpochException If the provided member epoch doesn't match the
     *                                   actual member epoch.
     */
    @Override
    public void validateOffsetFetch(
        String memberId,
        int memberEpoch,
        long lastCommittedOffset
    ) throws UnknownMemberIdException, StaleMemberEpochException {
        // When the member ID is null and the member epoch is -1, the request either comes
        // from the admin client or from a client which does not provide them. In this case,
        // the fetch request is accepted.
        if (memberId == null && memberEpoch < 0) {
            return;
        }

        // Read the member as of the last committed offset so that the validation is
        // consistent with the offsets visible to this fetch.
        final StreamsGroupMember member = members.get(memberId, lastCommittedOffset);
        if (member == null) {
            throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
                memberId, groupId));
        }
        validateMemberEpoch(memberEpoch, member.memberEpoch());
    }
+
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() {
+ }
+
+ /**
+ * Validates the DeleteGroups request.
+ */
+ @Override
+ public void validateDeleteGroup() throws ApiException {
+ if (state() != StreamsGroupState.EMPTY) {
+ throw Errors.NON_EMPTY_GROUP.exception();
+ }
+ }
+
+ @Override
+ public boolean isSubscribedToTopic(String topic) {
+ Optional<ConfiguredTopology> maybeConfiguredTopology =
configuredTopology.get();
+ if (maybeConfiguredTopology.isEmpty() ||
!maybeConfiguredTopology.get().isReady()) {
+ return false;
+ }
+ for (ConfiguredSubtopology sub :
maybeConfiguredTopology.get().subtopologies().orElse(new TreeMap<>()).values())
{
+ if (sub.sourceTopics().contains(topic) ||
sub.repartitionSourceTopics().containsKey(topic)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
    /**
     * Populates the list of records with tombstone(s) for deleting the group.
     *
     * NOTE(review): the emission order (current assignments, target assignments,
     * target assignment epoch, members, partition metadata, group epoch, topology)
     * mirrors the reverse of the creation order — confirm replay constraints before
     * reordering.
     *
     * @param records The list of records.
     */
    @Override
    public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
        // One tombstone per member for its current assignment.
        members().forEach((memberId, member) ->
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId(), memberId))
        );

        // One tombstone per member for its target assignment.
        members().forEach((memberId, member) ->
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId(), memberId))
        );

        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(groupId()));

        // One tombstone per member for its membership record.
        members().forEach((memberId, member) ->
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId(), memberId))
        );

        // Group-level tombstones last.
        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(groupId()));
        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(groupId()));
        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
    }
+
+ @Override
+ public boolean isEmpty() {
+ return state() == StreamsGroupState.EMPTY;
+ }
+
+ /**
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty if no
such condition exists.
+ */
+ @Override
+ public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+ return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata
-> offsetAndMetadata.commitTimestampMs));
+ }
+
+ @Override
+ public boolean isInStates(Set<String> statesFilter, long committedOffset) {
+ return
statesFilter.contains(state.get(committedOffset).toLowerCaseString());
+ }
+
+ /**
+ * Throws a StaleMemberEpochException if the received member epoch does
not match the expected member epoch.
+ */
+ private void validateMemberEpoch(
+ int receivedMemberEpoch,
+ int expectedMemberEpoch
+ ) throws StaleMemberEpochException {
+ if (receivedMemberEpoch != expectedMemberEpoch) {
+ throw new StaleMemberEpochException(String.format("The received
member epoch %d does not match "
+ + "the expected member epoch %d.", receivedMemberEpoch,
expectedMemberEpoch));
+ }
+ }
+
    /**
     * Updates the current state of the group and reports the transition to the
     * metrics shard.
     */
    private void maybeUpdateGroupState() {
        StreamsGroupState previousState = state.get();
        StreamsGroupState newState = STABLE;
        if (members.isEmpty()) {
            newState = EMPTY;
        } else if (topology() == null || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) {
            // A missing or not fully configured topology prevents task assignment.
            newState = NOT_READY;
        } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
            // A newer group epoch means the target assignment has not been computed yet.
            newState = ASSIGNING;
        } else {
            // Stable unless at least one member has not yet converged to the target
            // assignment epoch.
            for (StreamsGroupMember member : members.values()) {
                if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
                    newState = RECONCILING;
                    break;
                }
            }
        }

        state.set(newState);
        // Keep the per-state streams-group-count metrics consistent with the transition.
        metrics.onStreamsGroupStateTransition(previousState, newState);
    }
+
    /**
     * Re-derives the configured topology from the current topology and the current
     * partition metadata, or clears it when no topology is set.
     */
    private void maybeUpdateConfiguredTopology() {
        if (topology.get().isPresent()) {
            final StreamsTopology streamsTopology = topology.get().get();

            log.info("[GroupId {}] Configuring the topology {}", groupId, streamsTopology);
            // Configuration resolves internal topics against the current partition metadata.
            this.configuredTopology.set(Optional.of(InternalTopicManager.configureTopics(logContext, streamsTopology, partitionMetadata)));

        } else {
            configuredTopology.set(Optional.empty());
        }
    }
+
+ /**
+ * Updates the tasks process IDs based on the old and the new member.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ */
+ private void maybeUpdateTaskProcessId(
+ StreamsGroupMember oldMember,
+ StreamsGroupMember newMember
+ ) {
+ maybeRemoveTaskProcessId(oldMember);
+ addTaskProcessId(
+ newMember.assignedTasks(),
+ newMember.processId()
+ );
+ addTaskProcessId(
+ newMember.tasksPendingRevocation(),
+ newMember.processId()
+ );
+ }
+
+ /**
+ * Removes the task process IDs for the provided member.
+ *
+ * @param oldMember The old member.
+ */
+ private void maybeRemoveTaskProcessId(
+ StreamsGroupMember oldMember
+ ) {
+ if (oldMember != null) {
+ removeTaskProcessIds(oldMember.assignedTasks(),
oldMember.processId());
+ removeTaskProcessIds(oldMember.tasksPendingRevocation(),
oldMember.processId());
+ }
+ }
+
    /**
     * Removes the task-to-process-ID mappings for all tasks in the given tuple.
     * Package-private for testing.
     *
     * @param tasks     The tasks to release; may be null.
     * @param processId The process ID expected to own the tasks.
     */
    void removeTaskProcessIds(
        TasksTuple tasks,
        String processId
    ) {
        if (tasks != null) {
            // Active tasks map to a single process ID; standby and warm-up tasks map
            // to a set of process IDs.
            removeTaskProcessIds(tasks.activeTasks(), currentActiveTaskToProcessId, processId);
            removeTaskProcessIdsFromSet(tasks.standbyTasks(), currentStandbyTaskToProcessIds, processId);
            removeTaskProcessIdsFromSet(tasks.warmupTasks(), currentWarmupTaskToProcessIds, processId);
        }
    }
+
    /**
     * Removes the task process IDs based on the provided assignment.
     *
     * @param assignment            The assignment (subtopology ID to partition IDs).
     * @param currentTasksProcessId The task-to-process-ID mapping to remove from.
     * @param expectedProcessId     The expected process ID.
     * @throws IllegalStateException If the process ID does not match the expected one.
     */
    private void removeTaskProcessIds(
        Map<String, Set<Integer>> assignment,
        TimelineHashMap<String, TimelineHashMap<Integer, String>> currentTasksProcessId,
        String expectedProcessId
    ) {
        assignment.forEach((subtopologyId, assignedPartitions) -> {
            currentTasksProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
                if (partitionsOrNull != null) {
                    assignedPartitions.forEach(partitionId -> {
                        // The removed mapping must belong to the expected process;
                        // anything else indicates inconsistent bookkeeping.
                        String prevValue = partitionsOrNull.remove(partitionId);
                        if (!Objects.equals(prevValue, expectedProcessId)) {
                            throw new IllegalStateException(
                                String.format("Cannot remove the process ID %s from task %s_%s because the partition is " +
                                    "still owned at a different process ID %s", expectedProcessId, subtopologyId, partitionId, prevValue));
                        }
                    });
                    // Returning null drops the inner map once the last partition is removed.
                    if (partitionsOrNull.isEmpty()) {
                        return null;
                    } else {
                        return partitionsOrNull;
                    }
                } else {
                    throw new IllegalStateException(
                        String.format("Cannot remove the process ID %s from %s because it does not have any processId",
                            expectedProcessId, subtopologyId));
                }
            });
        });
    }
+
    /**
     * Removes the given process ID from the process-ID set of every task in the
     * provided assignment.
     *
     * @param assignment            The assignment (subtopology ID to partition IDs).
     * @param currentTasksProcessId The task-to-process-ID-set mapping to remove from.
     * @param processIdToRemove     The process ID to remove.
     * @throws IllegalStateException If a task is not owned by the process ID.
     */
    private void removeTaskProcessIdsFromSet(
        Map<String, Set<Integer>> assignment,
        TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentTasksProcessId,
        String processIdToRemove
    ) {
        assignment.forEach((subtopologyId, assignedPartitions) -> {
            currentTasksProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
                if (partitionsOrNull != null) {
                    assignedPartitions.forEach(partitionId -> {
                        // NOTE(review): get(partitionId) may return null for a partition
                        // that was never registered, which would NPE here instead of
                        // throwing the IllegalStateException below — confirm this path
                        // cannot occur in practice.
                        if (!partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
                            throw new IllegalStateException(
                                String.format("Cannot remove the process ID %s from task %s_%s because the task is " +
                                    "not owned by this process ID", processIdToRemove, subtopologyId, partitionId));
                        }
                    });
                    // Returning null drops the inner map once no partitions remain;
                    // per-partition sets that became empty are kept as-is.
                    if (partitionsOrNull.isEmpty()) {
                        return null;
                    } else {
                        return partitionsOrNull;
                    }
                } else {
                    throw new IllegalStateException(
                        String.format("Cannot remove the process ID %s from %s because it does not have any process ID",
                            processIdToRemove, subtopologyId));
                }
            });
        });
    }
+
    /**
     * Registers the process ID for all tasks in the given tuple. Package-private for
     * testing.
     *
     * @param tasks     The assigned tasks; may be null.
     * @param processId The process ID.
     * @throws IllegalStateException If an active task already has a process ID assigned.
     */
    void addTaskProcessId(
        TasksTuple tasks,
        String processId
    ) {
        if (tasks != null) {
            // Active tasks are single-owner; standby and warm-up tasks accumulate
            // into sets of process IDs.
            addTaskProcessId(tasks.activeTasks(), processId, currentActiveTaskToProcessId);
            addTaskProcessIdToSet(tasks.standbyTasks(), processId, currentStandbyTaskToProcessIds);
            addTaskProcessIdToSet(tasks.warmupTasks(), processId, currentWarmupTaskToProcessIds);
        }
    }
+
    /**
     * Registers the process ID for each task in the given assignment.
     *
     * @param tasks                The assignment (subtopology ID to partition IDs).
     * @param processId            The process ID.
     * @param currentTaskProcessId The task-to-process-ID mapping to add to.
     * @throws IllegalStateException If a task is already owned by another process ID.
     */
    private void addTaskProcessId(
        Map<String, Set<Integer>> tasks,
        String processId,
        TimelineHashMap<String, TimelineHashMap<Integer, String>> currentTaskProcessId
    ) {
        tasks.forEach((subtopologyId, assignedTaskPartitions) -> {
            currentTaskProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
                if (partitionsOrNull == null) {
                    // Lazily create the inner timeline map for this subtopology.
                    partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitions.size());
                }
                for (Integer partitionId : assignedTaskPartitions) {
                    // An active task may only be owned by one process at a time.
                    String prevValue = partitionsOrNull.put(partitionId, processId);
                    if (prevValue != null) {
                        throw new IllegalStateException(
                            String.format("Cannot set the process ID of %s-%s to %s because the partition is " +
                                "still owned by process ID %s", subtopologyId, partitionId, processId, prevValue));
                    }
                }
                return partitionsOrNull;
            });
        });
    }
+
    /**
     * Adds the process ID to the process-ID set of each task in the given assignment.
     * Unlike active tasks, standby and warm-up tasks may be hosted by several processes
     * at once, so no ownership conflict is checked here.
     *
     * @param tasks                The assignment (subtopology ID to partition IDs).
     * @param processId            The process ID.
     * @param currentTaskProcessId The task-to-process-ID-set mapping to add to.
     */
    private void addTaskProcessIdToSet(
        Map<String, Set<Integer>> tasks,
        String processId,
        TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentTaskProcessId
    ) {
        tasks.forEach((subtopologyId, assignedTaskPartitions) -> {
            currentTaskProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
                if (partitionsOrNull == null) {
                    // Lazily create the inner timeline map for this subtopology.
                    partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitions.size());
                }
                for (Integer partitionId : assignedTaskPartitions) {
                    partitionsOrNull.computeIfAbsent(partitionId, ___ -> new HashSet<>()).add(processId);
                }
                return partitionsOrNull;
            });
        });
    }
+
    /**
     * Converts this group's state at the given committed offset into a
     * StreamsGroupDescribe response entry.
     *
     * @param committedOffset The offset at which to read the group's timeline state.
     * @return The described group.
     */
    public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
        long committedOffset
    ) {
        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
            .setGroupId(groupId)
            .setGroupEpoch(groupEpoch.get(committedOffset))
            .setGroupState(state.get(committedOffset).toString())
            .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
            // The topology is only reported once configured.
            .setTopology(configuredTopology.get(committedOffset).map(ConfiguredTopology::asStreamsGroupDescribeTopology).orElse(null));
        // Describe each member together with its target assignment at the same offset.
        members.entrySet(committedOffset).forEach(
            entry -> describedGroup.members().add(
                entry.getValue().asStreamsGroupDescribeMember(
                    targetAssignment.get(entry.getValue().memberId(), committedOffset)
                )
            )
        );
        return describedGroup;
    }
+
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
index 19a988373ef..f4fa3dc7aa7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
@@ -19,10 +19,7 @@ package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
/**
* Immutable topic metadata, representing the current state of a topic in the
broker.
@@ -30,15 +27,12 @@ import java.util.Set;
* @param id The topic ID.
* @param name The topic name.
* @param numPartitions The number of partitions.
- * @param partitionRacks Map of every partition ID to a set of its rack IDs,
if they exist. If rack information is unavailable for all
- * partitions, this is an empty map.
*/
-public record TopicMetadata(Uuid id, String name, int numPartitions,
Map<Integer, Set<String>> partitionRacks) {
+public record TopicMetadata(Uuid id, String name, int numPartitions) {
public TopicMetadata(Uuid id,
String name,
- int numPartitions,
- Map<Integer, Set<String>> partitionRacks) {
+ int numPartitions) {
this.id = Objects.requireNonNull(id);
if (Uuid.ZERO_UUID.equals(id)) {
throw new IllegalArgumentException("Topic id cannot be
ZERO_UUID.");
@@ -51,23 +45,12 @@ public record TopicMetadata(Uuid id, String name, int
numPartitions, Map<Integer
if (numPartitions <= 0) {
throw new IllegalArgumentException("Number of partitions must be
positive.");
}
- this.partitionRacks = Objects.requireNonNull(partitionRacks);
}
public static TopicMetadata
fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata record) {
- // Converting the data type from a list stored in the record to a map
for the topic metadata.
- Map<Integer, Set<String>> partitionRacks = new HashMap<>();
- for (StreamsGroupPartitionMetadataValue.PartitionMetadata
partitionMetadata : record.partitionMetadata()) {
- partitionRacks.put(
- partitionMetadata.partition(),
- Set.copyOf(partitionMetadata.racks())
- );
- }
-
return new TopicMetadata(
record.topicId(),
record.topicName(),
- record.numPartitions(),
- partitionRacks);
+ record.numPartitions());
}
}
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
index 1f5eb8e8dcb..f9be55b9e42 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
@@ -28,14 +28,7 @@
{ "name": "TopicName", "versions": "0+", "type": "string",
"about": "The topic name." },
{ "name": "NumPartitions", "versions": "0+", "type": "int32",
- "about": "The number of partitions of the topic." },
- { "name": "PartitionMetadata", "versions": "0+", "type":
"[]PartitionMetadata",
- "about": "Partitions mapped to a set of racks. If the rack information
is unavailable for all the partitions, an empty list is stored.", "fields": [
- { "name": "Partition", "versions": "0+", "type": "int32",
- "about": "The partition number." },
- { "name": "Racks", "versions": "0+", "type": "[]string",
- "about": "The set of racks that the partition is mapped to." }
- ]}
+ "about": "The number of partitions of the topic." }
]}
]
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index 05da88f9f7a..e20640fb129 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.timeline.SnapshotRegistry;
import com.yammer.metrics.core.MetricsRegistry;
@@ -47,6 +48,7 @@ import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
+import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
import static
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertMetricsForTypeEqual;
import static
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName;
@@ -129,7 +131,37 @@ public class GroupCoordinatorMetricsTest {
GroupCoordinatorMetrics.METRICS_GROUP,
"The number of share groups in dead state.",
"protocol", Group.GroupType.SHARE.toString(),
- "state", GroupState.DEAD.toString())
+ "state", GroupState.DEAD.toString()),
+ metrics.metricName(
+ "group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("protocol",
Group.GroupType.STREAMS.toString())),
+ metrics.metricName("streams-group-rebalance-rate",
GroupCoordinatorMetrics.METRICS_GROUP),
+ metrics.metricName("streams-group-rebalance-count",
GroupCoordinatorMetrics.METRICS_GROUP),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state",
StreamsGroupState.EMPTY.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state",
StreamsGroupState.ASSIGNING.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state",
StreamsGroupState.RECONCILING.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state",
StreamsGroupState.STABLE.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state",
StreamsGroupState.DEAD.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state",
StreamsGroupState.NOT_READY.toString()))
));
try {
@@ -145,7 +177,7 @@ public class GroupCoordinatorMetricsTest {
));
assertMetricsForTypeEqual(registry, "kafka.coordinator.group",
expectedRegistry);
- expectedMetrics.forEach(metricName ->
assertTrue(metrics.metrics().containsKey(metricName)));
+ expectedMetrics.forEach(metricName ->
assertTrue(metrics.metrics().containsKey(metricName), metricName + " is
missing"));
}
assertMetricsForTypeEqual(registry, "kafka.coordinator.group",
Collections.emptySet());
expectedMetrics.forEach(metricName ->
assertFalse(metrics.metrics().containsKey(metricName)));
@@ -195,6 +227,10 @@ public class GroupCoordinatorMetricsTest {
IntStream.range(0, 5).forEach(__ ->
shard0.incrementNumShareGroups(ShareGroup.ShareGroupState.STABLE));
IntStream.range(0, 5).forEach(__ ->
shard1.incrementNumShareGroups(ShareGroup.ShareGroupState.EMPTY));
IntStream.range(0, 3).forEach(__ ->
shard1.decrementNumShareGroups(ShareGroup.ShareGroupState.DEAD));
+
+ IntStream.range(0, 5).forEach(__ ->
shard0.incrementNumStreamsGroups(StreamsGroupState.STABLE));
+ IntStream.range(0, 5).forEach(__ ->
shard1.incrementNumStreamsGroups(StreamsGroupState.EMPTY));
+ IntStream.range(0, 3).forEach(__ ->
shard1.decrementNumStreamsGroups(StreamsGroupState.DEAD));
assertEquals(4, shard0.numClassicGroups());
assertEquals(5, shard1.numClassicGroups());
@@ -228,6 +264,14 @@ public class GroupCoordinatorMetricsTest {
metrics.metricName("group-count", METRICS_GROUP,
Collections.singletonMap("protocol", "share")),
7
);
+
+ assertEquals(5, shard0.numStreamsGroups());
+ assertEquals(2, shard1.numStreamsGroups());
+ assertGaugeValue(
+ metrics,
+ metrics.metricName("group-count", METRICS_GROUP,
Collections.singletonMap("protocol", "streams")),
+ 7
+ );
}
@Test
@@ -269,6 +313,18 @@ public class GroupCoordinatorMetricsTest {
"The total number of share group rebalances",
"protocol", "share"
), 50);
+
+ shard.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 50);
+ assertMetricValue(metrics, metrics.metricName(
+ "streams-group-rebalance-rate",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ "The rate of streams group rebalances"
+ ), 5.0 / 3.0);
+ assertMetricValue(metrics, metrics.metricName(
+ "streams-group-rebalance-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ "The total number of streams group rebalances"
+ ), 50);
}
private void assertMetricValue(Metrics metrics, MetricName metricName,
double val) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 489d4fa0254..ab55dd3239b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -257,8 +257,8 @@ class StreamsCoordinatorRecordHelpersTest {
Uuid uuid1 = Uuid.randomUuid();
Uuid uuid2 = Uuid.randomUuid();
Map<String, TopicMetadata> newPartitionMetadata = Map.of(
- TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1, Map.of(0,
Set.of(RACK_1, RACK_2))),
- TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2, Map.of(1,
Set.of(RACK_3)))
+ TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1),
+ TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2)
);
StreamsGroupPartitionMetadataValue value = new
StreamsGroupPartitionMetadataValue();
@@ -266,21 +266,11 @@ class StreamsCoordinatorRecordHelpersTest {
.setTopicId(uuid1)
.setTopicName(TOPIC_1)
.setNumPartitions(1)
- .setPartitionMetadata(List.of(
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(0)
- .setRacks(List.of(RACK_1, RACK_2))
- ))
);
value.topics().add(new
StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(uuid2)
.setTopicName(TOPIC_2)
.setNumPartitions(2)
- .setPartitionMetadata(List.of(
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(1)
- .setRacks(List.of(RACK_3))
- ))
);
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
new file mode 100644
index 00000000000..47dcf552fa6
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -0,0 +1,1099 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
+import
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyMap;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class StreamsGroupTest {
+
+ private static final LogContext LOG_CONTEXT = new LogContext();
+
+ private StreamsGroup createStreamsGroup(String groupId) {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ return new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ groupId,
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ }
+
+ @Test
+ public void testGetOrCreateMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ // Create a member.
+ member = streamsGroup.getOrMaybeCreateMember("member-id", true);
+ assertEquals("member-id", member.memberId());
+
+ // Add member to the group.
+ streamsGroup.updateMember(member);
+
+ // Get that member back.
+ member = streamsGroup.getOrMaybeCreateMember("member-id", false);
+ assertEquals("member-id", member.memberId());
+
+ assertThrows(UnknownMemberIdException.class, () ->
+ streamsGroup.getOrMaybeCreateMember("does-not-exist", false));
+ }
+
+ @Test
+ public void testUpdateMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = streamsGroup.getOrMaybeCreateMember("member", true);
+
+ member = new StreamsGroupMember.Builder(member).build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals(member, streamsGroup.getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testNoStaticMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ // Create a new member which is not static
+ streamsGroup.getOrMaybeCreateMember("member", true);
+ assertNull(streamsGroup.staticMember("instance-id"));
+ }
+
+ @Test
+ public void testGetStaticMemberByInstanceId() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = streamsGroup.getOrMaybeCreateMember("member", true);
+
+ member = new StreamsGroupMember.Builder(member)
+ .setInstanceId("instance")
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals(member, streamsGroup.staticMember("instance"));
+ assertEquals(member, streamsGroup.getOrMaybeCreateMember("member",
false));
+ assertEquals(member.memberId(),
streamsGroup.staticMemberId("instance"));
+ }
+
+ @Test
+ public void testRemoveMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ StreamsGroupMember member =
streamsGroup.getOrMaybeCreateMember("member", true);
+ streamsGroup.updateMember(member);
+ assertTrue(streamsGroup.hasMember("member"));
+
+ streamsGroup.removeMember("member");
+ assertFalse(streamsGroup.hasMember("member"));
+
+ }
+
+ @Test
+ public void testRemoveStaticMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setInstanceId("instance")
+ .build();
+
+ streamsGroup.updateMember(member);
+ assertTrue(streamsGroup.hasMember("member"));
+
+ streamsGroup.removeMember("member");
+ assertFalse(streamsGroup.hasMember("member"));
+ assertNull(streamsGroup.staticMember("instance"));
+ assertNull(streamsGroup.staticMemberId("instance"));
+ }
+
+ @Test
+ public void testUpdatingMemberUpdatesProcessId() {
+ String fooSubtopology = "foo-sub";
+ String barSubtopology = "bar-sub";
+ String zarSubtopology = "zar-sub";
+
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = new StreamsGroupMember.Builder("member")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 3))
+ )
+ )
+ .setTasksPendingRevocation(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 4)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 5)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 6))
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process",
streamsGroup.currentActiveTaskProcessId(fooSubtopology, 1));
+ assertEquals(Collections.singleton("process"),
+ streamsGroup.currentStandbyTaskProcessIds(fooSubtopology, 2));
+ assertEquals(Collections.singleton("process"),
+ streamsGroup.currentWarmupTaskProcessIds(fooSubtopology, 3));
+ assertEquals("process",
streamsGroup.currentActiveTaskProcessId(barSubtopology, 4));
+ assertEquals(Collections.singleton("process"),
+ streamsGroup.currentStandbyTaskProcessIds(barSubtopology, 5));
+ assertEquals(Collections.singleton("process"),
+ streamsGroup.currentWarmupTaskProcessIds(barSubtopology, 6));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+ assertEquals(Collections.emptySet(),
+ streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+ assertEquals(Collections.emptySet(),
+ streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+
+ member = new StreamsGroupMember.Builder(member)
+ .setProcessId("process1")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 3))
+ )
+ )
+ .setTasksPendingRevocation(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 4)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 5)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 6))
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process1",
streamsGroup.currentActiveTaskProcessId(fooSubtopology, 1));
+ assertEquals(Collections.singleton("process1"),
+ streamsGroup.currentStandbyTaskProcessIds(fooSubtopology, 2));
+ assertEquals(Collections.singleton("process1"),
+ streamsGroup.currentWarmupTaskProcessIds(fooSubtopology, 3));
+ assertEquals("process1",
streamsGroup.currentActiveTaskProcessId(barSubtopology, 4));
+ assertEquals(Collections.singleton("process1"),
+ streamsGroup.currentStandbyTaskProcessIds(barSubtopology, 5));
+ assertEquals(Collections.singleton("process1"),
+ streamsGroup.currentWarmupTaskProcessIds(barSubtopology, 6));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+ assertEquals(Collections.emptySet(),
+ streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+ assertEquals(Collections.emptySet(),
+ streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+ }
+
+ @Test
+ public void
testUpdatingMemberUpdatesTaskProcessIdWhenPartitionIsReassignedBeforeBeingRevoked()
{
+ String fooSubtopologyId = "foo-sub";
+
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = new StreamsGroupMember.Builder("member")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ emptyMap(),
+ emptyMap(),
+ emptyMap()
+ )
+ )
+ .setTasksPendingRevocation(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process",
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
+
+ member = new StreamsGroupMember.Builder(member)
+ .setProcessId("process1")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+ )
+ )
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process1",
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
+ }
+
+ @Test
+ public void
testUpdatingMemberUpdatesTaskProcessIdWhenPartitionIsNotReleased() {
+ String fooSubtopologyId = "foo-sub";
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ emptyMap(),
+ emptyMap()
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(m1);
+
+ StreamsGroupMember m2 = new StreamsGroupMember.Builder("m2")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ emptyMap(),
+ emptyMap()
+ )
+ )
+ .build();
+
+ // m2 should not be able to acquire foo-1 because the partition is
+ // still owned by another member.
+ assertThrows(IllegalStateException.class, () ->
streamsGroup.updateMember(m2));
+ }
+
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testRemoveTaskProcessIds(TaskRole taskRole) {
+ String fooSubtopologyId = "foo-sub";
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ // Removing should fail because there is no epoch set.
+ assertThrows(IllegalStateException.class, () ->
streamsGroup.removeTaskProcessIds(
+ mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)),
+ "process"
+ ));
+
+ StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
+ .setProcessId("process")
+ .setAssignedTasks(mkTasksTuple(taskRole, mkTasks(fooSubtopologyId,
1)))
+ .build();
+
+ streamsGroup.updateMember(m1);
+
+ // Removing should fail because the expected epoch is incorrect.
+ assertThrows(IllegalStateException.class, () ->
streamsGroup.removeTaskProcessIds(
+ mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)),
+ "process1"
+ ));
+ }
+
+ @Test
+ public void testAddTaskProcessIds() {
+ String fooSubtopologyId = "foo-sub";
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ streamsGroup.addTaskProcessId(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+ ),
+ "process"
+ );
+
+ // Changing the epoch should fail because the owner of the partition
+ // should remove it first.
+ assertThrows(IllegalStateException.class, () ->
streamsGroup.addTaskProcessId(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+ ),
+ "process"
+ ));
+ }
+
+ @Test
+ public void testDeletingMemberRemovesProcessId() {
+ String fooSubtopology = "foo-sub";
+ String barSubtopology = "bar-sub";
+ String zarSubtopology = "zar-sub";
+
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = new StreamsGroupMember.Builder("member")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 3))
+ )
+ )
+ .setTasksPendingRevocation(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 4)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 5)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 6))
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process",
streamsGroup.currentActiveTaskProcessId(fooSubtopology, 1));
+ assertEquals(Collections.singleton("process"),
streamsGroup.currentStandbyTaskProcessIds(fooSubtopology, 2));
+ assertEquals(Collections.singleton("process"),
streamsGroup.currentWarmupTaskProcessIds(fooSubtopology, 3));
+ assertEquals("process",
streamsGroup.currentActiveTaskProcessId(barSubtopology, 4));
+ assertEquals(Collections.singleton("process"),
streamsGroup.currentStandbyTaskProcessIds(barSubtopology, 5));
+ assertEquals(Collections.singleton("process"),
streamsGroup.currentWarmupTaskProcessIds(barSubtopology, 6));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+ assertEquals(Collections.emptySet(),
streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+ assertEquals(Collections.emptySet(),
streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+
+ streamsGroup.removeMember(member.memberId());
+
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 1));
+ assertEquals(Collections.emptySet(),
streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 2));
+ assertEquals(Collections.emptySet(),
streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 3));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 3));
+ assertEquals(Collections.emptySet(),
streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 4));
+ assertEquals(Collections.emptySet(),
streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 5));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+ assertEquals(Collections.emptySet(),
streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+ assertEquals(Collections.emptySet(),
streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+ }
+
+ @Test
+ public void testGroupState() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+
+ StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .build();
+
+ streamsGroup.updateMember(member1);
+ streamsGroup.setGroupEpoch(1);
+
+ assertEquals(MemberState.STABLE, member1.state());
+ assertEquals(StreamsGroup.StreamsGroupState.NOT_READY,
streamsGroup.state());
+
+ streamsGroup.setTopology(new StreamsTopology(1,
Collections.emptyMap()));
+
+ assertEquals(MemberState.STABLE, member1.state());
+ assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
streamsGroup.state());
+
+ StreamsGroupMember member2 = new StreamsGroupMember.Builder("member2")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .build();
+
+ streamsGroup.updateMember(member2);
+ streamsGroup.setGroupEpoch(2);
+
+ assertEquals(MemberState.STABLE, member2.state());
+ assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
streamsGroup.state());
+
+ streamsGroup.setTargetAssignmentEpoch(2);
+
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING,
streamsGroup.state());
+
+ member1 = new StreamsGroupMember.Builder(member1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(1)
+ .build();
+
+ streamsGroup.updateMember(member1);
+
+ assertEquals(MemberState.STABLE, member1.state());
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING,
streamsGroup.state());
+
+ // Member 2 is not stable so the group stays in reconciling state.
+ member2 = new StreamsGroupMember.Builder(member2)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(1)
+ .build();
+
+ streamsGroup.updateMember(member2);
+
+ assertEquals(MemberState.UNREVOKED_TASKS, member2.state());
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING,
streamsGroup.state());
+
+ member2 = new StreamsGroupMember.Builder(member2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(1)
+ .build();
+
+ streamsGroup.updateMember(member2);
+
+ assertEquals(MemberState.STABLE, member2.state());
+ assertEquals(StreamsGroup.StreamsGroupState.STABLE,
streamsGroup.state());
+
+ streamsGroup.removeMember("member1");
+ streamsGroup.removeMember("member2");
+
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY,
streamsGroup.state());
+ }
+
+ @Test
+ public void testMetadataRefreshDeadline() {
+ MockTime time = new MockTime();
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ // Group epoch starts at 0.
+ assertEquals(0, group.groupEpoch());
+
+ // The refresh time deadline should be empty when the group is created
or loaded.
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(0, group.metadataRefreshDeadline().epoch);
+
+ // Set the refresh deadline. The metadata remains valid because the
deadline
+ // has not passed and the group epoch is correct.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000,
group.groupEpoch());
+ assertFalse(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000,
group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch(),
group.metadataRefreshDeadline().epoch);
+
+ // Advance past the deadline. The metadata should have expired.
+ time.sleep(1001L);
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+
+ // Set the refresh time deadline with a higher group epoch. The
metadata is considered
+ // as expired because the group epoch attached to the deadline is
higher than the
+ // current group epoch.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000,
group.groupEpoch() + 1);
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000,
group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch() + 1,
group.metadataRefreshDeadline().epoch);
+
+ // Advance the group epoch.
+ group.setGroupEpoch(group.groupEpoch() + 1);
+
+ // Set the refresh deadline. The metadata remains valid because the
deadline
+ // has not passed and the group epoch is correct.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000,
group.groupEpoch());
+ assertFalse(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000,
group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch(),
group.metadataRefreshDeadline().epoch);
+
+ // Request metadata refresh. The metadata expires immediately.
+ group.requestMetadataRefresh();
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(0, group.metadataRefreshDeadline().epoch);
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void testValidateTransactionalOffsetCommit(short version) {
+ boolean isTransactional = true;
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+
+ // Simulate a call from the admin client without member ID and member
epoch.
+ // This should pass only if the group is empty.
+ group.validateOffsetCommit("", "", -1, isTransactional, version);
+
+ // The member does not exist.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("member-id", null, 0, isTransactional,
version));
+
+ // Create a member.
+ group.updateMember(new
StreamsGroupMember.Builder("member-id").setMemberEpoch(0).build());
+
+ // A call from the admin client should fail as the group is not empty.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("", "", -1, isTransactional, version));
+
+ // The member epoch is stale.
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 10, isTransactional,
version));
+
+ // This should succeed.
+ group.validateOffsetCommit("member-id", "", 0, isTransactional,
version);
+
+ // This should succeed.
+ group.validateOffsetCommit("", null, -1, isTransactional, version);
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommit(short version) {
+ boolean isTransactional = false;
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ // Simulate a call from the admin client without member ID and member
epoch.
+ // This should pass only if the group is empty.
+ group.validateOffsetCommit("", "", -1, isTransactional, version);
+
+ // The member does not exist.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("member-id", null, 0, isTransactional,
version));
+
+ // Create members.
+ group.updateMember(
+ new StreamsGroupMember
+ .Builder("new-protocol-member-id").setMemberEpoch(0).build()
+ );
+
+ // A call from the admin client should fail as the group is not empty.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("", "", -1, isTransactional, version));
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("", null, -1, isTransactional,
version));
+
+ // The member epoch is stale.
+ if (version >= 9) {
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("new-protocol-member-id", "", 10,
isTransactional, version));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("new-protocol-member-id", "", 10,
isTransactional, version));
+ }
+
+ // This should succeed.
+ if (version >= 9) {
+ group.validateOffsetCommit("new-protocol-member-id", "", 0,
isTransactional, version);
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("new-protocol-member-id", "", 0,
isTransactional, version));
+ }
+ }
+
+ @Test
+ public void testAsListedGroup() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ StreamsGroup group = new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ "group-foo",
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ group.setGroupEpoch(1);
+ group.setTopology(new StreamsTopology(1, Collections.emptyMap()));
+ group.setTargetAssignmentEpoch(1);
+ group.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .build());
+ snapshotRegistry.idempotentCreateSnapshot(1);
+
+ ListGroupsResponseData.ListedGroup listedGroup =
group.asListedGroup(1);
+
+ assertEquals("group-foo", listedGroup.groupId());
+ assertEquals("streams", listedGroup.protocolType());
+ assertEquals("Reconciling", listedGroup.groupState());
+ assertEquals("streams", listedGroup.groupType());
+ }
+
+ @Test
+ public void testValidateOffsetFetch() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ StreamsGroup group = new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ "group-foo",
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+
+ // Simulate a call from the admin client without member ID and member
epoch.
+ group.validateOffsetFetch(null, -1, Long.MAX_VALUE);
+
+ // The member does not exist.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
+
+ // Create a member.
+ snapshotRegistry.idempotentCreateSnapshot(0);
+ group.updateMember(new
StreamsGroupMember.Builder("member-id").setMemberEpoch(0).build());
+
+ // The member does not exist at last committed offset 0.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetFetch("member-id", 0, 0));
+
+ // The member exists but the epoch is stale when the last committed
offset is not considered.
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetFetch("member-id", 10, Long.MAX_VALUE));
+
+ // This should succeed.
+ group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
+ }
+
+ @Test
+ public void testValidateDeleteGroup() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+ assertDoesNotThrow(streamsGroup::validateDeleteGroup);
+
+ StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .build();
+ streamsGroup.updateMember(member1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.NOT_READY,
streamsGroup.state());
+ assertThrows(GroupNotEmptyException.class,
streamsGroup::validateDeleteGroup);
+
+ streamsGroup.setTopology(new StreamsTopology(1,
Collections.emptyMap()));
+
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING,
streamsGroup.state());
+ assertThrows(GroupNotEmptyException.class,
streamsGroup::validateDeleteGroup);
+
+ streamsGroup.setGroupEpoch(1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
streamsGroup.state());
+ assertThrows(GroupNotEmptyException.class,
streamsGroup::validateDeleteGroup);
+
+ streamsGroup.setTargetAssignmentEpoch(1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.STABLE,
streamsGroup.state());
+ assertThrows(GroupNotEmptyException.class,
streamsGroup::validateDeleteGroup);
+
+ streamsGroup.removeMember("member1");
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY,
streamsGroup.state());
+ assertDoesNotThrow(streamsGroup::validateDeleteGroup);
+ }
+
+ @Test
+ public void testOffsetExpirationCondition() {
+ long currentTimestamp = 30000L;
+ long commitTimestamp = 20000L;
+ long offsetsRetentionMs = 10000L;
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+ StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new
SnapshotRegistry(LOG_CONTEXT), "group-id",
mock(GroupCoordinatorMetricsShard.class));
+
+ Optional<OffsetExpirationCondition> offsetExpirationCondition =
group.offsetExpirationCondition();
+ assertTrue(offsetExpirationCondition.isPresent());
+
+ OffsetExpirationConditionImpl condition =
(OffsetExpirationConditionImpl) offsetExpirationCondition.get();
+ assertEquals(commitTimestamp,
condition.baseTimestamp().apply(offsetAndMetadata));
+ assertTrue(condition.isOffsetExpired(offsetAndMetadata,
currentTimestamp, offsetsRetentionMs));
+ }
+
+ @Test
+ public void testAsDescribedGroup() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
+ StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry,
"group-id-1", mock(GroupCoordinatorMetricsShard.class));
+ snapshotRegistry.idempotentCreateSnapshot(0);
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY.toString(),
group.stateAsString(0));
+
+ group.setGroupEpoch(1);
+ group.setTopology(new StreamsTopology(1, Collections.emptyMap()));
+ group.setTargetAssignmentEpoch(1);
+ group.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .setInstanceId("instance1")
+ .setRackId("rack1")
+ .setClientId("client1")
+ .setClientHost("host1")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(1)
+ .setProcessId("process1")
+ .setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost("host1").setPort(9092))
+ .setClientTags(Collections.singletonMap("tag1", "value1"))
+ .setAssignedTasks(new TasksTuple(Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))
+ .setTasksPendingRevocation(new TasksTuple(Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))
+ .build());
+ group.updateMember(new StreamsGroupMember.Builder("member2")
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .setInstanceId("instance2")
+ .setRackId("rack2")
+ .setClientId("client2")
+ .setClientHost("host2")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(1)
+ .setProcessId("process2")
+ .setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost("host2").setPort(9092))
+ .setClientTags(Collections.singletonMap("tag2", "value2"))
+ .setAssignedTasks(new TasksTuple(Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))
+ .setTasksPendingRevocation(new TasksTuple(Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))
+ .build());
+ snapshotRegistry.idempotentCreateSnapshot(1);
+
+ StreamsGroupDescribeResponseData.DescribedGroup expected = new
StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("group-id-1")
+ .setGroupState(StreamsGroup.StreamsGroupState.STABLE.toString())
+ .setGroupEpoch(1)
+ .setTopology(new
StreamsGroupDescribeResponseData.Topology().setEpoch(1).setSubtopologies(Collections.emptyList()))
+ .setAssignmentEpoch(1)
+ .setMembers(Arrays.asList(
+ new StreamsGroupDescribeResponseData.Member()
+ .setMemberId("member1")
+ .setMemberEpoch(1)
+ .setInstanceId("instance1")
+ .setRackId("rack1")
+ .setClientId("client1")
+ .setClientHost("host1")
+ .setTopologyEpoch(1)
+ .setProcessId("process1")
+ .setUserEndpoint(new
StreamsGroupDescribeResponseData.Endpoint().setHost("host1").setPort(9092))
+ .setClientTags(Collections.singletonList(new
StreamsGroupDescribeResponseData.KeyValue().setKey("tag1").setValue("value1")))
+ .setAssignment(new
StreamsGroupDescribeResponseData.Assignment())
+ .setTargetAssignment(new
StreamsGroupDescribeResponseData.Assignment()),
+ new StreamsGroupDescribeResponseData.Member()
+ .setMemberId("member2")
+ .setMemberEpoch(1)
+ .setInstanceId("instance2")
+ .setRackId("rack2")
+ .setClientId("client2")
+ .setClientHost("host2")
+ .setTopologyEpoch(1)
+ .setProcessId("process2")
+ .setUserEndpoint(new
StreamsGroupDescribeResponseData.Endpoint().setHost("host2").setPort(9092))
+ .setClientTags(Collections.singletonList(new
StreamsGroupDescribeResponseData.KeyValue().setKey("tag2").setValue("value2")))
+ .setAssignment(new
StreamsGroupDescribeResponseData.Assignment())
+ .setTargetAssignment(new
StreamsGroupDescribeResponseData.Assignment())
+ ));
+ StreamsGroupDescribeResponseData.DescribedGroup actual =
group.asDescribedGroup(1);
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testStateTransitionMetrics() {
+ // Confirm that metrics are not updated when a new StreamsGroup is created,
but only when the group
+ // transitions its state.
+ GroupCoordinatorMetricsShard metrics =
mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(
+ LOG_CONTEXT,
+ new SnapshotRegistry(new LogContext()),
+ "group-id",
+ metrics
+ );
+
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY,
streamsGroup.state());
+ verify(metrics, times(0)).onStreamsGroupStateTransition(null,
StreamsGroup.StreamsGroupState.EMPTY);
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals(StreamsGroup.StreamsGroupState.NOT_READY,
streamsGroup.state());
+ verify(metrics,
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.EMPTY,
StreamsGroup.StreamsGroupState.NOT_READY);
+
+ streamsGroup.setTopology(new StreamsTopology(1,
Collections.emptyMap()));
+
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING,
streamsGroup.state());
+ verify(metrics,
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.NOT_READY,
StreamsGroup.StreamsGroupState.RECONCILING);
+
+ streamsGroup.setGroupEpoch(1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
streamsGroup.state());
+ verify(metrics,
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.RECONCILING,
StreamsGroup.StreamsGroupState.ASSIGNING);
+
+ streamsGroup.setTargetAssignmentEpoch(1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.STABLE,
streamsGroup.state());
+ verify(metrics,
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.ASSIGNING,
StreamsGroup.StreamsGroupState.STABLE);
+
+ streamsGroup.removeMember("member");
+
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY,
streamsGroup.state());
+ verify(metrics,
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.STABLE,
StreamsGroup.StreamsGroupState.EMPTY);
+ }
+
+ @Test
+ public void testIsInStatesCaseInsensitiveAndUnderscored() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ GroupCoordinatorMetricsShard metricsShard = new
GroupCoordinatorMetricsShard(
+ snapshotRegistry,
+ emptyMap(),
+ new TopicPartition("__consumer_offsets", 0)
+ );
+ StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry,
"group-foo", metricsShard);
+ snapshotRegistry.idempotentCreateSnapshot(0);
+ assertTrue(group.isInStates(Collections.singleton("empty"), 0));
+ assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
+
+ group.updateMember(new StreamsGroupMember.Builder("member1")
+ .build());
+ snapshotRegistry.idempotentCreateSnapshot(1);
+ assertTrue(group.isInStates(Collections.singleton("empty"), 0));
+ assertTrue(group.isInStates(Collections.singleton("not_ready"), 1));
+ assertFalse(group.isInStates(Collections.singleton("empty"), 1));
+ }
+
+ @Test
+ public void testSetTopologyUpdatesStateAndConfiguredTopology() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ GroupCoordinatorMetricsShard metricsShard =
mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT,
snapshotRegistry, "test-group", metricsShard);
+
+ StreamsTopology topology = new StreamsTopology(1,
Collections.emptyMap());
+
+ ConfiguredTopology topo = mock(ConfiguredTopology.class);
+ when(topo.isReady()).thenReturn(true);
+
+ try (MockedStatic<InternalTopicManager> mocked =
mockStatic(InternalTopicManager.class)) {
+ mocked.when(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(Map.of()))).thenReturn(topo);
+ streamsGroup.setTopology(topology);
+ mocked.verify(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(Map.of())));
+ }
+
+ Optional<ConfiguredTopology> configuredTopology =
streamsGroup.configuredTopology();
+ assertTrue(configuredTopology.isPresent(), "Configured topology should
be present");
+ assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+
+ streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .build());
+
+ assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
+ }
+
+ @Test
+ public void testSetPartitionMetadataUpdatesStateAndConfiguredTopology() {
+ Uuid topicUuid = Uuid.randomUuid();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ GroupCoordinatorMetricsShard metricsShard =
mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT,
snapshotRegistry, "test-group", metricsShard);
+
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY,
streamsGroup.state());
+
+ Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
+ partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1",
1));
+
+ try (MockedStatic<InternalTopicManager> mocked =
mockStatic(InternalTopicManager.class)) {
+ streamsGroup.setPartitionMetadata(partitionMetadata);
+ mocked.verify(() -> InternalTopicManager.configureTopics(any(),
any(), any()), never());
+ }
+
+ assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured
topology should not be present");
+ assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
+
+ StreamsTopology topology = new StreamsTopology(1,
Collections.emptyMap());
+ streamsGroup.setTopology(topology);
+ ConfiguredTopology topo = mock(ConfiguredTopology.class);
+ when(topo.isReady()).thenReturn(true);
+
+ try (MockedStatic<InternalTopicManager> mocked =
mockStatic(InternalTopicManager.class)) {
+ mocked.when(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(partitionMetadata))).thenReturn(topo);
+ streamsGroup.setPartitionMetadata(partitionMetadata);
+ mocked.verify(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(partitionMetadata)));
+ }
+
+ Optional<ConfiguredTopology> configuredTopology =
streamsGroup.configuredTopology();
+ assertTrue(configuredTopology.isPresent(), "Configured topology should
be present");
+ assertEquals(topo, configuredTopology.get());
+ assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
+ assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+
+ streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .build());
+
+ assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
+ }
+
+ @Test
+ public void testComputePartitionMetadata() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ StreamsGroup streamsGroup = new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ "group-foo",
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ TopicsImage topicsImage = mock(TopicsImage.class);
+ TopicImage topicImage = mock(TopicImage.class);
+ when(topicImage.id()).thenReturn(Uuid.randomUuid());
+ when(topicImage.name()).thenReturn("topic1");
+ when(topicImage.partitions()).thenReturn(Collections.singletonMap(0,
null));
+ when(topicsImage.getTopic("topic1")).thenReturn(topicImage);
+ StreamsTopology topology = mock(StreamsTopology.class);
+
when(topology.requiredTopics()).thenReturn(Collections.singleton("topic1"));
+
+ Map<String, TopicMetadata> partitionMetadata =
streamsGroup.computePartitionMetadata(topicsImage, topology);
+
+ assertEquals(1, partitionMetadata.size());
+ assertTrue(partitionMetadata.containsKey("topic1"));
+ TopicMetadata topicMetadata = partitionMetadata.get("topic1");
+ assertNotNull(topicMetadata);
+ assertEquals(topicImage.id(), topicMetadata.id());
+ assertEquals("topic1", topicMetadata.name());
+ assertEquals(1, topicMetadata.numPartitions());
+ }
+
+ @Test
+ void testCreateGroupTombstoneRecords() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ StreamsGroup streamsGroup = new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ "test-group",
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .build());
+ List<CoordinatorRecord> records = new ArrayList<>();
+
+ streamsGroup.createGroupTombstoneRecords(records);
+
+ assertEquals(7, records.size());
+ for (CoordinatorRecord record : records) {
+ assertNotNull(record.key());
+ assertNull(record.value());
+ }
+ final Set<ApiMessage> keys =
records.stream().map(CoordinatorRecord::key).collect(Collectors.toSet());
+ assertTrue(keys.contains(new
StreamsGroupMetadataKey().setGroupId("test-group")));
+ assertTrue(keys.contains(new
StreamsGroupTargetAssignmentMetadataKey().setGroupId("test-group")));
+ assertTrue(keys.contains(new
StreamsGroupPartitionMetadataKey().setGroupId("test-group")));
+ assertTrue(keys.contains(new
StreamsGroupTopologyKey().setGroupId("test-group")));
+ assertTrue(keys.contains(new
StreamsGroupMemberMetadataKey().setGroupId("test-group").setMemberId("member1")));
+ assertTrue(keys.contains(new
StreamsGroupTargetAssignmentMemberKey().setGroupId("test-group").setMemberId("member1")));
+ assertTrue(keys.contains(new
StreamsGroupCurrentMemberAssignmentKey().setGroupId("test-group").setMemberId("member1")));
+ }
+
+ @Test
+ public void testIsSubscribedToTopic() {
+ LogContext logContext = new LogContext();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+ GroupCoordinatorMetricsShard metricsShard =
mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(logContext,
snapshotRegistry, "test-group", metricsShard);
+
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
+ assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+
+ streamsGroup.setTopology(
+ new StreamsTopology(1,
+ Map.of("test-subtopology",
+ new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("test-subtopology")
+ .setSourceTopics(List.of("test-topic1"))
+ .setRepartitionSourceTopics(List.of(new
StreamsGroupTopologyValue.TopicInfo().setName("test-topic2")))
+ .setRepartitionSinkTopics(List.of("test-topic2"))
+ )
+ )
+ );
+
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
+ assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+
+ streamsGroup.setPartitionMetadata(
+ Map.of(
+ "test-topic1", new TopicMetadata(Uuid.randomUuid(),
"test-topic1", 1),
+ "test-topic2", new TopicMetadata(Uuid.randomUuid(),
"test-topic2", 1)
+ )
+ );
+
+ assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
+ assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
+ assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+ }
+}
\ No newline at end of file
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index 114974558b8..0380d4cf5e7 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -45,7 +45,6 @@ import java.util.TreeMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord;
import static
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
@@ -144,8 +143,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -196,8 +195,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -261,8 +260,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -341,8 +340,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -429,8 +428,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, mkMapOfPartitionRacks(6));
- String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, mkMapOfPartitionRacks(6));
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
@@ -509,8 +508,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
@@ -581,8 +580,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", "instance-member-1",
mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
@@ -709,16 +708,14 @@ public class TargetAssignmentBuilderTest {
public String addSubtopologyWithSingleSourceTopic(
String topicName,
- int numTasks,
- Map<Integer, Set<String>> partitionRacks
+ int numTasks
) {
String subtopologyId = Uuid.randomUuid().toString();
Uuid topicId = Uuid.randomUuid();
subscriptionMetadata.put(topicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(
topicId,
topicName,
- numTasks,
- partitionRacks
+ numTasks
));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId,
topicName, numTasks);
subtopologies.put(subtopologyId, new
ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(),
Map.of()));
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
index 38d63dab6ef..59712d5c954 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
@@ -21,11 +21,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadat
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,84 +30,60 @@ public class TopicMetadataTest {
@Test
public void testConstructor() {
assertDoesNotThrow(() ->
- new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, new
HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3));
}
@Test
public void testConstructorWithZeroUuid() {
Exception exception = assertThrows(IllegalArgumentException.class, ()
->
- new TopicMetadata(Uuid.ZERO_UUID, "valid-topic", 3, new
HashMap<>()));
+ new TopicMetadata(Uuid.ZERO_UUID, "valid-topic", 3));
assertEquals("Topic id cannot be ZERO_UUID.", exception.getMessage());
}
@Test
public void testConstructorWithNullUuid() {
assertThrows(NullPointerException.class, () ->
- new TopicMetadata(null, "valid-topic", 3, new HashMap<>()));
+ new TopicMetadata(null, "valid-topic", 3));
}
@Test
public void testConstructorWithNullName() {
assertThrows(NullPointerException.class, () ->
- new TopicMetadata(Uuid.randomUuid(), null, 3, new HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), null, 3));
}
@Test
public void testConstructorWithEmptyName() {
Exception exception = assertThrows(IllegalArgumentException.class, ()
->
- new TopicMetadata(Uuid.randomUuid(), "", 3, new HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), "", 3));
assertEquals("Topic name cannot be empty.", exception.getMessage());
}
@Test
public void testConstructorWithZeroNumPartitions() {
Exception exception = assertThrows(IllegalArgumentException.class, ()
->
- new TopicMetadata(Uuid.randomUuid(), "valid-topic", 0, new
HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), "valid-topic", 0));
assertEquals("Number of partitions must be positive.",
exception.getMessage());
}
@Test
public void testConstructorWithNegativeNumPartitions() {
Exception exception = assertThrows(IllegalArgumentException.class, ()
->
- new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1, new
HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1));
assertEquals("Number of partitions must be positive.",
exception.getMessage());
}
- @Test
- public void testConstructorWithNullPartitionRacks() {
- assertThrows(NullPointerException.class, () ->
- new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, null));
- }
-
@Test
public void testFromRecord() {
StreamsGroupPartitionMetadataValue.TopicMetadata record = new
StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(Uuid.randomUuid())
.setTopicName("test-topic")
- .setNumPartitions(3)
- .setPartitionMetadata(List.of(
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(0)
- .setRacks(List.of("rack1", "rack2")),
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(1)
- .setRacks(List.of("rack3")),
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(2)
- .setRacks(List.of("rack4", "rack5"))
- ));
+ .setNumPartitions(3);
TopicMetadata topicMetadata = TopicMetadata.fromRecord(record);
assertEquals(record.topicId(), topicMetadata.id());
assertEquals(record.topicName(), topicMetadata.name());
assertEquals(record.numPartitions(), topicMetadata.numPartitions());
-
- Map<Integer, Set<String>> expectedPartitionRacks = new HashMap<>();
- expectedPartitionRacks.put(0, Set.of("rack1", "rack2"));
- expectedPartitionRacks.put(1, Set.of("rack3"));
- expectedPartitionRacks.put(2, Set.of("rack4", "rack5"));
-
- assertEquals(expectedPartitionRacks, topicMetadata.partitionRacks());
}
}
\ No newline at end of file
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index 4d7fdc08d4c..6ec600a0c38 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -55,7 +55,7 @@ class InternalTopicManagerTest {
@Test
void
testConfigureTopicsSetsConfigurationExceptionWhenSourceTopicIsMissing() {
Map<String, TopicMetadata> topicMetadata = new HashMap<>();
- topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(),
SOURCE_TOPIC_1, 2, Collections.emptyMap()));
+ topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(),
SOURCE_TOPIC_1, 2));
// SOURCE_TOPIC_2 is missing from topicMetadata
StreamsTopology topology = makeTestTopology();
@@ -70,10 +70,10 @@ class InternalTopicManagerTest {
@Test
void testConfigureTopics() {
Map<String, TopicMetadata> topicMetadata = new HashMap<>();
- topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(),
SOURCE_TOPIC_1, 2, Collections.emptyMap()));
- topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(),
SOURCE_TOPIC_2, 2, Collections.emptyMap()));
+ topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(),
SOURCE_TOPIC_1, 2));
+ topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(),
SOURCE_TOPIC_2, 2));
topicMetadata.put(STATE_CHANGELOG_TOPIC_2,
- new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2,
Collections.emptyMap()));
+ new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2));
StreamsTopology topology = makeTestTopology();
ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);