This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e19b7e0873701cc369d4027c3a9cc4c11219a49e Author: Bill <[email protected]> AuthorDate: Mon Nov 25 17:35:16 2024 -0500 Correctness updates from rebase --- .../clients/consumer/internals/AsyncKafkaConsumer.java | 4 +--- .../consumer/internals/StreamsMembershipManager.java | 1 + .../events/StreamsOnAssignmentCallbackNeededEvent.java | 1 - .../consumer/internals/AsyncKafkaConsumerTest.java | 5 +++-- .../internals/StreamsMembershipManagerTest.java | 2 +- .../kafka/coordinator/group/GroupCoordinatorShard.java | 17 +++++++++-------- .../kafka/coordinator/group/streams/StreamsGroup.java | 1 + .../group/streams/topics/InternalTopicManager.java | 1 + .../message/ConsumerGroupRegularExpressionKey.json | 4 ++-- .../message/StreamsGroupPartitionMetadataKey.json | 4 ++-- .../coordinator/group/GroupMetadataManagerTest.java | 3 +++ .../group/streams/StreamsConfiguredTopologyTest.java | 1 + .../coordinator/group/streams/StreamsGroupTest.java | 1 + .../coordinator/group/streams/StreamsTopologyTest.java | 1 + .../group/streams/topics/InternalTopicManagerTest.java | 1 + .../KafkaStreamsTelemetryIntegrationTest.java | 6 ++++++ .../integration/SmokeTestDriverIntegrationTest.java | 4 ++-- 17 files changed, 36 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 65cea62ea64..459d26c6010 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -276,7 +276,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { } }; - public AsyncKafkaConsumer(final ConsumerConfig config, + public AsyncKafkaConsumer(final ConsumerConfig config, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer, final Optional<StreamsAssignmentInterface> streamsAssignmentInterface) { @@ -373,7 +373,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { metrics, offsetCommitCallbackInvoker, memberStateListener, - this::updateGroupMetadata, streamsAssignmentInterface ); final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, @@ -548,7 +547,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { metrics, offsetCommitCallbackInvoker, memberStateListener, - this::updateGroupMetadata, streamsInstanceMetadata ); Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java index ef39f661f67..fb34f50847b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; + import org.slf4j.Logger; import java.util.Collections; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAssignmentCallbackNeededEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAssignmentCallbackNeededEvent.java index 8c2f5a3bc81..5ef6cd9c462 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAssignmentCallbackNeededEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAssignmentCallbackNeededEvent.java @@ -39,4 +39,3 @@ public class StreamsOnAssignmentCallbackNeededEvent extends CompletableBackgroun ", assignment=" + assignment; } } - diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 3ba8e2a1cab..2079d0c44ac 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -204,7 +204,8 @@ public class AsyncKafkaConsumerTest { a -> backgroundEventReaper, (a, b, c, d, e, f, g) -> fetchCollector, (a, b, c, d) -> metadata, - backgroundEventQueue + backgroundEventQueue, + Optional.empty() ); } @@ -219,7 +220,7 @@ public class AsyncKafkaConsumerTest { (a, b, c, d, e, f, g) -> fetchCollector, (a, b, c, d) -> metadata, backgroundEventQueue, - Optional.empty() + Optional.empty() ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java index 4965f05198b..7d954477da9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java @@ -28,9 +28,9 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.utils.LogContext; - import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 895a7508696..4270e0d80c9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -110,6 +110,7 @@ import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; + import org.slf4j.Logger; import java.util.ArrayList; @@ -945,56 +946,56 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord ); break; - case 16: + case 15: groupMetadataManager.replay( (ConsumerGroupRegularExpressionKey) key.message(), (ConsumerGroupRegularExpressionValue) Utils.messageOrNull(value) ); break; - case 15: + case 16: groupMetadataManager.replay( (StreamsGroupMetadataKey) key.message(), (StreamsGroupMetadataValue) messageOrNull(value) ); break; - case 16: + case 17: groupMetadataManager.replay( (StreamsGroupPartitionMetadataKey) key.message(), (StreamsGroupPartitionMetadataValue) messageOrNull(value) ); break; - case 17: + case 18: groupMetadataManager.replay( (StreamsGroupMemberMetadataKey) key.message(), (StreamsGroupMemberMetadataValue) messageOrNull(value) ); break; - case 18: + case 19: groupMetadataManager.replay( (StreamsGroupTargetAssignmentMetadataKey) key.message(), (StreamsGroupTargetAssignmentMetadataValue) messageOrNull(value) ); break; - case 19: + case 20: groupMetadataManager.replay( (StreamsGroupTargetAssignmentMemberKey) key.message(), (StreamsGroupTargetAssignmentMemberValue) messageOrNull(value) ); break; - case 20: + case 21: groupMetadataManager.replay( (StreamsGroupCurrentMemberAssignmentKey) key.message(), (StreamsGroupCurrentMemberAssignmentValue) messageOrNull(value) ); break; - case 21: + case 22: groupMetadataManager.replay( (StreamsGroupTopologyKey) key.message(), (StreamsGroupTopologyValue) messageOrNull(value) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index fac219c5ed4..5ec4d1559bf 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -41,6 +41,7 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineInteger; import org.apache.kafka.timeline.TimelineObject; + import org.slf4j.Logger; import java.util.Collections; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java index cc2383fae42..9ebec6c30cb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.streams.TopicMetadata; + import org.slf4j.Logger; import java.util.Collection; diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json index 3f761b694e4..2cb82e4de10 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json @@ -20,9 +20,9 @@ "validVersions": "16", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "16", + { "name": "GroupId", "type": "string", "versions": "44", "about": "The group id." }, - { "name": "RegularExpression", "type": "string", "versions": "16", + { "name": "RegularExpression", "type": "string", "versions": "44", "about": "The regular expression." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json index 0d91a992d0c..618fbc78451 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json @@ -17,10 +17,10 @@ { "type": "data", "name": "StreamsGroupPartitionMetadataKey", - "validVersions": "16", + "validVersions": "56", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "16", + { "name": "GroupId", "type": "string", "versions": "56", "about": "The group id." } ] } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4cf7f099d26..e120a057642 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -114,7 +114,10 @@ import org.junit.jupiter.params.provider.ValueSource; import org.opentest4j.AssertionFailedError; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsConfiguredTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsConfiguredTopologyTest.java index 9e91b36f0b2..e965ea58fac 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsConfiguredTopologyTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsConfiguredTopologyTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.streams; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic; import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; + import org.junit.jupiter.api.Test; import java.util.Collections; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index 1269587d6c4..dc928dd695d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState; import org.apache.kafka.timeline.SnapshotRegistry; + import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java index 54f8de1a3da..44074612a93 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.streams; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + import org.junit.jupiter.api.Test; import java.util.Arrays; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java index ecde45d96d5..09e53428c08 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.streams.TopicMetadata; + import org.junit.jupiter.api.Test; import java.util.Arrays; diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index 18dbd2fa6d8..fc90c621435 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Metric; @@ -430,6 +431,11 @@ public class KafkaStreamsTelemetryIntegrationTest { return consumer; } + @Override + public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) { + return null; + } + @Override public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) { return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index 498613ce783..4ed0e946426 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.integration; -import kafka.api.IntegrationTestHarness; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.utils.Exit; @@ -44,7 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(600) @Tag("integration") -public class SmokeTestDriverIntegrationTest extends IntegrationTestHarness { +public class SmokeTestDriverIntegrationTest { @Override public int brokerCount() {
