This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2b43c49f51d KAFKA-18050 Upgrade the checkstyle version to 10.20.2 (#17999)
2b43c49f51d is described below
commit 2b43c49f51d8f8bdac5125ea20d41967d1f830cf
Author: Ken Huang <[email protected]>
AuthorDate: Thu Dec 5 10:59:18 2024 +0800
KAFKA-18050 Upgrade the checkstyle version to 10.20.2 (#17999)
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/checkstyle.xml | 4 ++-
.../kafka/clients/admin/KafkaAdminClient.java | 12 ++++----
.../internals/OffsetsForLeaderEpochUtils.java | 2 +-
.../apache/kafka/server/authorizer/Authorizer.java | 12 ++++----
.../kafka/clients/admin/KafkaAdminClientTest.java | 12 ++++----
.../ListConsumerGroupOffsetsHandlerTest.java | 20 +++++++-------
.../consumer/internals/ConsumerProtocolTest.java | 4 +--
.../producer/internals/TransactionManagerTest.java | 8 +++---
.../apache/kafka/common/network/SelectorTest.java | 4 +--
.../common/requests/OffsetFetchResponseTest.java | 20 +++++++-------
.../kafka/common/requests/RequestResponseTest.java | 6 ++--
.../connect/mirror/MirrorSourceConnectorTest.java | 4 +--
.../kafka/connect/runtime/AbstractHerderTest.java | 4 +--
.../transforms/predicates/HasHeaderKey.java | 5 ++--
.../java/kafka/admin/AdminFenceProducersTest.java | 10 +++----
.../test/java/kafka/admin/ClientTelemetryTest.java | 2 +-
.../test/java/kafka/admin/ConfigCommandTest.java | 4 +--
.../admin/UserScramCredentialsCommandTest.java | 4 +--
.../BootstrapControllersIntegrationTest.java | 8 +++---
gradle/dependencies.gradle | 5 +---
gradle/resources/dependencycheck-suppressions.xml | 11 --------
.../ConfigurationControlManagerTest.java | 8 +++---
.../kafka/controller/QuorumControllerTest.java | 8 +++---
.../controller/ReplicationControlManagerTest.java | 32 +++++++++-------------
.../kafka/server/AssignmentsManagerTest.java | 10 +++----
.../kafka/storage/internals/log/LogSegment.java | 4 +--
.../storage/internals/log/LogValidatorTest.java | 26 +++++++++---------
.../examples/pageview/PageViewTypedDemo.java | 12 ++++----
...ighAvailabilityTaskAssignorIntegrationTest.java | 14 ++++++----
.../integration/QueryableStateIntegrationTest.java | 9 +++---
.../kafka/streams/integration/StoreQuerySuite.java | 16 +++++------
.../kstream/internals/StreamStreamJoinUtil.java | 2 +-
.../streams/processor/internals/TaskSuite.java | 14 +++++-----
.../streams/state/internals/RocksDBStoreTest.java | 6 ++--
.../org/apache/kafka/streams/utils/TestUtils.java | 1 +
.../common/test/api/ClusterTestExtensionsTest.java | 4 +--
.../java/org/apache/kafka/tools/ToolsUtils.java | 2 +-
.../kafka/tools/BrokerApiVersionsCommandTest.java | 2 +-
.../kafka/tools/MetadataQuorumCommandTest.java | 8 +++---
.../reassign/ReassignPartitionsCommandTest.java | 32 +++++++++++-----------
.../org/apache/kafka/trogdor/fault/Kibosh.java | 4 +--
.../org/apache/kafka/trogdor/rest/TaskState.java | 10 +++----
.../kafka/trogdor/workload/RecordProcessor.java | 2 +-
43 files changed, 186 insertions(+), 201 deletions(-)
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 6b8aedb6e47..fa1d5873a2c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -39,7 +39,9 @@
<module name="EqualsHashCode"/>
<module name="SimplifyBooleanExpression"/>
<module name="OneStatementPerLine"/>
- <module name="UnnecessaryParentheses" />
+ <module name="UnnecessaryParentheses">
+      <property name="tokens" value="IDENT, NUM_DOUBLE, LAMBDA, TEXT_BLOCK_LITERAL_BEGIN, UNARY_MINUS, UNARY_PLUS, INC, DEC, POST_INC, POST_DEC" />
+ </module>
<module name="SimplifyBooleanReturn"/>
<!-- style -->
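The narrowed tokens list above restricts UnnecessaryParentheses to reporting parentheses around only those token types. A rough, hypothetical illustration of the intended effect (not taken from the Kafka code base, assuming stock Checkstyle 10.20.2 behaviour for the listed tokens):

    // Hypothetical example class, for illustration only.
    public class ParensExample {
        Runnable task = (() -> System.out.println("hi"));   // still reported: parentheses around a lambda (LAMBDA)

        boolean check(int count, int size) {
            int copy = (count);                              // still reported: parentheses around a bare identifier (IDENT)
            return copy > 0 && (size > 0);                   // no longer reported: operator tokens such as GT/LAND are not in the new list
        }
    }

Presumably this keeps the check focused on the obvious cases while avoiding the many new operator-token violations that the Checkstyle 10.x defaults would otherwise report across the code base.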
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e6986c8a378..bd135acc74b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4822,12 +4822,12 @@ public class KafkaAdminClient extends AdminClient {
setHost(endpoint.host()).
setPort(endpoint.port())));
return new AddRaftVoterRequest.Builder(
- new AddRaftVoterRequestData().
- setClusterId(options.clusterId().orElse(null)).
- setTimeoutMs(timeoutMs).
- setVoterId(voterId) .
- setVoterDirectoryId(voterDirectoryId).
- setListeners(listeners));
+ new AddRaftVoterRequestData().
+ setClusterId(options.clusterId().orElse(null)).
+ setTimeoutMs(timeoutMs).
+ setVoterId(voterId) .
+ setVoterDirectoryId(voterDirectoryId).
+ setListeners(listeners));
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
index 033daa09876..ae4b9f71f6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
@@ -44,7 +44,7 @@ public final class OffsetsForLeaderEpochUtils {
private static final Logger LOG = LoggerFactory.getLogger(OffsetsForLeaderEpochUtils.class);
- private OffsetsForLeaderEpochUtils(){}
+ private OffsetsForLeaderEpochUtils() {}
static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
index ca2a76b4716..58241c51ea0 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
@@ -202,14 +202,14 @@ public interface Authorizer extends Configurable, Closeable {
EnumMap<PatternType, Set<String>> denyPatterns =
new EnumMap<>(PatternType.class) {{
- put(PatternType.LITERAL, new HashSet<>());
- put(PatternType.PREFIXED, new HashSet<>());
- }};
+ put(PatternType.LITERAL, new HashSet<>());
+ put(PatternType.PREFIXED, new HashSet<>());
+ }};
EnumMap<PatternType, Set<String>> allowPatterns =
new EnumMap<>(PatternType.class) {{
- put(PatternType.LITERAL, new HashSet<>());
- put(PatternType.PREFIXED, new HashSet<>());
- }};
+ put(PatternType.LITERAL, new HashSet<>());
+ put(PatternType.PREFIXED, new HashSet<>());
+ }};
boolean hasWildCardAllow = false;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index cf5fbc938e3..ca70af6d114 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2061,9 +2061,9 @@ public class KafkaAdminClientTest {
new DescribeConfigsResponseData().setResults(asList(new
DescribeConfigsResponseData.DescribeConfigsResult()
.setResourceName(brokerResource.name()).setResourceType(brokerResource.type().id()).setErrorCode(Errors.NONE.code())
.setConfigs(emptyList()),
- new DescribeConfigsResponseData.DescribeConfigsResult()
-
.setResourceName(brokerLoggerResource.name()).setResourceType(brokerLoggerResource.type().id()).setErrorCode(Errors.NONE.code())
- .setConfigs(emptyList())))), env.cluster().nodeById(0));
+ new DescribeConfigsResponseData.DescribeConfigsResult()
+
.setResourceName(brokerLoggerResource.name()).setResourceType(brokerLoggerResource.type().id()).setErrorCode(Errors.NONE.code())
+ .setConfigs(emptyList())))),
env.cluster().nodeById(0));
Map<ConfigResource, KafkaFuture<Config>> result =
env.adminClient().describeConfigs(asList(
brokerResource,
brokerLoggerResource)).values();
@@ -2102,9 +2102,9 @@ public class KafkaAdminClientTest {
new DescribeConfigsResponseData().setResults(asList(new
DescribeConfigsResponseData.DescribeConfigsResult()
.setResourceName(topic.name()).setResourceType(topic.type().id()).setErrorCode(Errors.NONE.code())
.setConfigs(emptyList()),
- new DescribeConfigsResponseData.DescribeConfigsResult()
-
.setResourceName(unrequested.name()).setResourceType(unrequested.type().id()).setErrorCode(Errors.NONE.code())
- .setConfigs(emptyList())))));
+ new DescribeConfigsResponseData.DescribeConfigsResult()
+
.setResourceName(unrequested.name()).setResourceType(unrequested.type().id()).setErrorCode(Errors.NONE.code())
+ .setConfigs(emptyList())))));
Map<ConfigResource, KafkaFuture<Config>> result =
env.adminClient().describeConfigs(singletonList(
topic)).values();
assertEquals(new HashSet<>(singletonList(topic)), result.keySet());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
index d9935fbdca6..e3bb56347a8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
@@ -177,11 +177,11 @@ public class ListConsumerGroupOffsetsHandlerTest {
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapTwo =
Collections.singletonMap(t2p2, new OffsetAndMetadata(10L));
Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult =
- new HashMap<String, Map<TopicPartition, OffsetAndMetadata>>() {{
- put(groupZero, offsetAndMetadataMapZero);
- put(groupOne, offsetAndMetadataMapOne);
- put(groupTwo, offsetAndMetadataMapTwo);
- }};
+ new HashMap<>() {{
+ put(groupZero, offsetAndMetadataMapZero);
+ put(groupOne, offsetAndMetadataMapOne);
+ put(groupTwo, offsetAndMetadataMapTwo);
+ }};
assertCompletedForMultipleGroups(
handleWithPartitionErrorMultipleGroups(Errors.UNKNOWN_TOPIC_OR_PARTITION),
expectedResult);
@@ -304,11 +304,11 @@ public class ListConsumerGroupOffsetsHandlerTest {
responseDataTwo.put(t2p2, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", Errors.NONE));
Map<String, Map<TopicPartition, PartitionData>> responseData =
- new HashMap<String, Map<TopicPartition, PartitionData>>() {{
- put(groupZero, responseDataZero);
- put(groupOne, responseDataOne);
- put(groupTwo, responseDataTwo);
- }};
+ new HashMap<>() {{
+ put(groupZero, responseDataZero);
+ put(groupOne, responseDataOne);
+ put(groupTwo, responseDataTwo);
+ }};
Map<String, Errors> errorMap = errorMap(groups, Errors.NONE);
return new OffsetFetchResponse(0, errorMap, responseData);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 07808f29806..b73576757e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -298,8 +298,8 @@ public class ConsumerProtocolTest {
if (version >= 1) {
assertEquals(
Set.of(
- new
ConsumerProtocolSubscription.TopicPartition().setTopic("foo").setPartitions(Collections.singletonList(0)),
- new
ConsumerProtocolSubscription.TopicPartition().setTopic("bar").setPartitions(Collections.singletonList(0)
+ new
ConsumerProtocolSubscription.TopicPartition().setTopic("foo").setPartitions(Collections.singletonList(0)),
+ new
ConsumerProtocolSubscription.TopicPartition().setTopic("bar").setPartitions(Collections.singletonList(0)
)),
Set.copyOf(parsedSubscription.ownedPartitions())
);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 9ae80dc19ba..6143daaa967 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1815,10 +1815,10 @@ public class TransactionManagerTest {
@ParameterizedTest
@EnumSource(names = {
- "UNKNOWN_TOPIC_OR_PARTITION",
- "REQUEST_TIMED_OUT",
- "COORDINATOR_LOAD_IN_PROGRESS",
- "CONCURRENT_TRANSACTIONS"
+ "UNKNOWN_TOPIC_OR_PARTITION",
+ "REQUEST_TIMED_OUT",
+ "COORDINATOR_LOAD_IN_PROGRESS",
+ "CONCURRENT_TRANSACTIONS"
})
public void testRetriableErrors(Errors error) {
// Ensure FindCoordinator retries.
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index b4c73d64d38..d955c7939ae 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -907,8 +907,8 @@ public class SelectorTest {
}
assertNotNull(selector.lowestPriorityChannel());
for (int i = conns - 1; i >= 0; i--) {
- if (i != 2)
- assertEquals("", blockingRequest(String.valueOf(i), ""));
+ if (i != 2)
+ assertEquals("", blockingRequest(String.valueOf(i), ""));
time.sleep(10);
}
assertEquals("2", selector.lowestPriorityChannel().id());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
index c85d26dac57..d0ef79b4479 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
@@ -390,16 +390,16 @@ public class OffsetFetchResponseTest {
.setErrorCode(Errors.NOT_COORDINATOR.code())
.setThrottleTimeMs(throttleTimeMs)
.setTopics(Collections.singletonList(
- new OffsetFetchResponseTopic()
- .setName(topicOne)
- .setPartitions(Collections.singletonList(
- new OffsetFetchResponsePartition()
- .setPartitionIndex(partitionOne)
- .setCommittedOffset(offset)
-
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
-
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
- .setMetadata(metadata))
- ))
+ new OffsetFetchResponseTopic()
+ .setName(topicOne)
+ .setPartitions(Collections.singletonList(
+ new OffsetFetchResponsePartition()
+ .setPartitionIndex(partitionOne)
+ .setCommittedOffset(offset)
+
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+ .setMetadata(metadata))
+ ))
);
assertEquals(expectedData, response.data());
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index cb6a2458261..0d32fd442af 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2187,9 +2187,9 @@ public class RequestResponseTest {
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
Collections.singleton(
- new JoinGroupRequestData.JoinGroupRequestProtocol()
- .setName("consumer-range")
- .setMetadata(new byte[0])).iterator()
+ new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("consumer-range")
+ .setMetadata(new byte[0])).iterator()
);
JoinGroupRequestData data = new JoinGroupRequestData()
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index a410adde944..588baf8c090 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -167,10 +167,10 @@ public class MirrorSourceConnectorTest {
new DefaultReplicationPolicy(), x -> true,
getConfigPropertyFilter());
assertFalse(connector.shouldReplicateAcl(
new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.WRITE,
AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
+ new AccessControlEntry("kafka", "", AclOperation.WRITE,
AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
assertTrue(connector.shouldReplicateAcl(
new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.ALL,
AclPermissionType.ALLOW))), "should replicate ALLOW ALL");
+ new AccessControlEntry("kafka", "", AclOperation.ALL,
AclPermissionType.ALLOW))), "should replicate ALLOW ALL");
}
@Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 01f847d94f6..aa715667d24 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -1196,7 +1196,7 @@ public class AbstractHerderTest {
keys.putAll(configDef.configKeys());
}
- protected void addValue(List<ConfigValue> values, String name, String value, String...errors) {
+ protected void addValue(List<ConfigValue> values, String name, String value, String... errors) {
values.add(new ConfigValue(name, value, new ArrayList<>(), Arrays.asList(errors)));
}
@@ -1211,7 +1211,7 @@ public class AbstractHerderTest {
assertNull(info.configKey());
}
- protected void assertInfoValue(ConfigInfos infos, String name, String value, String...errors) {
+ protected void assertInfoValue(ConfigInfos infos, String name, String value, String... errors) {
ConfigValueInfo info = findInfo(infos, name).configValue();
assertEquals(name, info.name());
assertEquals(value, info.value());
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
index fe0a99c4abe..566bdfab238 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
@@ -35,9 +35,8 @@ public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R>, V
private static final String NAME_CONFIG = "name";
public static final String OVERVIEW_DOC = "A predicate which is true for
records with at least one header with the configured name.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(NAME_CONFIG, ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
- new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
- "The header name.");
+ .define(NAME_CONFIG, ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
+ new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"The header name.");
private String name;
@Override
diff --git a/core/src/test/java/kafka/admin/AdminFenceProducersTest.java b/core/src/test/java/kafka/admin/AdminFenceProducersTest.java
index 4b8178032e0..d4b87ef8ed5 100644
--- a/core/src/test/java/kafka/admin/AdminFenceProducersTest.java
+++ b/core/src/test/java/kafka/admin/AdminFenceProducersTest.java
@@ -50,11 +50,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(serverProperties = {
- @ClusterConfigProperty(key =
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
- @ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- @ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
- @ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
- @ClusterConfigProperty(key =
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
value = "2000")
+ @ClusterConfigProperty(key =
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
+ @ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
value = "2000")
})
@ExtendWith(ClusterTestExtensions.class)
public class AdminFenceProducersTest {
diff --git a/core/src/test/java/kafka/admin/ClientTelemetryTest.java b/core/src/test/java/kafka/admin/ClientTelemetryTest.java
index 56d466bd27b..f701d3f3845 100644
--- a/core/src/test/java/kafka/admin/ClientTelemetryTest.java
+++ b/core/src/test/java/kafka/admin/ClientTelemetryTest.java
@@ -75,7 +75,7 @@ public class ClientTelemetryTest {
types = Type.KRAFT,
brokers = 3,
serverProperties = {
- @ClusterConfigProperty(key =
METRIC_REPORTER_CLASSES_CONFIG, value =
"kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"),
+ @ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG,
value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"),
})
public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
Map<String, Object> configs = new HashMap<>();
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 70b1faee43b..10c24111e47 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -420,7 +420,7 @@ public class ConfigCommandTest {
assertEquals("[[1, 2], [3, 4]]", addedProps.getProperty("nested"));
}
- public void testExpectedEntityTypeNames(List<String> expectedTypes, List<String> expectedNames, List<String> connectOpts, String...args) {
+ public void testExpectedEntityTypeNames(List<String> expectedTypes, List<String> expectedNames, List<String> connectOpts, String... args) {
ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList(connectOpts.get(0),
connectOpts.get(1), "--describe"), Arrays.asList(args)));
createOpts.checkArgs();
assertEquals(createOpts.entityTypes().toSeq(), seq(expectedTypes));
@@ -1434,7 +1434,7 @@ public class ConfigCommandTest {
}
@SafeVarargs
- public static <K, V> Map<K, V> concat(Map<K, V>...maps) {
+ public static <K, V> Map<K, V> concat(Map<K, V>... maps) {
Map<K, V> res = new HashMap<>();
Stream.of(maps)
.map(Map::entrySet)
diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
index 55db18893f8..697dda07363 100644
--- a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
+++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
@@ -62,10 +62,10 @@ public class UserScramCredentialsCommandTest {
}
}
- private ConfigCommandResult runConfigCommandViaBroker(String...args) {
+ private ConfigCommandResult runConfigCommandViaBroker(String... args) {
AtomicReference<OptionalInt> exitStatus = new AtomicReference<>(OptionalInt.empty());
Exit.setExitProcedure((status, __) -> {
- exitStatus.set(OptionalInt.of((Integer) status));
+ exitStatus.set(OptionalInt.of(status));
throw new RuntimeException();
});
diff --git a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
index 1462e1c7f64..75cd070b93d 100644
--- a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
+++ b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
@@ -311,16 +311,16 @@ public class BootstrapControllersIntegrationTest {
}
@ClusterTest(serverProperties = {
- @ClusterConfigProperty(key =
StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
- @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value =
"org.apache.kafka.metadata.authorizer.StandardAuthorizer")
+ @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG,
value = "User:ANONYMOUS"),
+ @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value =
"org.apache.kafka.metadata.authorizer.StandardAuthorizer")
})
public void testAclsByControllers(ClusterInstance clusterInstance) throws Exception {
testAcls(clusterInstance, true);
}
@ClusterTest(serverProperties = {
- @ClusterConfigProperty(key =
StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
- @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value =
"org.apache.kafka.metadata.authorizer.StandardAuthorizer")
+ @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG,
value = "User:ANONYMOUS"),
+ @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value =
"org.apache.kafka.metadata.authorizer.StandardAuthorizer")
})
public void testAcls(ClusterInstance clusterInstance) throws Exception {
testAcls(clusterInstance, false);
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 59b523ed337..bc4d6fcea34 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -58,10 +58,7 @@ versions += [
// but currently, tests are failing in >=3.1.2. Therefore, we are temporarily using version 3.1.1.
// The failing tests should be fixed under KAFKA-18089, allowing us to upgrade to >=3.1.2.
caffeine: "3.1.1",
- // when updating checkstyle, check whether the exclusion of
- // CVE-2023-2976 and CVE-2020-8908 can be dropped from
- // gradle/resources/dependencycheck-suppressions.xml
- checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "8.36.2",
+ checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
commonsCli: "1.4",
commonsIo: "2.14.0", // ZooKeeper dependency. Do not use, this is going away.
commonsValidator: "1.9.0",
diff --git a/gradle/resources/dependencycheck-suppressions.xml b/gradle/resources/dependencycheck-suppressions.xml
index 5ce34df1d2e..b28adc90028 100644
--- a/gradle/resources/dependencycheck-suppressions.xml
+++ b/gradle/resources/dependencycheck-suppressions.xml
@@ -23,17 +23,6 @@
]]></notes>
<cve>CVE-2023-35116</cve>
</suppress>
- <suppress>
- <notes><![CDATA[
- This older version of Guava is only included in checkstyle.
- CVE-2023-2976 and CVE-2020-8908 are irrelevant for checkstyle,
- as it is not executed with elevated privileges.
- This suppression will no longer be needed when checkstyle
- is updated to 10.5.0 or later.
- ]]></notes>
- <cve>CVE-2020-8908</cve>
- <cve>CVE-2023-2976</cve>
- </suppress>
<suppress>
<notes><![CDATA[
Kafka does not use CgiServlet
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 2ce417f63aa..aa92a7ed0d7 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -316,10 +316,10 @@ public class ConfigurationControlManagerTest {
assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
setName("foo.bar").setValue("123"),
CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
- new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
- setName("quux").setValue("456"),
CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
- new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
- setName("broker.config.to.remove").setValue(null),
CONFIG_RECORD.highestSupportedVersion())
+ new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
+ setName("quux").setValue("456"),
CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
+ new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
+
setName("broker.config.to.remove").setValue(null),
CONFIG_RECORD.highestSupportedVersion())
),
toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION,
"Expected:
AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" +
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 6e64483a311..ee91e5f0582 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -867,10 +867,10 @@ public class QuorumControllerTest {
Arrays.asList(new CreatableReplicaAssignment().
setPartitionIndex(0).
setBrokerIds(Arrays.asList(0, 1, 2)),
- new CreatableReplicaAssignment().
- setPartitionIndex(1).
- setBrokerIds(Arrays.asList(1, 2, 0))).
- iterator()))).iterator())),
+ new CreatableReplicaAssignment().
+ setPartitionIndex(1).
+ setBrokerIds(Arrays.asList(1, 2, 0))).
+ iterator()))).iterator())),
Collections.singleton("foo")).get();
fooId = fooData.topics().find("foo").topicId();
active.allocateProducerIds(ANONYMOUS_CONTEXT,
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 55ed04c15a7..58d97c80b7c 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -1800,7 +1800,7 @@ public class ReplicationControlManagerTest {
setReplicas(asList(0, 2, 1)),
new ReassignablePartition().setPartitionIndex(2).
setReplicas(asList(0, 2, 1)))),
- new ReassignableTopic().setName("bar"))));
+ new ReassignableTopic().setName("bar"))));
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setResponses(asList(
new
ReassignableTopicResponse().setName("foo").setPartitions(asList(
@@ -2151,27 +2151,21 @@ public class ReplicationControlManagerTest {
setReplicas(asList(5, 6, 7)),
new ReassignablePartition().setPartitionIndex(3).
setReplicas(Collections.emptyList()))),
- new
ReassignableTopic().setName("bar").setPartitions(singletonList(
+ new
ReassignableTopic().setName("bar").setPartitions(singletonList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(asList(1, 2, 3, 4, 0)))))));
assertEquals(new AlterPartitionReassignmentsResponseData().
- setErrorMessage(null).setResponses(asList(
- new
ReassignableTopicResponse().setName("foo").setPartitions(asList(
- new ReassignablePartitionResponse().setPartitionIndex(0).
- setErrorMessage(null),
- new ReassignablePartitionResponse().setPartitionIndex(1).
- setErrorMessage(null),
- new ReassignablePartitionResponse().setPartitionIndex(2).
- setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
- setErrorMessage("The manual partition assignment includes
broker 5, " +
- "but no such broker is registered."),
- new ReassignablePartitionResponse().setPartitionIndex(3).
- setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
- setErrorMessage("The manual partition assignment includes
an empty " +
- "replica list."))),
- new
ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
- new ReassignablePartitionResponse().setPartitionIndex(0).
- setErrorMessage(null))))),
+ setErrorMessage(null).
+ setResponses(asList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(asList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null),
+ new
ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null),
+ new
ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
+ setErrorMessage("The manual partition assignment
includes broker 5, but no such broker is registered."),
+ new
ReassignablePartitionResponse().setPartitionIndex(3).setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
+ setErrorMessage("The manual partition assignment
includes an empty replica list."))),
+ new
ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
alterResult.response());
ctx.replay(alterResult.records());
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[]
{1, 2, 4}).setIsr(new int[] {1, 2, 4}).
diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index 543e67d03e2..f426c355ceb 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -493,11 +493,11 @@ public class AssignmentsManagerTest {
setPartitions(Collections.singletonList(
new
AssignReplicasToDirsRequestData.PartitionData().
setPartitionIndex(2))),
- new AssignReplicasToDirsRequestData.TopicData().
- setTopicId(TOPIC_2).
- setPartitions(Collections.singletonList(
- new AssignReplicasToDirsRequestData.PartitionData().
- setPartitionIndex(5))))),
+ new AssignReplicasToDirsRequestData.TopicData().
+ setTopicId(TOPIC_2).
+ setPartitions(Collections.singletonList(
+ new
AssignReplicasToDirsRequestData.PartitionData().
+ setPartitionIndex(5))))),
new AssignReplicasToDirsRequestData.DirectoryData().
setId(DIR_3).
setTopics(Collections.singletonList(
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index faf0ece6067..4ff976bdf78 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -177,8 +177,8 @@ public class LogSegment implements Closeable {
public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
if (offsetIndexFile().exists()) {
// Resize the time index file to 0 if it is newly created.
- if (timeIndexFileNewlyCreated)
- timeIndex().resize(0);
+ if (timeIndexFileNewlyCreated)
+ timeIndex().resize(0);
// Sanity checks for time index and offset index are skipped
because
// we will recover the segments above the recovery point in
recoverLog()
// in any case so sanity checking them here is redundant.
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
index 6f6d9ed37d0..cc9c8f17b8d 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
@@ -132,12 +132,12 @@ public class LogValidatorTest {
@ParameterizedTest
@CsvSource({
- "0,gzip,none", "1,gzip,none", "2,gzip,none",
- "0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip",
- "0,snappy,gzip", "1,snappy,gzip", "2,snappy,gzip",
- "0,lz4,gzip", "1,lz4,gzip", "2,lz4,gzip",
- "2,none,none", "2,none,gzip",
- "2,zstd,gzip",
+ "0,gzip,none", "1,gzip,none", "2,gzip,none",
+ "0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip",
+ "0,snappy,gzip", "1,snappy,gzip", "2,snappy,gzip",
+ "0,lz4,gzip", "1,lz4,gzip", "2,lz4,gzip",
+ "2,none,none", "2,none,gzip",
+ "2,zstd,gzip",
})
public void checkOnlyOneBatch(Byte magic, String sourceCompression, String targetCompression) {
@@ -712,9 +712,9 @@ public class LogValidatorTest {
@ParameterizedTest
@CsvSource({
- "0,gzip,gzip", "1,gzip,gzip",
- "0,lz4,lz4", "1,lz4,lz4",
- "0,snappy,snappy", "1,snappy,snappy",
+ "0,gzip,gzip", "1,gzip,gzip",
+ "0,lz4,lz4", "1,lz4,lz4",
+ "0,snappy,snappy", "1,snappy,snappy",
})
public void checkInvalidChecksum(byte magic, String compressionName, String typeName) {
Compression compression = Compression.of(compressionName).build();
@@ -813,10 +813,10 @@ public class LogValidatorTest {
@ParameterizedTest
@CsvSource({
- "0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip",
- "0,lz4,lz4", "1,lz4,lz4", "2,lz4,lz4",
- "0,snappy,snappy", "1,snappy,snappy", "2,snappy,snappy",
- "2,zstd,zstd"
+ "0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip",
+ "0,lz4,lz4", "1,lz4,lz4", "2,lz4,lz4",
+ "0,snappy,snappy", "1,snappy,snappy", "2,snappy,snappy",
+ "2,zstd,zstd"
})
public void checkNoKeyCompactedTopic(byte magic, String compressionName, String typeName) {
Compression codec = Compression.of(compressionName).build();
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index efa550222f8..6de15ef91a4 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -134,12 +134,12 @@ public class PageViewTypedDemo {
@SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
@JsonSubTypes({
- @JsonSubTypes.Type(value = PageView.class, name = "pv"),
- @JsonSubTypes.Type(value = UserProfile.class, name =
"up"),
- @JsonSubTypes.Type(value = PageViewByRegion.class, name
= "pvbr"),
- @JsonSubTypes.Type(value =
WindowedPageViewByRegion.class, name = "wpvbr"),
- @JsonSubTypes.Type(value = RegionCount.class, name =
"rc")
- })
+ @JsonSubTypes.Type(value = PageView.class, name = "pv"),
+ @JsonSubTypes.Type(value = UserProfile.class, name = "up"),
+ @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
+ @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name =
"wpvbr"),
+ @JsonSubTypes.Type(value = RegionCount.class, name = "rc")
+ })
public interface JSONSerdeCompatible {
}
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index df4d981fa71..ce05eabb566 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -97,9 +97,10 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
@ParameterizedTest
@ValueSource(strings = {
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY})
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+ })
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
@@ -108,9 +109,10 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
@ParameterizedTest
@ValueSource(strings = {
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY})
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+ })
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 6b67af943fc..ca3936633fc 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -982,10 +982,11 @@ public class QueryableStateIntegrationTest {
streamOne,
batch1,
TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()
+ ),
mockTime);
final KStream<String, String> s1 = builder.stream(streamOne);
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java
index 08a163de59e..e6b212e7d11 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java
@@ -36,13 +36,13 @@ import org.junit.platform.suite.api.Suite;
*/
@Suite
@SelectClasses({
- CompositeReadOnlyKeyValueStoreTest.class,
- CompositeReadOnlyWindowStoreTest.class,
- CompositeReadOnlySessionStoreTest.class,
- GlobalStateStoreProviderTest.class,
- StreamThreadStateStoreProviderTest.class,
- WrappingStoreProviderTest.class,
- QueryableStateIntegrationTest.class,
- })
+ CompositeReadOnlyKeyValueStoreTest.class,
+ CompositeReadOnlyWindowStoreTest.class,
+ CompositeReadOnlySessionStoreTest.class,
+ GlobalStateStoreProviderTest.class,
+ StreamThreadStateStoreProviderTest.class,
+ WrappingStoreProviderTest.class,
+ QueryableStateIntegrationTest.class,
+})
public class StoreQuerySuite {
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java
index ab23663bbc7..3e2c97f6e24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
public final class StreamStreamJoinUtil {
- private StreamStreamJoinUtil(){
+ private StreamStreamJoinUtil() {
}
public static <KIn, VIn, KOut, VOut> boolean skipRecord(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
index 172321fab11..06a994a5aa8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
@@ -31,13 +31,13 @@ import org.junit.platform.suite.api.Suite;
*/
@Suite
@SelectClasses({
- StreamTaskTest.class,
- StandbyTaskTest.class,
- GlobalStateTaskTest.class,
- TaskManagerTest.class,
- TaskMetricsTest.class,
- LegacyStickyTaskAssignorTest.class,
- StreamsPartitionAssignorTest.class,
+ StreamTaskTest.class,
+ StandbyTaskTest.class,
+ GlobalStateTaskTest.class,
+ TaskManagerTest.class,
+ TaskMetricsTest.class,
+ LegacyStickyTaskAssignorTest.class,
+ StreamsPartitionAssignorTest.class,
})
public class TaskSuite {
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 2c98c3427d0..f5dd717f5b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -235,7 +235,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
}
public static class RocksDBConfigSetterWithUserProvidedStatistics implements RocksDBConfigSetter {
- public RocksDBConfigSetterWithUserProvidedStatistics(){}
+ public RocksDBConfigSetterWithUserProvidedStatistics() {}
public void setConfig(final String storeName, final Options options,
final Map<String, Object> configs) {
lastStatistics = new Statistics();
@@ -306,7 +306,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter {
- public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){}
+ public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig() {}
public void setConfig(final String storeName, final Options options,
final Map<String, Object> configs) {
options.setTableFormatConfig(new BlockBasedTableConfig());
@@ -335,7 +335,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
}
public static class RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig implements RocksDBConfigSetter {
- public RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig(){}
+ public RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig() {}
public void setConfig(final String storeName, final Options options,
final Map<String, Object> configs) {
options.setTableFormatConfig(new PlainTableConfig());
diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 89a229f7ddc..96d19dbb47e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -133,6 +133,7 @@ public class TestUtils {
private WrapperRecorder recorder;
+ @SuppressWarnings("unchecked")
@Override
public void configure(final Map<String, ?> configs) {
if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) {
diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index bc20aa14273..af7a39a92be 100644
--- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -116,7 +116,7 @@ public class ClusterTestExtensionsTest {
@ClusterConfigProperty(key = "spam", value = "eggs"),
@ClusterConfigProperty(key = "default.key", value =
"overwrite.value")
}, tags = {
- "default.display.key1", "default.display.key2"
+ "default.display.key1", "default.display.key2"
}),
@ClusterTest(types = {Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@@ -126,7 +126,7 @@ public class ClusterTestExtensionsTest {
@ClusterConfigProperty(key = "spam", value = "eggs"),
@ClusterConfigProperty(key = "default.key", value =
"overwrite.value")
}, tags = {
- "default.display.key1", "default.display.key2"
+ "default.display.key1", "default.display.key2"
})
})
public void testClusterTests() throws ExecutionException, InterruptedException {
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
index a2ec83be81e..1f1914df268 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -148,7 +148,7 @@ public class ToolsUtils {
* @param <T> Element type.
*/
@SuppressWarnings("unchecked")
- public static <T> Set<T> minus(Set<T> set, T...toRemove) {
+ public static <T> Set<T> minus(Set<T> set, T... toRemove) {
Set<T> res = new HashSet<>(set);
for (T t : toRemove)
res.remove(t);
diff --git a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
index f83024837ba..74919740170 100644
--- a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
@@ -45,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(serverProperties = {
- @ClusterConfigProperty(key =
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "true"),
})
public class BrokerApiVersionsCommandTest {
@ClusterTest
diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
index fa3803a6eaf..484f09ec5ef 100644
--- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
@@ -63,10 +63,10 @@ class MetadataQuorumCommandTest {
assertTrue(header.matches("NodeId\\s+DirectoryId\\s+LogEndOffset\\s+Lag\\s+LastFetchTimestamp\\s+LastCaughtUpTimestamp\\s+Status\\s+"));
- if (cluster.type() == Type.CO_KRAFT)
- assertEquals(Math.max(cluster.config().numControllers(),
cluster.config().numBrokers()), data.size());
- else
- assertEquals(cluster.config().numBrokers() +
cluster.config().numControllers(), data.size());
+ if (cluster.type() == Type.CO_KRAFT)
+ assertEquals(Math.max(cluster.config().numControllers(),
cluster.config().numBrokers()), data.size());
+ else
+ assertEquals(cluster.config().numBrokers() +
cluster.config().numControllers(), data.size());
Pattern leaderPattern =
Pattern.compile("\\d+\\s+\\S+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+-?\\d+\\s+Leader\\s*");
assertTrue(leaderPattern.matcher(data.get(0)).find());
diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index 389b1584c99..ae174d63c6a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -101,16 +101,16 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = {
- // shorter backoff to reduce test durations when no active partitions
are eligible for fetching due to throttling
- @ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value =
"100"),
- // Don't move partition leaders automatically.
- @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG,
value = "false"),
- @ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value =
"1000"),
- @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
- @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"),
- @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"),
- @ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"),
- @ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"),
+ // shorter backoff to reduce test durations when no active partitions are
eligible for fetching due to throttling
+ @ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value =
"100"),
+ // Don't move partition leaders automatically.
+ @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value =
"false"),
+ @ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value =
"1000"),
+ @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
+ @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"),
+ @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"),
+ @ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"),
+ @ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"),
})
@ExtendWith(ClusterTestExtensions.class)
public class ReassignPartitionsCommandTest {
@@ -133,7 +133,7 @@ public class ReassignPartitionsCommandTest {
}
@ClusterTests({
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion
= IBP_3_3_IV0)
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion =
IBP_3_3_IV0)
})
public void testReassignmentWithAlterPartitionDisabled() throws Exception {
// Test reassignment when the IBP is on an older version which does not use
@@ -145,11 +145,11 @@ public class ReassignPartitionsCommandTest {
}
@ClusterTests({
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties
= {
- @ClusterConfigProperty(id = 1, key =
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
- @ClusterConfigProperty(id = 2, key =
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
- @ClusterConfigProperty(id = 3, key =
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
- })
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
+ @ClusterConfigProperty(id = 1, key =
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
+ @ClusterConfigProperty(id = 2, key =
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
+ @ClusterConfigProperty(id = 3, key =
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
+ })
})
public void testReassignmentCompletionDuringPartialUpgrade() throws Exception {
// Test reassignment during a partial upgrade when some brokers are relying on
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java b/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
index b6d2801bdff..eb3de147228 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
@@ -41,8 +41,8 @@ public final class Kibosh {
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class,
name = "unreadable"),
- })
+ @JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name
= "unreadable"),
+ })
public abstract static class KiboshFaultSpec {
@Override
public final boolean equals(Object o) {
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
index 6bb9e7f4f1f..1f7fa30e6c4 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -30,11 +30,11 @@ import com.fasterxml.jackson.databind.node.NullNode;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "state")
@JsonSubTypes({
- @JsonSubTypes.Type(value = TaskPending.class, name =
TaskStateType.Constants.PENDING_VALUE),
- @JsonSubTypes.Type(value = TaskRunning.class, name =
TaskStateType.Constants.RUNNING_VALUE),
- @JsonSubTypes.Type(value = TaskStopping.class, name =
TaskStateType.Constants.STOPPING_VALUE),
- @JsonSubTypes.Type(value = TaskDone.class, name =
TaskStateType.Constants.DONE_VALUE)
- })
+ @JsonSubTypes.Type(value = TaskPending.class, name =
TaskStateType.Constants.PENDING_VALUE),
+ @JsonSubTypes.Type(value = TaskRunning.class, name =
TaskStateType.Constants.RUNNING_VALUE),
+ @JsonSubTypes.Type(value = TaskStopping.class, name =
TaskStateType.Constants.STOPPING_VALUE),
+ @JsonSubTypes.Type(value = TaskDone.class, name =
TaskStateType.Constants.DONE_VALUE)
+})
public abstract class TaskState extends Message {
private final TaskSpec spec;
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
index 39a2f8b56b6..821c3a486c4 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
@@ -28,7 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
- @JsonSubTypes.Type(value = TimestampRecordProcessor.class, name =
"timestamp"),
+ @JsonSubTypes.Type(value = TimestampRecordProcessor.class, name =
"timestamp"),
})
public interface RecordProcessor {
void processRecords(ConsumerRecords<byte[], byte[]> consumerRecords);