This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 832627fc784 MINOR: Various cleanups in metadata (#14734)
832627fc784 is described below
commit 832627fc78484fdc7c8d6da8a2d20e7691dbf882
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Nov 14 09:25:09 2023 +0100
MINOR: Various cleanups in metadata (#14734)
- Remove unused code, suppression
- Simplify/fix test assertions
- Javadoc cleanups
Reviewers: Josep Prat <[email protected]>
---
.../metadata/BrokerRegistrationFencingChange.java | 2 +-
...okerRegistrationInControlledShutdownChange.java | 2 +-
.../apache/kafka/metadata/DelegationTokenData.java | 4 +-
.../apache/kafka/metadata/LeaderRecoveryState.java | 2 +-
.../kafka/metadata/PartitionRegistration.java | 4 +-
.../metadata/authorizer/StandardAuthorizer.java | 2 +-
.../metadata/placement/StripedReplicaPlacer.java | 82 ++++++++++------------
.../kafka/metadata/util/BatchFileReader.java | 5 +-
.../apache/kafka/metadata/util/RecordRedactor.java | 2 +-
.../controller/ClusterControlManagerTest.java | 8 +--
.../controller/ProducerIdControlManagerTest.java | 2 -
.../QuorumControllerMetricsIntegrationTest.java | 6 +-
.../kafka/controller/QuorumControllerTest.java | 14 ++--
.../kafka/controller/QuorumControllerTestEnv.java | 2 -
.../controller/ReplicationControlManagerTest.java | 33 +++++----
.../java/org/apache/kafka/image/AclsDeltaTest.java | 2 +-
.../org/apache/kafka/image/ClusterImageTest.java | 3 -
.../kafka/image/DelegationTokenImageTest.java | 2 +-
.../org/apache/kafka/image/FakeSnapshotWriter.java | 4 +-
.../org/apache/kafka/image/ImageDowngradeTest.java | 2 +-
.../org/apache/kafka/image/MetadataImageTest.java | 4 --
.../kafka/image/MetadataVersionChangeTest.java | 9 +--
.../loader/metrics/MetadataLoaderMetricsTest.java | 4 --
.../kafka/image/publisher/SnapshotEmitterTest.java | 2 +-
.../org/apache/kafka/metadata/BrokerStateTest.java | 3 -
.../apache/kafka/metadata/ListenerInfoTest.java | 16 ++---
.../kafka/metadata/PartitionRegistrationTest.java | 16 +++--
.../org/apache/kafka/metadata/RecordTestUtils.java | 1 -
.../apache/kafka/metadata/VersionRangeTest.java | 1 -
.../metadata/bootstrap/BootstrapDirectoryTest.java | 4 +-
.../metadata/bootstrap/BootstrapMetadataTest.java | 10 +--
.../CapturingDelegationTokenMigrationClient.java | 2 +-
.../metadata/util/BatchFileWriterReaderTest.java | 2 +-
.../org/apache/kafka/metalog/LocalLogManager.java | 13 ++--
.../apache/kafka/metalog/LocalLogManagerTest.java | 2 +-
.../kafka/metalog/MockMetaLogManagerListener.java | 26 ++-----
36 files changed, 128 insertions(+), 170 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
index ab0bfc6a81f..ee007dd5378 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
@@ -35,7 +35,7 @@ public enum BrokerRegistrationFencingChange {
private final static Map<Byte, BrokerRegistrationFencingChange> VALUE_TO_ENUM =
Arrays.stream(BrokerRegistrationFencingChange.values()).
- collect(Collectors.toMap(v -> Byte.valueOf(v.value()), Function.identity()));
+ collect(Collectors.toMap(v -> v.value(), Function.identity()));
public static Optional<BrokerRegistrationFencingChange> fromValue(byte value) {
return Optional.ofNullable(VALUE_TO_ENUM.get(value));
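For context, this change (and the identical one in the next file) drops a redundant explicit boxing call: Collectors.toMap boxes the primitive byte key on its own. Below is a minimal self-contained sketch of the same value-to-enum lookup pattern; the ExampleChange enum is hypothetical, not part of the patch.

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

enum ExampleChange {
    NONE((byte) 0), FENCE((byte) 1), UNFENCE((byte) 2);

    private final byte value;

    ExampleChange(byte value) {
        this.value = value;
    }

    public byte value() {
        return value;
    }

    // Autoboxing converts the primitive byte key to Byte here, which is
    // why the Byte.valueOf(...) call removed above was redundant.
    private static final Map<Byte, ExampleChange> VALUE_TO_ENUM =
        Arrays.stream(ExampleChange.values()).
            collect(Collectors.toMap(v -> v.value(), Function.identity()));

    public static Optional<ExampleChange> fromValue(byte value) {
        return Optional.ofNullable(VALUE_TO_ENUM.get(value));
    }
}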
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
index 39f8abf595e..3d49b495da5 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
@@ -36,7 +36,7 @@ public enum BrokerRegistrationInControlledShutdownChange {
private final static Map<Byte, BrokerRegistrationInControlledShutdownChange> VALUE_TO_ENUM =
Arrays.stream(BrokerRegistrationInControlledShutdownChange.values()).
- collect(Collectors.toMap(v -> Byte.valueOf(v.value()), Function.identity()));
+ collect(Collectors.toMap(v -> v.value(), Function.identity()));
public static Optional<BrokerRegistrationInControlledShutdownChange> fromValue(byte value) {
return Optional.ofNullable(VALUE_TO_ENUM.get(value));
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java b/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java
index 269cdd2458c..df853169e19 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java
@@ -33,7 +33,7 @@ import java.util.Objects;
*/
public final class DelegationTokenData {
- private TokenInformation tokenInformation;
+ private final TokenInformation tokenInformation;
public static DelegationTokenData fromRecord(DelegationTokenRecord record) {
List<KafkaPrincipal> renewers = new ArrayList<>();
@@ -62,7 +62,7 @@ public final class DelegationTokenData {
return new DelegationTokenRecord()
.setOwner(tokenInformation.ownerAsString())
.setRequester(tokenInformation.tokenRequesterAsString())
- .setRenewers(new ArrayList<String>(tokenInformation.renewersAsString()))
+ .setRenewers(new ArrayList<>(tokenInformation.renewersAsString()))
.setIssueTimestamp(tokenInformation.issueTimestamp())
.setMaxTimestamp(tokenInformation.maxTimestamp())
.setExpirationTimestamp(tokenInformation.expiryTimestamp())
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java b/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
index 08086751b70..432226e0d41 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
@@ -56,7 +56,7 @@ public enum LeaderRecoveryState {
private final byte value;
- private LeaderRecoveryState(byte value) {
+ LeaderRecoveryState(byte value) {
this.value = value;
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 635bff2f501..f5a8b2133bc 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -358,8 +358,8 @@ public class PartitionRegistration {
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
record.setDirectories(Uuid.toList(directories));
} else {
- for (int i = 0; i < directories.length; i++) {
- if (!DirectoryId.UNASSIGNED.equals(directories[i])) {
+ for (Uuid directory : directories) {
+ if (!DirectoryId.UNASSIGNED.equals(directory)) {
options.handleLoss("the directory assignment state of one or more replicas");
break;
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 47f066c34b4..7e9f779093e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -122,7 +122,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
for (Endpoint endpoint : serverInfo.endpoints()) {
if (serverInfo.earlyStartListeners().contains(
- endpoint.listenerName().orElseGet(() -> ""))) {
+ endpoint.listenerName().orElse(""))) {
result.put(endpoint, CompletableFuture.completedFuture(null));
} else {
result.put(endpoint, initialLoadFuture);
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
index a3ac776cded..f13e8ce9897 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
@@ -33,89 +33,86 @@ import org.apache.kafka.metadata.OptionalStringComparator;
/**
* The striped replica placer.
- *
- *
- * GOALS
- * The design of this placer attempts to satisfy a few competing goals. Firstly, we want
- * to spread the replicas as evenly as we can across racks. In the simple case where
- * broker racks have not been configured, this goal is a no-op, of course. But it is the
+ * <p>
+ * <h3>Goals</h3>
+ * <p>The design of this placer attempts to satisfy a few competing goals. Firstly, we want
+ * to spread the replicas as evenly as we can across racks. In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course. But it is the
* highest priority goal in multi-rack clusters.
*
- * Our second goal is to spread the replicas evenly across brokers. Since we are placing
+ * <p>Our second goal is to spread the replicas evenly across brokers. Since we are placing
* multiple partitions, we try to avoid putting each partition on the same set of
- * replicas, even if it does satisfy the rack placement goal. If any specific broker is
+ * replicas, even if it does satisfy the rack placement goal. If any specific broker is
* fenced, we would like the new leaders to distributed evenly across the remaining
* brokers.
*
- * However, we treat the rack placement goal as higher priority than this goal-- if you
+ * <p>However, we treat the rack placement goal as higher priority than this goal-- if you
* configure 10 brokers in rack A and B, and 1 broker in rack C, you will end up with a
* lot of partitions on that one broker in rack C. If you were to place a lot of
* partitions with replication factor 3, each partition would try to get a replica there.
* In general racks are supposed to be about the same size -- if they aren't, this is a
* user error.
*
- * Finally, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * <p>Finally, we would prefer to place replicas on unfenced brokers, rather than on fenced
* brokers.
- *
- *
- * CONSTRAINTS
- * In addition to these goals, we have two constraints. Unlike the goals, these are not
- * optional -- they are mandatory. Placement will fail if a constraint cannot be
- * satisfied. The first constraint is that we can't place more than one replica on the
- * same broker. This imposes an upper limit on replication factor-- for example, a 3-node
- * cluster can't have any topics with replication factor 4. This constraint comes from
+ * <p>
+ * <h3>Constraints</h3>
+ * In addition to these goals, we have two constraints. Unlike the goals, these are not
+ * optional -- they are mandatory. Placement will fail if a constraint cannot be
+ * satisfied. The first constraint is that we can't place more than one replica on the
+ * same broker. This imposes an upper limit on replication factor-- for example, a 3-node
+ * cluster can't have any topics with replication factor 4. This constraint comes from
* Kafka's internal design.
*
- * The second constraint is that the leader of each partition must be an unfenced broker.
- * This constraint is a bit arbitrary. In theory, we could allow people to create
- * new topics even if every broker were fenced. However, this would be confusing for
+ * <p>The second constraint is that the leader of each partition must be an unfenced broker.
+ * This constraint is a bit arbitrary. In theory, we could allow people to create
+ * new topics even if every broker were fenced. However, this would be confusing for
* users.
- *
- *
- * ALGORITHM
- * The StripedReplicaPlacer constructor loads the broker data into rack objects. Each
+ * <p>
+ * <h3>Algorithm</h3>
+ * <p>The StripedReplicaPlacer constructor loads the broker data into rack objects. Each
* rack object contains a sorted list of fenced brokers, and a separate sorted list of
- * unfenced brokers. The racks themselves are organized into a sorted list, stored inside
+ * unfenced brokers. The racks themselves are organized into a sorted list, stored inside
* the top-level RackList object.
*
- * The general idea is that we place replicas on to racks in a round-robin fashion. So if
+ * <p>The general idea is that we place replicas on to racks in a round-robin fashion. So if
* we had racks A, B, C, and D, and we were creating a new partition with replication
* factor 3, our first replica might come from A, our second from B, and our third from C.
* Of course our placement would not be very fair if we always started with rack A.
- * Therefore, we generate a random starting offset when the RackList is created. So one
- * time we might go B, C, D. Another time we might go C, D, A. And so forth.
+ * Therefore, we generate a random starting offset when the RackList is created. So one
+ * time we might go B, C, D. Another time we might go C, D, A. And so forth.
*
- * Note that each partition we generate advances the starting offset by one.
+ * <p>Note that each partition we generate advances the starting offset by one.
* So in our 4-rack cluster, with 3 partitions, we might choose these racks:
- *
+ * <pre>
* partition 1: A, B, C
* partition 2: B, C, A
* partition 3: C, A, B
- *
+ * </pre>
* This is what generates the characteristic "striped" pattern of this placer.
*
- * So far I haven't said anything about how we choose a replica from within a rack. In
- * fact, this is also done in a round-robin fashion. So if rack A had replica A0, A1, A2,
+ * <p>So far I haven't said anything about how we choose a replica from within a rack. In
+ * fact, this is also done in a round-robin fashion. So if rack A had replica A0, A1, A2,
* and A3, we might return A0 the first time, A1, the second, A2 the third, and so on.
* Just like with the racks, we add a random starting offset to mix things up a bit.
*
- * So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas,
+ * <p>So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas,
* for 9 nodes in total.
* If all the offsets were 0, you'd get placements like this:
- *
+ * <pre>
* partition 1: A0, B0, C0
* partition 2: B1, C1, A1
* partition 3: C2, A2, B2
- *
- * One additional complication with choosing a replica within a rack is that we want to
- * choose the unfenced replicas first. In a big cluster with lots of nodes available,
- * we'd prefer not to place a new partition on a node that is fenced. Therefore, we
+ * </pre>
+ * <p>One additional complication with choosing a replica within a rack is that we want to
+ * choose the unfenced replicas first. In a big cluster with lots of nodes available,
+ * we'd prefer not to place a new partition on a node that is fenced. Therefore, we
* actually maintain two lists, rather than the single list I described above.
* We only start using the fenced node list when the unfenced node list is totally
* exhausted.
*
- * Furthermore, we cannot place the first replica (the leader) of a new partition on a
- * fenced replica. Therefore, we have some special logic to ensure that this doesn't
+ * <p>Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * fenced replica. Therefore, we have some special logic to ensure that this doesn't
* happen.
*/
public class StripedReplicaPlacer implements ReplicaPlacer {
@@ -272,7 +269,6 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
/**
* The names of all the racks in the cluster.
- *
* Racks which have at least one unfenced broker come first (in sorted order),
* followed by racks which have only fenced brokers (also in sorted order).
*/
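To make the reformatted Javadoc above concrete, here is a minimal, hypothetical sketch of the round-robin striping it describes (not the actual placer, which also handles fencing and per-rack broker offsets). With three racks and the random starting offset fixed at 0, it prints the striped pattern from the examples: A, B, C then B, C, A then C, A, B.

import java.util.ArrayList;
import java.util.List;

public class StripingSketch {
    public static void main(String[] args) {
        List<String> racks = List.of("A", "B", "C");
        int replicationFactor = 3;
        int startOffset = 0; // the real placer randomizes this when the RackList is created
        for (int partition = 0; partition < 3; partition++) {
            List<String> replicas = new ArrayList<>();
            // Each partition advances the starting rack by one, which is what
            // produces the characteristic "striped" placement.
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(racks.get((startOffset + partition + r) % racks.size()));
            }
            System.out.println("partition " + (partition + 1) + ": " + replicas);
        }
    }
}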
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
index e82d37cf78d..174fb416add 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -113,8 +113,7 @@ public final class BatchFileReader implements Iterator<BatchFileReader.BatchAndT
private BatchAndType nextControlBatch(FileChannelRecordBatch input) {
List<ApiMessageAndVersion> messages = new ArrayList<>();
- for (Iterator<Record> iter = input.iterator(); iter.hasNext(); ) {
- Record record = iter.next();
+ for (Record record : input) {
try {
short typeId = ControlRecordType.parseTypeId(record.key());
ControlRecordType type = ControlRecordType.fromTypeId(typeId);
@@ -179,6 +178,6 @@ public final class BatchFileReader implements Iterator<BatchFileReader.BatchAndT
} catch (Exception e) {
log.error("Error closing fileRecords", e);
}
- this.batchIterator = Collections.<FileChannelRecordBatch>emptyList().iterator();
+ this.batchIterator = Collections.emptyIterator();
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java b/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java
index 51c160cfa78..4373fd1de6d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java
@@ -49,7 +49,7 @@ public final class RecordRedactor {
case USER_SCRAM_CREDENTIAL_RECORD: {
UserScramCredentialRecord record = (UserScramCredentialRecord) message;
return "UserScramCredentialRecord("
- + "name=" + ((record.name() == null) ? "null" : "'" +
record.name().toString() + "'")
+ + "name=" + ((record.name() == null) ? "null" : "'" +
record.name() + "'")
+ ", mechanism=" + record.mechanism()
+ ", salt=(redacted)"
+ ", storedKey=(redacted)"
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 69a9d67e3d8..6a66bd3c6db 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -240,7 +240,7 @@ public class ClusterControlManagerTest {
}
@Test
- public void testRegistrationWithIncorrectClusterId() throws Exception {
+ public void testRegistrationWithIncorrectClusterId() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
@@ -317,7 +317,7 @@ public class ClusterControlManagerTest {
}
@Test
- public void testUnregister() throws Exception {
+ public void testUnregister() {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerId(1).
setBrokerEpoch(100).
@@ -365,7 +365,7 @@ public class ClusterControlManagerTest {
@ParameterizedTest
@ValueSource(ints = {3, 10})
- public void testPlaceReplicas(int numUsableBrokers) throws Exception {
+ public void testPlaceReplicas(int numUsableBrokers) {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
@@ -418,7 +418,7 @@ public class ClusterControlManagerTest {
@ParameterizedTest
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
- public void testRegistrationsToRecords(MetadataVersion metadataVersion) throws Exception {
+ public void testRegistrationsToRecords(MetadataVersion metadataVersion) {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 900e8d2c7af..1d3ed8bf00e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -122,8 +122,6 @@ public class ProducerIdControlManagerTest {
@Test
public void testUnknownBrokerOrEpoch() {
- ControllerResult<ProducerIdsBlock> result;
-
assertThrows(StaleBrokerEpochException.class, () ->
producerIdControlManager.generateNextProducerId(99, 0));
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
index 52049b5626d..f48063cda55 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
@@ -30,8 +30,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
@@ -46,13 +44,13 @@ import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.f
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class QuorumControllerMetricsIntegrationTest {
- private final static Logger log = LoggerFactory.getLogger(QuorumControllerMetricsIntegrationTest.class);
static class MockControllerMetrics extends QuorumControllerMetrics {
final AtomicBoolean closed = new AtomicBoolean(false);
@@ -179,7 +177,7 @@ public class QuorumControllerMetricsIntegrationTest {
for (QuorumController controller : controlEnv.controllers()) {
// Inactive controllers don't set these metrics.
if (!controller.isActive()) {
- assertEquals(false, controller.controllerMetrics().active());
+ assertFalse(controller.controllerMetrics().active());
assertEquals(0L, controller.controllerMetrics().timedOutHeartbeats());
assertEquals(0L, controller.controllerMetrics().operationsTimedOut());
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 298cf0dece9..ef7162014fe 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1102,8 +1102,8 @@ public class QuorumControllerTest {
}
static class InitialSnapshot implements AutoCloseable {
- File tempDir = null;
- BatchFileWriter writer = null;
+ File tempDir;
+ BatchFileWriter writer;
public InitialSnapshot(List<ApiMessageAndVersion> records) throws Exception {
tempDir = TestUtils.tempDirectory();
@@ -1292,7 +1292,7 @@ public class QuorumControllerTest {
controllerBuilder.setZkMigrationEnabled(migrationEnabled);
}).
setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test")).
- build();
+ build()
) {
QuorumController active = controlEnv.activeController();
ZkMigrationState zkMigrationState = active.appendReadEvent("read migration state", OptionalLong.empty(),
@@ -1317,7 +1317,7 @@ public class QuorumControllerTest {
controllerBuilder.setZkMigrationEnabled(true);
}).
setBootstrapMetadata(bootstrapMetadata).
- build();
+ build()
) {
QuorumController active = controlEnv.activeController();
assertEquals(active.featureControl().zkMigrationState(), ZkMigrationState.MIGRATION);
@@ -1453,7 +1453,7 @@ public class QuorumControllerTest {
@Test
public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
try (
- LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build()
) {
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
@@ -1571,7 +1571,7 @@ public class QuorumControllerTest {
@Test
public void testFailoverDuringMigrationTransaction() throws Exception {
try (
- LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()
) {
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setZkMigrationEnabled(true)).
@@ -1616,7 +1616,7 @@ public class QuorumControllerTest {
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_4_IV0", "IBP_3_5_IV0", "IBP_3_6_IV0", "IBP_3_6_IV1"})
public void testBrokerHeartbeatDuringMigration(MetadataVersion metadataVersion) throws Exception {
try (
- LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build()
) {
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder ->
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 4750fb61fae..cc7ed106774 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -17,7 +17,6 @@
package org.apache.kafka.controller;
-import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.LeaderAndEpoch;
@@ -98,7 +97,6 @@ public class QuorumControllerTestEnv implements AutoCloseable {
int numControllers = logEnv.logManagers().size();
this.controllers = new ArrayList<>(numControllers);
try {
- ApiVersions apiVersions = new ApiVersions();
List<Integer> nodeIds = IntStream.range(0, numControllers).boxed().collect(Collectors.toList());
for (int nodeId = 0; nodeId < numControllers; nodeId++) {
QuorumController.Builder builder = new QuorumController.Builder(nodeId, logEnv.clusterId());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index d9f848d8e4d..8c48024a764 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -921,8 +921,8 @@ public class ReplicationControlManagerTest {
shrinkIsrResult, topicIdPartition, NONE);
assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
- assertTrue(Arrays.equals(new int[]{1, 2}, partition.elr), partition.toString());
- assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ assertArrayEquals(new int[]{1, 2}, partition.elr, partition.toString());
+ assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
PartitionData expandIsrRequest = newAlterPartition(
replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
@@ -932,8 +932,8 @@ public class ReplicationControlManagerTest {
expandIsrResult, topicIdPartition, NONE);
assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, expandIsrResponse);
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
- assertTrue(Arrays.equals(new int[]{}, partition.elr), partition.toString());
- assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ assertArrayEquals(new int[]{}, partition.elr, partition.toString());
+ assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
}
@Test
@@ -952,19 +952,19 @@ public class ReplicationControlManagerTest {
ctx.fenceBrokers(Utils.mkSet(2, 3));
PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
- assertTrue(Arrays.equals(new int[]{3}, partition.elr), partition.toString());
- assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ assertArrayEquals(new int[]{3}, partition.elr, partition.toString());
+ assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
- assertTrue(Arrays.equals(new int[]{1, 3}, partition.elr), partition.toString());
- assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ assertArrayEquals(new int[]{1, 3}, partition.elr, partition.toString());
+ assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
ctx.unfenceBrokers(0, 1, 2, 3);
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
- assertTrue(Arrays.equals(new int[]{1, 3}, partition.elr), partition.toString());
- assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ assertArrayEquals(new int[]{1, 3}, partition.elr, partition.toString());
+ assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
}
@Test
@@ -1000,16 +1000,16 @@ public class ReplicationControlManagerTest {
ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
- assertTrue(Arrays.equals(new int[]{2, 3}, partition.elr), partition.toString());
- assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ assertArrayEquals(new int[]{2, 3}, partition.elr, partition.toString());
+ assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
ctx.unfenceBrokers(2);
ctx.fenceBrokers(Utils.mkSet(0, 1));
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
- assertTrue(Arrays.equals(new int[]{0, 3}, partition.elr), partition.toString());
- assertTrue(Arrays.equals(new int[]{2}, partition.isr), partition.toString());
+ assertArrayEquals(new int[]{0, 3}, partition.elr, partition.toString());
+ assertArrayEquals(new int[]{2}, partition.isr, partition.toString());
assertEquals(2, partition.leader, partition.toString());
- assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
}
@ParameterizedTest
@@ -1152,7 +1152,7 @@ public class ReplicationControlManagerTest {
long brokerEpoch,
Uuid topicId,
AlterPartitionRequestData.PartitionData partitionData
- ) throws Exception {
+ ) {
AlterPartitionRequestData request = new AlterPartitionRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch);
@@ -1424,7 +1424,6 @@ public class ReplicationControlManagerTest {
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createResult =
replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
- CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
CreatableTopicResult createdTopic = createResult.response().topics().find("foo");
assertEquals(NONE.code(), createdTopic.errorCode());
ctx.replay(createResult.records());
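A note on the assertion rewrites above: they are behavior-preserving, but assertArrayEquals reports the differing index and the expected/actual elements on failure, while assertTrue(Arrays.equals(...)) can only report that true was expected. A small illustrative JUnit 5 test (hypothetical, not from the patch):

import static org.junit.jupiter.api.Assertions.assertArrayEquals;

import org.junit.jupiter.api.Test;

class AssertionStyleExample {
    @Test
    void elrComparison() {
        int[] expected = {1, 2};
        int[] actual = {1, 3};
        // On failure this reports something like: "ELR mismatch ==> array
        // contents differ at index [1], expected: <2> but was: <3>".
        assertArrayEquals(expected, actual, "ELR mismatch");
    }
}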
diff --git a/metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java b/metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java
index 13d3aceb5c8..08f67c4ff6d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java
@@ -37,7 +37,7 @@ import java.util.Optional;
@Timeout(40)
public class AclsDeltaTest {
- private Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
+ private final Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
@Test
public void testRemovesDeleteIfNotInImage() {
diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
index 1bde2c0a42c..309d5cef7e6 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
@@ -37,8 +37,6 @@ import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -55,7 +53,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class ClusterImageTest {
- private static final Logger log = LoggerFactory.getLogger(ClusterImageTest.class);
public final static ClusterImage IMAGE1;
diff --git a/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java b/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java
index 84781eb8832..7d33323b53a 100644
--- a/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java
@@ -56,7 +56,7 @@ public class DelegationTokenImageTest {
tokenId,
SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
- new ArrayList<KafkaPrincipal>(),
+ new ArrayList<>(),
0,
1000,
expireTimestamp);
diff --git a/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
index d76b1c81f21..f3dabe8d793 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
@@ -28,7 +28,7 @@ import java.util.List;
public class FakeSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion> {
private final OffsetAndEpoch snapshotId;
- private List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
+ private final List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
private boolean frozen = false;
private boolean closed = false;
@@ -79,7 +79,7 @@ public class FakeSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion>
@Override
public long freeze() {
frozen = true;
- return batches.size() * 100;
+ return batches.size() * 100L;
}
@Override
diff --git a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
index d781280e151..213e3c63393 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
@@ -103,7 +103,7 @@ public class ImageDowngradeTest {
* Test downgrading to a MetadataVersion that doesn't support inControlledShutdown.
*/
@Test
- public void testPreControlledShutdownStateVersion() throws Throwable {
+ public void testPreControlledShutdownStateVersion() {
writeWithExpectedLosses(MetadataVersion.IBP_3_3_IV2,
Arrays.asList(
"the inControlledShutdown state of one or more
brokers"),
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index e6c1ec1cee2..0a0d97dd558 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -118,10 +118,6 @@ public class MetadataImageTest {
.build(), Optional.empty());
}
- private static void testToImage(MetadataImage image, ImageWriterOptions options) {
- testToImage(image, options, Optional.empty());
- }
-
static void testToImage(MetadataImage image, ImageWriterOptions options, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image, options)));
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
index bef49adc37d..5356f59b82e 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
@@ -19,8 +19,6 @@ package org.apache.kafka.image;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
@@ -31,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class MetadataVersionChangeTest {
- private static final Logger log = LoggerFactory.getLogger(MetadataVersionChangeTest.class);
private final static MetadataVersionChange CHANGE_3_0_IV1_TO_3_3_IV0 =
new MetadataVersionChange(IBP_3_0_IV1, IBP_3_3_IV0);
@@ -40,19 +37,19 @@ public class MetadataVersionChangeTest {
new MetadataVersionChange(IBP_3_3_IV0, IBP_3_0_IV1);
@Test
- public void testIsUpgrade() throws Throwable {
+ public void testIsUpgrade() {
assertTrue(CHANGE_3_0_IV1_TO_3_3_IV0.isUpgrade());
assertFalse(CHANGE_3_3_IV0_TO_3_0_IV1.isUpgrade());
}
@Test
- public void testIsDowngrade() throws Throwable {
+ public void testIsDowngrade() {
assertFalse(CHANGE_3_0_IV1_TO_3_3_IV0.isDowngrade());
assertTrue(CHANGE_3_3_IV0_TO_3_0_IV1.isDowngrade());
}
@Test
- public void testMetadataVersionChangeExceptionToString() throws Throwable {
+ public void testMetadataVersionChangeExceptionToString() {
assertEquals("org.apache.kafka.image.MetadataVersionChangeException:
The metadata " +
"version is changing from 3.0-IV1 to 3.3-IV0",
new
MetadataVersionChangeException(CHANGE_3_0_IV1_TO_3_3_IV0).toString());
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
index 5e3c6c6f571..3e518f5f56c 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
@@ -45,10 +45,6 @@ public class MetadataLoaderMetricsTest {
new AtomicReference<>(MetadataProvenance.EMPTY);
final MetadataLoaderMetrics metrics;
- FakeMetadataLoaderMetrics() {
- this(Optional.empty());
- }
-
FakeMetadataLoaderMetrics(MetricsRegistry registry) {
this(Optional.of(registry));
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
index 3c73eda3631..088b6c921c3 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
@@ -135,7 +135,7 @@ public class SnapshotEmitterTest {
}
@Test
- public void testEmit() throws Exception {
+ public void testEmit() {
MockRaftClient mockRaftClient = new MockRaftClient();
MockTime time = new MockTime(0, 10000L, 20000L);
SnapshotEmitter emitter = new SnapshotEmitter.Builder().
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerStateTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerStateTest.java
index d590f01a5a0..788fe923ce5 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerStateTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerStateTest.java
@@ -21,12 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Timeout(value = 40)
public class BrokerStateTest {
- private static final Logger log = LoggerFactory.getLogger(BrokerStateTest.class);
@Test
public void testFromValue() {
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
index 257d1984e55..f4f2a843c25 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
@@ -116,14 +116,14 @@ public class ListenerInfoTest {
}
@Test
- public void testToControllerRegistrationRequestFailsOnNullHost() throws Exception {
+ public void testToControllerRegistrationRequestFailsOnNullHost() {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
toControllerRegistrationRequest());
}
@Test
- public void testToControllerRegistrationRequestFailsOnZeroPort() throws Exception {
+ public void testToControllerRegistrationRequestFailsOnZeroPort() {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
withWildcardHostnamesResolved().
@@ -141,14 +141,14 @@ public class ListenerInfoTest {
}
@Test
- public void testToControllerRegistrationRecordFailsOnNullHost() throws Exception {
+ public void testToControllerRegistrationRecordFailsOnNullHost() {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
toControllerRegistrationRecord());
}
@Test
- public void testToControllerRegistrationRecordFailsOnZeroPort() throws Exception {
+ public void testToControllerRegistrationRecordFailsOnZeroPort() {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
withWildcardHostnamesResolved().
@@ -166,14 +166,14 @@ public class ListenerInfoTest {
}
@Test
- public void testToBrokerRegistrationRequestFailsOnNullHost() throws Exception {
+ public void testToBrokerRegistrationRequestFailsOnNullHost() {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
toBrokerRegistrationRequest());
}
@Test
- public void testToBrokerRegistrationRequestFailsOnZeroPort() throws Exception {
+ public void testToBrokerRegistrationRequestFailsOnZeroPort() {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
withWildcardHostnamesResolved().
@@ -191,14 +191,14 @@ public class ListenerInfoTest {
}
@Test
- public void testToBrokerRegistrationRecordFailsOnNullHost() throws Exception {
+ public void testToBrokerRegistrationRecordFailsOnNullHost() {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
toBrokerRegistrationRecord());
}
@Test
- public void testToBrokerRegistrationRecordFailsOnZeroPort() throws Exception {
+ public void testToBrokerRegistrationRecordFailsOnZeroPort() {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
withWildcardHostnamesResolved().
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index c2934c77508..39b5ab8133a 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -280,8 +280,8 @@ public class PartitionRegistrationTest {
PartitionRecord expectRecord = new PartitionRecord().
setTopicId(topicID).
setPartitionId(0).
- setReplicas(Arrays.asList(new Integer[]{0, 1, 2, 3, 4})).
- setIsr(Arrays.asList(new Integer[]{0, 1})).
+ setReplicas(Arrays.asList(0, 1, 2, 3, 4)).
+ setIsr(Arrays.asList(0, 1)).
setLeader(0).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
setLeaderEpoch(0).
@@ -290,8 +290,8 @@ public class PartitionRegistrationTest {
when(metadataVersion.partitionRecordVersion()).thenReturn(version);
if (version > 0) {
expectRecord.
- setEligibleLeaderReplicas(Arrays.asList(new Integer[]{2, 3})).
- setLastKnownELR(Arrays.asList(new Integer[]{4}));
+ setEligibleLeaderReplicas(Arrays.asList(2, 3)).
+ setLastKnownELR(Arrays.asList(4));
} else {
when(metadataVersion.isElrSupported()).thenReturn(false);
}
@@ -318,6 +318,7 @@ public class PartitionRegistrationTest {
assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
}
+ @Test
public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1, 2, 3, 4}).
@@ -331,8 +332,8 @@ public class PartitionRegistrationTest {
PartitionRecord expectRecord = new PartitionRecord().
setTopicId(topicID).
setPartitionId(0).
- setReplicas(Arrays.asList(new Integer[]{0, 1, 2, 3, 4})).
- setIsr(Arrays.asList(new Integer[]{0, 1})).
+ setReplicas(Arrays.asList(0, 1, 2, 3, 4)).
+ setIsr(Arrays.asList(0, 1)).
setLeader(0).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
setLeaderEpoch(0).
@@ -342,8 +343,9 @@ public class PartitionRegistrationTest {
setMetadataVersion(MetadataVersion.latest()).
setLossHandler(exceptions::add).
build();
- assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), partitionRegistration.toRecord(topicID, 0, options));
+ assertEquals(new ApiMessageAndVersion(expectRecord, (short) 1), partitionRegistration.toRecord(topicID, 0, options));
assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
+ assertTrue(exceptions.isEmpty());
}
@Property
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index 2dcccb6dda2..b35f8075645 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -273,7 +273,6 @@ public class RecordTestUtils {
*
* @param o The input object. It will be modified in-place.
*/
- @SuppressWarnings("unchecked")
public static void deepSortRecords(Object o) throws Exception {
if (o == null) {
return;
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java
index d31e8f81396..4e8cec759ee 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java
@@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class VersionRangeTest {
- @SuppressWarnings("unchecked")
private static VersionRange v(int a, int b) {
assertTrue(a <= Short.MAX_VALUE);
assertTrue(a >= Short.MIN_VALUE);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
index 73ad2b07d9f..e52edd61fc3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
@@ -48,7 +48,7 @@ public class BootstrapDirectoryTest {
static class BootstrapTestDirectory implements AutoCloseable {
File directory = null;
- synchronized BootstrapTestDirectory createDirectory() throws Exception {
+ synchronized BootstrapTestDirectory createDirectory() {
directory = TestUtils.tempDirectory("BootstrapTestDirectory");
return this;
}
@@ -98,7 +98,7 @@ public class BootstrapDirectoryTest {
}
@Test
- public void testMissingDirectory() throws Exception {
+ public void testMissingDirectory() {
assertEquals("No such directory as ./non/existent/directory",
assertThrows(RuntimeException.class, () ->
new BootstrapDirectory("./non/existent/directory", Optional.empty()).read()).getMessage());
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 4dbb90f7c70..3c75f4cadcb 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -48,7 +48,7 @@ public class BootstrapMetadataTest {
setFeatureLevel((short) 6), (short) 0)));
@Test
- public void testFromVersion() throws Exception {
+ public void testFromVersion() {
assertEquals(new BootstrapMetadata(Collections.singletonList(
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(FEATURE_NAME).
@@ -58,20 +58,20 @@ public class BootstrapMetadataTest {
}
@Test
- public void testFromRecordsList() throws Exception {
+ public void testFromRecordsList() {
assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1, IBP_3_3_IV2, "bar"),
BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "bar"));
}
@Test
- public void testFromRecordsListWithoutMetadataVersion() throws Exception {
+ public void testFromRecordsListWithoutMetadataVersion() {
assertEquals("No FeatureLevelRecord for metadata.version was found in
the bootstrap " +
"metadata from quux", assertThrows(RuntimeException.class,
() -> BootstrapMetadata.fromRecords(emptyList(),
"quux")).getMessage());
}
@Test
- public void testCopyWithOnlyVersion() throws Exception {
+ public void testCopyWithOnlyVersion() {
assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1.subList(2, 3), IBP_3_3_IV2, "baz"),
BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "baz").copyWithOnlyVersion());
}
@@ -82,7 +82,7 @@ public class BootstrapMetadataTest {
setFeatureLevel(IBP_3_0_IV1.featureLevel()), (short) 0)));
@Test
- public void testFromRecordsListWithOldMetadataVersion() throws Exception {
+ public void testFromRecordsListWithOldMetadataVersion() {
RuntimeException exception = assertThrows(RuntimeException.class,
() -> BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux"));
assertEquals("Bootstrap metadata versions before 3.3-IV0 are not
supported. Can't load " +
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingDelegationTokenMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingDelegationTokenMigrationClient.java
index 67503e77ebb..0ee6168e99a 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingDelegationTokenMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingDelegationTokenMigrationClient.java
@@ -36,7 +36,7 @@ public class CapturingDelegationTokenMigrationClient implements DelegationTokenM
@Override
public List<String> getDelegationTokens() {
- return new ArrayList<String>();
+ return new ArrayList<>();
}
@Override
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java b/metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java
index a68a434bf74..d2de01b824d 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java
@@ -102,7 +102,7 @@ final public class BatchFileWriterReaderTest {
assertEquals(0, apiMessageAndVersion.version());
SnapshotFooterRecord footerRecord = (SnapshotFooterRecord) apiMessageAndVersion.message();
- assertEquals(0, headerRecord.version());
+ assertEquals(0, footerRecord.version());
}
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index c33cdf2a1b6..f2e9d22bb59 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -176,6 +176,11 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
*/
private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
+ /**
+ * Maps committed offset to snapshot reader.
+ */
+ private final NavigableMap<Long, RawSnapshotReader> snapshots = new TreeMap<>();
+
/**
* The current leader.
*/
@@ -192,11 +197,6 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
*/
private long initialMaxReadOffset = Long.MAX_VALUE;
- /**
- * Maps committed offset to snapshot reader.
- */
- private NavigableMap<Long, RawSnapshotReader> snapshots = new TreeMap<>();
-
public SharedLogData(Optional<RawSnapshotReader> snapshot) {
if (snapshot.isPresent()) {
RawSnapshotReader initialSnapshot = snapshot.get();
@@ -515,7 +515,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
* result is half the records getting appended with leader election following that.
* This is done to emulate having some of the records not getting committed.
*/
- private AtomicBoolean resignAfterNonAtomicCommit = new AtomicBoolean(false);
+ private final AtomicBoolean resignAfterNonAtomicCommit = new AtomicBoolean(false);
public LocalLogManager(LogContext logContext,
int nodeId,
@@ -827,7 +827,6 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
// the leader epoch has already advanced. resign is a no op.
log.debug("Ignoring call to resign from epoch {}. Either we are
not the leader or the provided epoch is " +
"smaller than the current epoch {}", epoch, currentEpoch);
- return;
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
index dfa093478f1..b0745ecc424 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
@@ -108,7 +108,7 @@ public class LocalLogManagerTest {
long highestOffset = -1;
for (String event : listener.serializedEvents()) {
if (event.startsWith(LAST_COMMITTED_OFFSET)) {
- long offset = Long.valueOf(
+ long offset = Long.parseLong(
event.substring(LAST_COMMITTED_OFFSET.length() + 1));
if (offset < highestOffset) {
throw new RuntimeException("Invalid offset: " + offset +
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
index 3d7267d94a5..77183f5dcb6 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
@@ -54,13 +54,9 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
for (ApiMessageAndVersion messageAndVersion : batch.records()) {
ApiMessage message = messageAndVersion.message();
- StringBuilder bld = new StringBuilder();
- bld.append(COMMIT).append(" ").append(message.toString());
- serializedEvents.add(bld.toString());
+ serializedEvents.add(COMMIT + " " + message.toString());
}
- StringBuilder bld = new StringBuilder();
- bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
- serializedEvents.add(bld.toString());
+ serializedEvents.add(LAST_COMMITTED_OFFSET + " " + lastCommittedOffset);
}
} finally {
reader.close();
@@ -76,13 +72,9 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
for (ApiMessageAndVersion messageAndVersion : batch.records()) {
ApiMessage message = messageAndVersion.message();
- StringBuilder bld = new StringBuilder();
- bld.append(SNAPSHOT).append(" ").append(message.toString());
- serializedEvents.add(bld.toString());
+ serializedEvents.add(SNAPSHOT + " " + message.toString());
}
- StringBuilder bld = new StringBuilder();
- bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
- serializedEvents.add(bld.toString());
+ serializedEvents.add(LAST_COMMITTED_OFFSET + " " + lastCommittedOffset);
}
} finally {
reader.close();
@@ -95,14 +87,10 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
this.leaderAndEpoch = newLeaderAndEpoch;
if (newLeaderAndEpoch.isLeader(nodeId)) {
- StringBuilder bld = new StringBuilder();
- bld.append(NEW_LEADER).append(" ").
- append(nodeId).append(" ").append(newLeaderAndEpoch.epoch());
- serializedEvents.add(bld.toString());
+ String bld = NEW_LEADER + " " + nodeId + " " + newLeaderAndEpoch.epoch();
+ serializedEvents.add(bld);
} else if (oldLeaderAndEpoch.isLeader(nodeId)) {
- StringBuilder bld = new StringBuilder();
- bld.append(RENOUNCE).append(" ").append(newLeaderAndEpoch.epoch());
- serializedEvents.add(bld.toString());
+ serializedEvents.add(RENOUNCE + " " + newLeaderAndEpoch.epoch());
}
}