This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 0953df59d7b7c81842eb672dbb7918d554ca7266 Author: Lucas Brutschy <[email protected]> AuthorDate: Mon Oct 7 13:36:14 2024 +0200 MINOR: Fix unit tests (#17374) * MINOR: Fix unit tests Some unit tests were failing on the KIP1071 feature branch. This fixes them. * spotlessApply --- .../StreamsGroupHeartbeatRequestManager.java | 1 + .../StreamsGroupInitializeRequestManager.java | 1 + .../org/apache/kafka/common/protocol/Errors.java | 10 +++++----- .../requests/StreamsGroupDescribeRequest.java | 7 ++++--- .../requests/StreamsGroupDescribeResponse.java | 7 ++++--- .../StreamsGroupHeartbeatRequestManagerTest.java | 11 ++++++----- .../StreamsGroupInitializeRequestManagerTest.java | 1 + .../scala/unit/kafka/server/RequestQuotaTest.scala | 9 +++++++++ .../coordinator/group/GroupMetadataManager.java | 4 ++-- .../coordinator/group/streams/StreamsTopology.java | 4 ++-- .../group/taskassignor/MockAssignor.java | 1 - .../coordinator/group/streams/AssignmentTest.java | 1 + .../CoordinatorStreamsRecordHelpersTest.java | 1 + .../streams/CurrentAssignmentBuilderTest.java | 1 + .../group/streams/StreamsGroupBuilder.java | 3 ++- .../group/streams/StreamsGroupMemberTest.java | 1 + .../group/streams/StreamsGroupTest.java | 1 + .../group/streams/StreamsTopologyTest.java | 22 ++++++++++++---------- .../group/streams/TargetAssignmentBuilderTest.java | 1 + .../group/streams/TaskAssignmentTestUtil.java | 2 +- .../SmokeTestDriverIntegrationTest.java | 5 +++-- .../org/apache/kafka/streams/StreamsConfig.java | 2 +- .../internals/DefaultKafkaClientSupplier.java | 2 +- .../streams/processor/internals/StreamThread.java | 2 +- 24 files changed, 62 insertions(+), 38 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index 3a80765206d..7e31859571d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -40,6 +40,7 @@ import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; + import org.slf4j.Logger; import java.util.ArrayList; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java index 64565f347b1..b81beab78d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.StreamsGroupInitializeRequest; import org.apache.kafka.common.requests.StreamsGroupInitializeResponse; import org.apache.kafka.common.utils.LogContext; + import org.slf4j.Logger; import java.util.ArrayList; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 00ed250ec73..db5194d4a2b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -20,11 +20,6 @@ import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; import org.apache.kafka.common.errors.BrokerNotAvailableException; -import org.apache.kafka.common.errors.StreamsInconsistentTopologyException; -import org.apache.kafka.common.errors.StreamsInvalidTopologyException; -import org.apache.kafka.common.errors.StreamsGroupUninitializedException; -import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; -import org.apache.kafka.common.errors.StreamsShutdownApplicationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ConcurrentTransactionsException; import org.apache.kafka.common.errors.ControllerMovedException; @@ -124,6 +119,11 @@ import org.apache.kafka.common.errors.ShareSessionNotFoundException; import org.apache.kafka.common.errors.SnapshotNotFoundException; import org.apache.kafka.common.errors.StaleBrokerEpochException; import org.apache.kafka.common.errors.StaleMemberEpochException; +import org.apache.kafka.common.errors.StreamsGroupUninitializedException; +import org.apache.kafka.common.errors.StreamsInconsistentTopologyException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; +import org.apache.kafka.common.errors.StreamsShutdownApplicationException; import org.apache.kafka.common.errors.TelemetryTooLargeException; import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; import org.apache.kafka.common.errors.TimeoutException; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java index 2a3834d16a8..1f0c46fafe1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java @@ -16,15 +16,16 @@ */ package org.apache.kafka.common.requests; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.stream.Collectors; import org.apache.kafka.common.message.StreamsGroupDescribeRequestData; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; + public class StreamsGroupDescribeRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder<StreamsGroupDescribeRequest> { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java index cf1d5529623..83db6700a4a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java @@ -16,14 +16,15 @@ */ package org.apache.kafka.common.requests; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + /** * Possible error codes. * diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index a635d7d418e..f4e5d814a57 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -37,10 +38,10 @@ import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.utils.LogContext; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -290,9 +291,9 @@ class StreamsGroupHeartbeatRequestManagerTest { mockResponse(data); - ArgumentCaptor<ConsumerGroupHeartbeatResponseData> captor = ArgumentCaptor.forClass(ConsumerGroupHeartbeatResponseData.class); - verify(membershipManager, times(1)).onHeartbeatSuccess(new ConsumerGroupHeartbeatResponse(captor.capture())); - ConsumerGroupHeartbeatResponseData response = captor.getValue(); + ArgumentCaptor<ConsumerGroupHeartbeatResponse> captor = ArgumentCaptor.forClass(ConsumerGroupHeartbeatResponse.class); + verify(membershipManager, times(1)).onHeartbeatSuccess(captor.capture()); + ConsumerGroupHeartbeatResponseData response = captor.getValue().data(); assertEquals(Errors.NONE.code(), response.errorCode()); assertEquals(TEST_MEMBER_ID, response.memberId()); assertEquals(TEST_MEMBER_EPOCH, response.memberEpoch()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java index ebba584b0db..11c92389f6a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.StreamsGroupInitializeRequest; import org.apache.kafka.common.utils.LogContext; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 4e481db1a21..154814e35c7 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -739,6 +739,15 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => new ReadShareGroupStateSummaryRequest.Builder(new ReadShareGroupStateSummaryRequestData(), true) + case ApiKeys.STREAMS_GROUP_DESCRIBE => + new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData(), true) + + case ApiKeys.STREAMS_GROUP_HEARTBEAT => + new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData(), true) + + case ApiKeys.STREAMS_GROUP_INITIALIZE => + new StreamsGroupInitializeRequest.Builder(new StreamsGroupInitializeRequestData(), true) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 80785af9c76..a8f742f0d26 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -141,8 +141,8 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuild import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; import org.apache.kafka.coordinator.group.streams.StreamsGroup; import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; -import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor; import org.apache.kafka.coordinator.group.streams.StreamsTopology; +import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.TopicImage; @@ -222,8 +222,8 @@ import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecor import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupMemberTombstoneRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupPartitionMetadataRecord; -import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord; import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedActiveTasksChanged; import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedStandbyTasksChanged; import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedWarmupTasksChanged; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java index 16b8f6d5178..27f138be05f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java @@ -16,14 +16,14 @@ */ package org.apache.kafka.coordinator.group.streams; -import java.util.HashMap; -import java.util.List; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java index 1a7502b9b52..dbe9809632e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java @@ -112,4 +112,3 @@ public class MockAssignor implements TaskAssignor { } } } - diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java index 65675136eb7..c3b3aebf42e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group.streams; import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; + import org.junit.jupiter.api.Test; import java.util.ArrayList; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java index 839897d4dae..7ddbc832d80 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.server.common.ApiMessageAndVersion; + import org.junit.jupiter.api.Test; import java.util.Collections; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java index d6f83025c7f..533b7d6ab3a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.FencedMemberEpochException; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole; + import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java index e05365dbac8..b8b4ea8bcc1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java @@ -16,11 +16,12 @@ */ package org.apache.kafka.coordinator.group.streams; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; public class StreamsGroupBuilder { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java index 00f6ef6c676..9e7e09ac8c6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAss import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue.TaskIds; import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.KeyValue; + import org.junit.jupiter.api.Test; import java.util.Arrays; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index a7066ec7915..fbe9ca1821e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.timeline.SnapshotRegistry; + import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java index 8cb9c9aa645..e83abcbfe7c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java @@ -16,11 +16,12 @@ */ package org.apache.kafka.coordinator.group.streams; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; @@ -28,11 +29,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; -import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; -import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; -import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; -import org.junit.jupiter.api.Test; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class StreamsTopologyTest { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java index e7b52deae42..e9324bcf696 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.coordinator.group.taskassignor.GroupSpecImpl; import org.apache.kafka.coordinator.group.taskassignor.MemberAssignment; import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor; import org.apache.kafka.image.TopicsImage; + import org.junit.jupiter.api.Test; import java.util.ArrayList; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java index 24a0b42eec8..c3980d0343f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java @@ -23,8 +23,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Objects; import java.util.Map; +import java.util.Objects; import java.util.Set; public class TaskAssignmentTestUtil { diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index 78c23184f4f..ab8fb57d3fb 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -16,9 +16,8 @@ */ package org.apache.kafka.streams.integration; -import java.util.Locale; - import kafka.api.IntegrationTestHarness; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.GroupProtocol; @@ -26,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.tests.SmokeTestClient; import org.apache.kafka.streams.tests.SmokeTestDriver; + import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.CsvSource; import java.time.Duration; import java.util.ArrayList; +import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Set; diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 6e171e022b5..476e582933e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams; -import java.util.Locale; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.MetadataRecoveryStrategy; import org.apache.kafka.clients.admin.Admin; @@ -65,6 +64,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Properties; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java index 1ec2053b3ec..e336aab5d03 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.Optional; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -30,6 +29,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.streams.KafkaClientSupplier; import java.util.Map; +import java.util.Optional; public class DefaultKafkaClientSupplier implements KafkaClientSupplier { @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f26f9d55089..0b69a099663 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -69,7 +69,6 @@ import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.Def import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.internals.ThreadCache; -import java.util.List; import org.slf4j.Logger; import java.time.Duration; @@ -78,6 +77,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Queue;
