This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit dc5b2ef7ecd259fca8dc729de25339fce3bd7e04 Author: Bruno Cadonna <[email protected]> AuthorDate: Fri Aug 16 19:58:35 2024 +0200 Resolve conflict from 11/25 trunk rebase - Rebase on AK trunk 2024-08-15 --- .../internals/AbstractMembershipManager.java | 1 + .../consumer/internals/RequestManagers.java | 4 ++- .../StreamsGroupHeartbeatRequestManager.java | 18 ++++++------- .../StreamsGroupHeartbeatRequestManagerTest.java | 4 +-- .../events/ApplicationEventProcessorTest.java | 2 -- .../group/GroupCoordinatorRecordHelpers.java | 4 +-- .../coordinator/group/GroupMetadataManager.java | 2 +- .../streams/CoordinatorStreamsRecordHelpers.java | 31 ++++++++++++---------- .../StreamsGroupCurrentMemberAssignmentKey.json | 6 ++--- .../message/StreamsGroupMemberMetadataKey.json | 6 ++--- .../common/message/StreamsGroupMetadataKey.json | 4 +-- .../message/StreamsGroupPartitionMetadataKey.json | 4 +-- .../StreamsGroupTargetAssignmentMemberKey.json | 6 ++--- .../StreamsGroupTargetAssignmentMetadataKey.json | 4 +-- .../common/message/StreamsGroupTopologyKey.json | 4 +-- .../group/GroupCoordinatorServiceTest.java | 30 ++++++++++++++------- .../CoordinatorStreamsRecordHelpersTest.java | 2 +- .../group/streams/StreamsGroupTest.java | 6 ++--- .../assignment/TaskAssignorConvergenceTest.java | 4 +-- 19 files changed, 78 insertions(+), 64 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java index c6aa70d805e..049944ed09f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java @@ -1270,6 +1270,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl protected void updateMemberEpoch(int newEpoch) { boolean newEpochReceived = this.memberEpoch != newEpoch; + log.info("Updating member epoch to {}", newEpoch); this.memberEpoch = newEpoch; // Simply notify based on epoch changes only, since the member will generate a member ID // at startup, and it will remain unchanged for its entire lifetime. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 0f7e7cf1178..7cf4a3e67f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -70,7 +70,7 @@ public class RequestManagers implements Closeable { Optional<CoordinatorRequestManager> coordinatorRequestManager, Optional<CommitRequestManager> commitRequestManager, Optional<ConsumerHeartbeatRequestManager> heartbeatRequestManager, - Optional<ConsumerMembershipManager> membershipManager) { + Optional<ConsumerMembershipManager> membershipManager, Optional<StreamsGroupHeartbeatRequestManager> streamsHeartbeatRequestManager, Optional<StreamsInitializeRequestManager> streamsInitializeRequestManager) { this.log = logContext.logger(RequestManagers.class); @@ -113,6 +113,8 @@ public class RequestManagers implements Closeable { this.shareHeartbeatRequestManager = shareHeartbeatRequestManager; this.consumerMembershipManager = Optional.empty(); this.shareMembershipManager = shareMembershipManager; + this.streamsHeartbeatRequestManager = Optional.empty(); + this.streamsInitializeRequestManager = Optional.empty(); this.offsetsRequestManager = null; this.topicMetadataRequestManager = null; this.fetchRequestManager = null; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index 7b3713b4131..2b3b7c8f248 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -61,7 +61,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { private final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState; - private final MembershipManager membershipManager; + private final ConsumerMembershipManager membershipManager; private final StreamsInitializeRequestManager streamsInitializeRequestManager; @@ -83,7 +83,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { final ConsumerConfig config, final CoordinatorRequestManager coordinatorRequestManager, final StreamsInitializeRequestManager streamsInitializeRequestManager, - final MembershipManager membershipManager, + final ConsumerMembershipManager membershipManager, final BackgroundEventHandler backgroundEventHandler, final Metrics metrics, final StreamsAssignmentInterface streamsAssignmentInterface, @@ -158,7 +158,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); heartbeatRequestState.onSendAttempt(currentTimeMs); - membershipManager.onHeartbeatRequestSent(); + membershipManager.onHeartbeatRequestGenerated(); metricsManager.recordHeartbeatSentMs(currentTimeMs); return request; } @@ -335,7 +335,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { this.heartbeatState.reset(); this.heartbeatRequestState.onFailedAttempt(currentTimeMs); - membershipManager.onHeartbeatFailure(); + membershipManager.onHeartbeatFailure(false); switch (error) { case NOT_COORDINATOR: @@ -503,7 +503,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { static class HeartbeatState { - private final MembershipManager membershipManager; + private final ConsumerMembershipManager membershipManager; private final int rebalanceTimeoutMs; private final StreamsGroupHeartbeatRequestManager.HeartbeatState.SentFields sentFields; @@ -514,7 +514,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { public HeartbeatState( final StreamsAssignmentInterface streamsInterface, - final MembershipManager membershipManager, + final ConsumerMembershipManager membershipManager, final int rebalanceTimeoutMs) { this.membershipManager = membershipManager; this.rebalanceTimeoutMs = rebalanceTimeoutMs; @@ -552,9 +552,9 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { // Immutable -- only sent when joining if (joining) { data.setProcessId(streamsInterface.processID().toString()); -// data.setActiveTasks(Collections.emptyList()); -// data.setStandbyTasks(Collections.emptyList()); -// data.setWarmupTasks(Collections.emptyList()); + data.setActiveTasks(Collections.emptyList()); + data.setStandbyTasks(Collections.emptyList()); + data.setWarmupTasks(Collections.emptyList()); streamsInterface.endpoint().ifPresent(streamsEndpoint -> { data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint() .setHost(streamsEndpoint.host) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index f4c67e1d9b9..aed1b4dafd7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -55,7 +55,6 @@ import java.util.Properties; import java.util.UUID; import java.util.stream.Collectors; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; @@ -74,6 +73,7 @@ class StreamsGroupHeartbeatRequestManagerTest { public static final int TEST_MEMBER_EPOCH = 5; public static final String TEST_INSTANCE_ID = "instanceId"; public static final int TEST_THROTTLE_TIME_MS = 5; + private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000; private StreamsGroupHeartbeatRequestManager heartbeatRequestManager; private Time time; @@ -89,7 +89,7 @@ class StreamsGroupHeartbeatRequestManagerTest { private StreamsInitializeRequestManager streamsInitializeRequestManager; @Mock - private MembershipManager membershipManager; + private ConsumerMembershipManager membershipManager; @Mock private BackgroundEventHandler backgroundEventHandler; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 28f938c2657..c56bef00c9f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -97,8 +97,6 @@ public class ApplicationEventProcessorTest { withGroupId ? Optional.of(heartbeatRequestManager) : Optional.empty(), withGroupId ? Optional.of(membershipManager) : Optional.empty(), Optional.empty(), - Optional.empty(), - Optional.empty(), Optional.empty() ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 63d5523f137..7b7a2d701f8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -271,7 +271,7 @@ public class GroupCoordinatorRecordHelpers { * @param memberId The consumer group member id. * @return The record. */ - public static CoordinatorRecord newTargetAssignmentTombstoneRecord( + public static CoordinatorRecord newConsumerGroupTargetAssignmentTombstoneRecord( String groupId, String memberId ) { @@ -317,7 +317,7 @@ public class GroupCoordinatorRecordHelpers { * @param groupId The consumer group id. * @return The record. */ - public static CoordinatorRecord newConsumerTargetAssignmentEpochTombstoneRecord( + public static CoordinatorRecord newConsumerGroupTargetAssignmentEpochTombstoneRecord( String groupId ) { return new CoordinatorRecord( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index e934304b168..3e51e4b798a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3534,7 +3534,7 @@ public class GroupMetadataManager { if (!updatedMember.equals(member)) { records.add(newStreamsGroupMemberRecord(groupId, updatedMember)); - if (updatedMember.topologyId().equals(member.topologyId())) { + if (!updatedMember.topologyId().equals(member.topologyId())) { log.info("[GroupId {}] Member {} updated its topology ID to: {}.", groupId, memberId, updatedMember.topologyId()); return true; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java index f5cac3026aa..2f48151e137 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java @@ -58,7 +58,7 @@ public class CoordinatorStreamsRecordHelpers { new StreamsGroupMemberMetadataKey() .setGroupId(groupId) .setMemberId(member.memberId()), - (short) 14 + (short) 17 ), new ApiMessageAndVersion( new StreamsGroupMemberMetadataValue() @@ -96,7 +96,7 @@ public class CoordinatorStreamsRecordHelpers { new StreamsGroupMemberMetadataKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 14 + (short) 17 ), null // Tombstone. ); @@ -137,7 +137,7 @@ public class CoordinatorStreamsRecordHelpers { new ApiMessageAndVersion( new StreamsGroupPartitionMetadataKey() .setGroupId(groupId), - (short) 13 + (short) 16 ), new ApiMessageAndVersion( value, @@ -159,7 +159,7 @@ public class CoordinatorStreamsRecordHelpers { new ApiMessageAndVersion( new StreamsGroupPartitionMetadataKey() .setGroupId(groupId), - (short) 13 + (short) 16 ), null // Tombstone. ); @@ -173,7 +173,7 @@ public class CoordinatorStreamsRecordHelpers { new ApiMessageAndVersion( new StreamsGroupMetadataKey() .setGroupId(groupId), - (short) 12 + (short) 15 ), new ApiMessageAndVersion( new StreamsGroupMetadataValue() @@ -196,7 +196,7 @@ public class CoordinatorStreamsRecordHelpers { new ApiMessageAndVersion( new StreamsGroupMetadataKey() .setGroupId(groupId), - (short) 12 + (short) 15 ), null // Tombstone. ); @@ -239,7 +239,7 @@ public class CoordinatorStreamsRecordHelpers { new StreamsGroupTargetAssignmentMemberKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 16 + (short) 19 ), new ApiMessageAndVersion( new StreamsGroupTargetAssignmentMemberValue() @@ -267,7 +267,7 @@ public class CoordinatorStreamsRecordHelpers { new StreamsGroupTargetAssignmentMemberKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 16 + (short) 19 ), null // Tombstone. ); @@ -282,7 +282,7 @@ public class CoordinatorStreamsRecordHelpers { new ApiMessageAndVersion( new StreamsGroupTargetAssignmentMetadataKey() .setGroupId(groupId), - (short) 15 + (short) 18 ), new ApiMessageAndVersion( new StreamsGroupTargetAssignmentMetadataValue() @@ -305,7 +305,7 @@ public class CoordinatorStreamsRecordHelpers { new ApiMessageAndVersion( new StreamsGroupTargetAssignmentMetadataKey() .setGroupId(groupId), - (short) 15 + (short) 18 ), null // Tombstone. ); @@ -320,7 +320,7 @@ public class CoordinatorStreamsRecordHelpers { new StreamsGroupCurrentMemberAssignmentKey() .setGroupId(groupId) .setMemberId(member.memberId()), - (short) 17 + (short) 20 ), new ApiMessageAndVersion( new StreamsGroupCurrentMemberAssignmentValue() @@ -352,7 +352,7 @@ public class CoordinatorStreamsRecordHelpers { new StreamsGroupCurrentMemberAssignmentKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 17 + (short) 20 ), null // Tombstone ); @@ -402,7 +402,10 @@ public class CoordinatorStreamsRecordHelpers { .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); }); - return new CoordinatorRecord(new ApiMessageAndVersion(new StreamsGroupTopologyKey().setGroupId(groupId), (short) 18), + return new CoordinatorRecord(new ApiMessageAndVersion( + new StreamsGroupTopologyKey() + .setGroupId(groupId), + (short) 21), new ApiMessageAndVersion(value, (short) 0)); } @@ -419,7 +422,7 @@ public class CoordinatorStreamsRecordHelpers { new ApiMessageAndVersion( new StreamsGroupTopologyKey() .setGroupId(groupId), - (short) 18 + (short) 21 ), null // Tombstone ); diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json index 771f7324a9e..a99e32754c6 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json @@ -17,12 +17,12 @@ { "type": "data", "name": "StreamsGroupCurrentMemberAssignmentKey", - "validVersions": "17", + "validVersions": "20", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "17", + { "name": "GroupId", "type": "string", "versions": "20", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "17", + { "name": "MemberId", "type": "string", "versions": "20", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json index bac9ac247cf..8e0f66deba7 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json @@ -17,12 +17,12 @@ { "type": "data", "name": "StreamsGroupMemberMetadataKey", - "validVersions": "14", + "validVersions": "17", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "14", + { "name": "GroupId", "type": "string", "versions": "17", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "14", + { "name": "MemberId", "type": "string", "versions": "17", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json index 26121bf2ba2..6c54fb8bd14 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json @@ -17,10 +17,10 @@ { "type": "data", "name": "StreamsGroupMetadataKey", - "validVersions": "12", + "validVersions": "15", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "12", + { "name": "GroupId", "type": "string", "versions": "15", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json index 546a8f80535..0d91a992d0c 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json @@ -17,10 +17,10 @@ { "type": "data", "name": "StreamsGroupPartitionMetadataKey", - "validVersions": "13", + "validVersions": "16", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "13", + { "name": "GroupId", "type": "string", "versions": "16", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json index 4fc8231ec3d..f96f6f89e01 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json @@ -17,12 +17,12 @@ { "type": "data", "name": "StreamsGroupTargetAssignmentMemberKey", - "validVersions": "16", + "validVersions": "19", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "16", + { "name": "GroupId", "type": "string", "versions": "19", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "16", + { "name": "MemberId", "type": "string", "versions": "19", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json index 02b40f727c4..514885f8ad7 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json @@ -17,10 +17,10 @@ { "type": "data", "name": "StreamsGroupTargetAssignmentMetadataKey", - "validVersions": "15", + "validVersions": "18", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "15", + { "name": "GroupId", "type": "string", "versions": "18", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json index 261b755cd51..2e51c117bd7 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json @@ -17,10 +17,10 @@ { "type": "data", "name": "StreamsGroupTopologyKey", - "validVersions": "18", + "validVersions": "21", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "18", + { "name": "GroupId", "type": "string", "versions": "21", "about": "The group id." } ] } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 5b1aad09027..dfd898c225c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -273,7 +273,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); StreamsGroupInitializeRequestData request = new StreamsGroupInitializeRequestData() @@ -298,7 +299,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); StreamsGroupInitializeRequestData request = new StreamsGroupInitializeRequestData() @@ -350,7 +352,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); StreamsGroupInitializeRequestData request = new StreamsGroupInitializeRequestData() @@ -385,7 +388,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData() @@ -410,7 +414,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData() @@ -461,7 +466,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData() @@ -1779,7 +1785,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); int partitionCount = 2; service.startup(() -> partitionCount); @@ -1821,7 +1828,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); int partitionCount = 1; service.startup(() -> partitionCount); @@ -1855,7 +1863,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); int partitionCount = 1; service.startup(() -> partitionCount); @@ -1887,7 +1896,8 @@ public class GroupCoordinatorServiceTest { new LogContext(), createConfig(), runtime, - new GroupCoordinatorMetrics() + new GroupCoordinatorMetrics(), + createConfigManager() ); when(runtime.scheduleReadOperation( ArgumentMatchers.eq("streams-group-describe"), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java index 44b46421205..cf1a003cd84 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java @@ -96,7 +96,7 @@ class CoordinatorStreamsRecordHelpersTest { new ApiMessageAndVersion( new StreamsGroupTopologyKey() .setGroupId("group-id"), - (short) 18), + (short) 21), new ApiMessageAndVersion( new StreamsGroupTopologyValue() .setTopology(expectedTopology), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index 3da224f3a95..2fd5d461c99 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -649,7 +649,7 @@ public class StreamsGroupTest { group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE)); // Create a member. - snapshotRegistry.getOrCreateSnapshot(0); + snapshotRegistry.idempotentCreateSnapshot(0); group.updateMember(new StreamsGroupMember.Builder("member-id").build()); // The member does not exist at last committed offset 0. @@ -716,13 +716,13 @@ public class StreamsGroupTest { new TopicPartition("__consumer_offsets", 0) ); StreamsGroup group = new StreamsGroup(snapshotRegistry, "group-foo", metricsShard); - snapshotRegistry.getOrCreateSnapshot(0); + snapshotRegistry.idempotentCreateSnapshot(0); assertTrue(group.isInStates(Collections.singleton("empty"), 0)); assertFalse(group.isInStates(Collections.singleton("Empty"), 0)); group.updateMember(new StreamsGroupMember.Builder("member1") .build()); - snapshotRegistry.getOrCreateSnapshot(1); + snapshotRegistry.idempotentCreateSnapshot((1)); assertTrue(group.isInStates(Collections.singleton("empty"), 0)); assertTrue(group.isInStates(Collections.singleton("stable"), 1)); assertFalse(group.isInStates(Collections.singleton("empty"), 1)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java index 936c9eb6a85..19eb361a1d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java @@ -471,8 +471,8 @@ public class TaskAssignorConvergenceTest { @ParameterizedTest @ValueSource(strings = { - StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, - StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, +// StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, +// StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY }) public void randomClusterPerturbationsShouldConverge(final String rackAwareStrategy) {
