This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e19b7e0873701cc369d4027c3a9cc4c11219a49e
Author: Bill <[email protected]>
AuthorDate: Mon Nov 25 17:35:16 2024 -0500

    Correctness updates from rebase
---
 .../clients/consumer/internals/AsyncKafkaConsumer.java  |  4 +---
 .../consumer/internals/StreamsMembershipManager.java    |  1 +
 .../events/StreamsOnAssignmentCallbackNeededEvent.java  |  1 -
 .../consumer/internals/AsyncKafkaConsumerTest.java      |  5 +++--
 .../internals/StreamsMembershipManagerTest.java         |  2 +-
 .../kafka/coordinator/group/GroupCoordinatorShard.java  | 17 +++++++++--------
 .../kafka/coordinator/group/streams/StreamsGroup.java   |  1 +
 .../group/streams/topics/InternalTopicManager.java      |  1 +
 .../message/ConsumerGroupRegularExpressionKey.json      |  4 ++--
 .../message/StreamsGroupPartitionMetadataKey.json       |  4 ++--
 .../coordinator/group/GroupMetadataManagerTest.java     |  3 +++
 .../group/streams/StreamsConfiguredTopologyTest.java    |  1 +
 .../coordinator/group/streams/StreamsGroupTest.java     |  1 +
 .../coordinator/group/streams/StreamsTopologyTest.java  |  1 +
 .../group/streams/topics/InternalTopicManagerTest.java  |  1 +
 .../KafkaStreamsTelemetryIntegrationTest.java           |  6 ++++++
 .../integration/SmokeTestDriverIntegrationTest.java     |  4 ++--
 17 files changed, 36 insertions(+), 21 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 65cea62ea64..459d26c6010 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -276,7 +276,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     };
 
-   public AsyncKafkaConsumer(final ConsumerConfig config,
+    public AsyncKafkaConsumer(final ConsumerConfig config,
                               final Deserializer<K> keyDeserializer,
                               final Deserializer<V> valueDeserializer,
                               final Optional<StreamsAssignmentInterface> 
streamsAssignmentInterface) {
@@ -373,7 +373,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     metrics,
                     offsetCommitCallbackInvoker,
                     memberStateListener,
-                    this::updateGroupMetadata,
                     streamsAssignmentInterface
             );
             final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier = 
ApplicationEventProcessor.supplier(logContext,
@@ -548,7 +547,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             metrics,
             offsetCommitCallbackInvoker,
             memberStateListener,
-            this::updateGroupMetadata,
             streamsInstanceMetadata
         );
         Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier 
= ApplicationEventProcessor.supplier(
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index ef39f661f67..fb34f50847b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -32,6 +32,7 @@ import 
org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+
 import org.slf4j.Logger;
 
 import java.util.Collections;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAssignmentCallbackNeededEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAssignmentCallbackNeededEvent.java
index 8c2f5a3bc81..5ef6cd9c462 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAssignmentCallbackNeededEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAssignmentCallbackNeededEvent.java
@@ -39,4 +39,3 @@ public class StreamsOnAssignmentCallbackNeededEvent extends 
CompletableBackgroun
             ", assignment=" + assignment;
     }
 }
-
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 3ba8e2a1cab..2079d0c44ac 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -204,7 +204,8 @@ public class AsyncKafkaConsumerTest {
             a -> backgroundEventReaper,
             (a, b, c, d, e, f, g) -> fetchCollector,
             (a, b, c, d) -> metadata,
-            backgroundEventQueue
+            backgroundEventQueue,
+                Optional.empty()
         );
     }
 
@@ -219,7 +220,7 @@ public class AsyncKafkaConsumerTest {
             (a, b, c, d, e, f, g) -> fetchCollector,
             (a, b, c, d) -> metadata,
             backgroundEventQueue,
-            Optional.empty()
+                Optional.empty()
         );
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 4965f05198b..7d954477da9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -28,9 +28,9 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
 import org.apache.kafka.common.utils.LogContext;
-
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 895a7508696..4270e0d80c9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -110,6 +110,7 @@ import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
+
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -945,56 +946,56 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 );
                 break;
 
-            case 16:
+            case 15:
                 groupMetadataManager.replay(
                     (ConsumerGroupRegularExpressionKey) key.message(),
                     (ConsumerGroupRegularExpressionValue) 
Utils.messageOrNull(value)
                 );
                 break;
 
-            case 15:
+            case 16:
                 groupMetadataManager.replay(
                     (StreamsGroupMetadataKey) key.message(),
                     (StreamsGroupMetadataValue) messageOrNull(value)
                 );
                 break;
 
-            case 16:
+            case 17:
                 groupMetadataManager.replay(
                     (StreamsGroupPartitionMetadataKey) key.message(),
                     (StreamsGroupPartitionMetadataValue) messageOrNull(value)
                 );
                 break;
 
-            case 17:
+            case 18:
                 groupMetadataManager.replay(
                     (StreamsGroupMemberMetadataKey) key.message(),
                     (StreamsGroupMemberMetadataValue) messageOrNull(value)
                 );
                 break;
 
-            case 18:
+            case 19:
                 groupMetadataManager.replay(
                     (StreamsGroupTargetAssignmentMetadataKey) key.message(),
                     (StreamsGroupTargetAssignmentMetadataValue) 
messageOrNull(value)
                 );
                 break;
 
-            case 19:
+            case 20:
                 groupMetadataManager.replay(
                     (StreamsGroupTargetAssignmentMemberKey) key.message(),
                     (StreamsGroupTargetAssignmentMemberValue) 
messageOrNull(value)
                 );
                 break;
 
-            case 20:
+            case 21:
                 groupMetadataManager.replay(
                     (StreamsGroupCurrentMemberAssignmentKey) key.message(),
                     (StreamsGroupCurrentMemberAssignmentValue) 
messageOrNull(value)
                 );
                 break;
 
-            case 21:
+            case 22:
                 groupMetadataManager.replay(
                     (StreamsGroupTopologyKey) key.message(),
                     (StreamsGroupTopologyValue) messageOrNull(value)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index fac219c5ed4..5ec4d1559bf 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -41,6 +41,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineInteger;
 import org.apache.kafka.timeline.TimelineObject;
+
 import org.slf4j.Logger;
 
 import java.util.Collections;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index cc2383fae42..9ebec6c30cb 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -22,6 +22,7 @@ import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+
 import org.slf4j.Logger;
 
 import java.util.Collection;
diff --git 
a/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json
 
b/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json
index 3f761b694e4..2cb82e4de10 100644
--- 
a/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json
@@ -20,9 +20,9 @@
   "validVersions": "16",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "16",
+    { "name": "GroupId", "type": "string", "versions": "44",
       "about": "The group id." },
-    { "name": "RegularExpression", "type": "string", "versions": "16",
+    { "name": "RegularExpression", "type": "string", "versions": "44",
       "about": "The regular expression." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
index 0d91a992d0c..618fbc78451 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
@@ -17,10 +17,10 @@
 {
   "type": "data",
   "name": "StreamsGroupPartitionMetadataKey",
-  "validVersions": "16",
+  "validVersions": "56",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "16",
+    { "name": "GroupId", "type": "string", "versions": "56",
       "about": "The group id." }
   ]
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4cf7f099d26..e120a057642 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -114,7 +114,10 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.opentest4j.AssertionFailedError;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsConfiguredTopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsConfiguredTopologyTest.java
index 9e91b36f0b2..e965ea58fac 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsConfiguredTopologyTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsConfiguredTopologyTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.streams;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import 
org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 1269587d6c4..dc928dd695d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
 import org.apache.kafka.timeline.SnapshotRegistry;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
index 54f8de1a3da..44074612a93 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.streams;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index ecde45d96d5..09e53428c08 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
 import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index 18dbd2fa6d8..fc90c621435 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Metric;
@@ -430,6 +431,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
             return consumer;
         }
 
+        @Override
+        public Consumer<byte[], byte[]> 
getStreamsRebalanceProtocolConsumer(Map<String, Object> config, 
StreamsAssignmentInterface assignmentInterface) {
+            return null;
+        }
+
         @Override
         public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, 
Object> config) {
             return new KafkaConsumer<>(config, new ByteArrayDeserializer(), 
new ByteArrayDeserializer());
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 498613ce783..4ed0e946426 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
-import kafka.api.IntegrationTestHarness;
+
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.utils.Exit;
@@ -44,7 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(600)
 @Tag("integration")
-public class SmokeTestDriverIntegrationTest extends IntegrationTestHarness {
+public class SmokeTestDriverIntegrationTest {
 
     @Override
     public int brokerCount() {

Reply via email to