This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 3b9d2e13f51 KAFKA-18100 `Using` block suppresses all errors (#17970)
3b9d2e13f51 is described below
commit 3b9d2e13f5155ed113712d330dddd9cf29d543cc
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Nov 29 00:16:09 2024 +0800
KAFKA-18100 `Using` block suppresses all errors (#17970)
Reviewers: Chia-Ping Tsai <[email protected]>
---
...onTokenEndToEndAuthorizationWithOwnerTest.scala | 6 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 2 +-
.../scala/integration/kafka/api/SaslSetup.scala | 2 +-
.../kafka/server/RaftClusterSnapshotTest.scala | 4 +-
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 146 +++++++--------------
.../group/CoordinatorLoaderImplTest.scala | 27 ++--
.../kafka/integration/KafkaServerTestHarness.scala | 6 +-
.../unit/kafka/server/ReplicationQuotasTest.scala | 4 +-
.../server/epoch/LeaderEpochIntegrationTest.scala | 2 +-
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 2 +-
10 files changed, 75 insertions(+), 126 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
index e57e96c10a8..823ffa990b0 100644
---
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
+++
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
@@ -65,7 +65,7 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest
extends DelegationTokenE
override def configureSecurityAfterServersStart(): Unit = {
// Create the Acls before calling super which will create the additional
tokens
- Using(createPrivilegedAdminClient()) { superuserAdminClient =>
+ Using.resource(createPrivilegedAdminClient()) { superuserAdminClient =>
superuserAdminClient.createAcls(List(AclTokenOtherDescribe,
AclTokenCreate, AclTokenDescribe).asJava).values
brokers.foreach { s =>
@@ -105,8 +105,8 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest
extends DelegationTokenE
@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
def testDescribeTokenForOtherUserFails(quorum: String): Unit = {
- Using(createScramAdminClient(kafkaClientSaslMechanism,
describeTokenFailPrincipal.getName, describeTokenFailPassword)) {
describeTokenFailAdminClient =>
- Using(createScramAdminClient(kafkaClientSaslMechanism,
otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
+ Using.resource(createScramAdminClient(kafkaClientSaslMechanism,
describeTokenFailPrincipal.getName, describeTokenFailPassword)) {
describeTokenFailAdminClient =>
+ Using.resource(createScramAdminClient(kafkaClientSaslMechanism,
otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
otherClientAdminClient.createDelegationToken().delegationToken().get()
val tokens = describeTokenFailAdminClient.describeDelegationToken(
new
DescribeDelegationTokenOptions().owners(Collections.singletonList(otherClientPrincipal))
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index ce5606fbd9d..f48bcc19088 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1459,7 +1459,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
newConsumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
Integer.MAX_VALUE.toString)
newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString)
- Using(createConsumer(configOverrides = newConsumerConfig)) { consumer =>
+ Using.resource(createConsumer(configOverrides = newConsumerConfig)) {
consumer =>
consumer.subscribe(Collections.singletonList(testTopicName))
val records = consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
assertNotEquals(0, records.count)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 5a9bd4b9db2..e5ceccd34eb 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -201,7 +201,7 @@ trait SaslSetup {
def createScramCredentials(zkConnect: String, userName: String, password:
String): Unit = {
val zkClientConfig = new ZKClientConfig()
- Using(KafkaZkClient(
+ Using.resource(KafkaZkClient(
zkConnect, JaasUtils.isZkSaslEnabled ||
KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig =
zkClientConfig, enableEntityConfigControllerCheck = false)) { zkClient =>
val adminZkClient = new AdminZkClient(zkClient)
diff --git
a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index baef43a768c..88530b8dd00 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -41,7 +41,7 @@ class RaftClusterSnapshotTest {
val numberOfBrokers = 3
val numberOfControllers = 3
- Using(
+ Using.resource(
new KafkaClusterTestKit
.Builder(
new TestKitNodes.Builder()
@@ -74,7 +74,7 @@ class RaftClusterSnapshotTest {
// For every controller and broker perform some sanity checks against
the latest snapshot
for ((_, raftManager) <- cluster.raftManagers().asScala) {
- Using(
+ Using.resource(
RecordsSnapshotReader.of(
raftManager.replicatedLog.latestSnapshot.get(),
new MetadataRecordSerde(),
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index cca56f7aa96..1c65fd5073c 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -124,10 +124,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes())
}
@@ -211,9 +208,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
// Simulate log cleanup that advances the LSO
log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1,
LogStartOffsetIncrementReason.SegmentDeletion)
@@ -246,10 +241,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
assertThrows(
classOf[IllegalArgumentException],
@@ -295,10 +287,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId),
"Creating an existing snapshot should not do anything")
@@ -342,10 +331,7 @@ final class KafkaMetadataLogTest {
val sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch)
append(log, numberOfRecords, epoch)
-
- Using(log.createNewSnapshotUnchecked(sameEpochSnapshotId).get()) {
snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, sameEpochSnapshotId)
assertTrue(log.truncateToLatestSnapshot())
assertEquals(sameEpochSnapshotId.offset, log.startOffset)
@@ -356,10 +342,7 @@ final class KafkaMetadataLogTest {
val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch
+ 1)
append(log, numberOfRecords, epoch)
-
- Using(log.createNewSnapshotUnchecked(greaterEpochSnapshotId).get()) {
snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, greaterEpochSnapshotId)
assertTrue(log.truncateToLatestSnapshot())
assertEquals(greaterEpochSnapshotId.offset, log.startOffset)
@@ -376,27 +359,18 @@ final class KafkaMetadataLogTest {
append(log, 1, epoch - 1)
val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId1)
append(log, 1, epoch)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId2)
append(log, numberOfRecords - 2, epoch)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId3)
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
- append(log, numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot
=>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, greaterSnapshotId)
assertNotEquals(log.earliestSnapshotId(), log.latestSnapshotId())
assertTrue(log.truncateToLatestSnapshot())
@@ -487,7 +461,7 @@ final class KafkaMetadataLogTest {
metadataDir: File,
snapshotId: OffsetAndEpoch
): Unit = {
- Using(FileRawSnapshotWriter.create(metadataDir.toPath,
snapshotId))(_.freeze())
+ Using.resource(FileRawSnapshotWriter.create(metadataDir.toPath,
snapshotId))(_.freeze())
}
@Test
@@ -499,18 +473,14 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
val olderEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch - 1)
- Using(log.createNewSnapshotUnchecked(olderEpochSnapshotId).get()) {
snapshot =>
- snapshot.freeze()
- }
-
+ createNewSnapshotUnckecked(log, olderEpochSnapshotId)
assertFalse(log.truncateToLatestSnapshot())
append(log, numberOfRecords, epoch)
+
val olderOffsetSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(olderOffsetSnapshotId).get()) {
snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, olderOffsetSnapshotId)
assertFalse(log.truncateToLatestSnapshot())
}
@@ -523,10 +493,7 @@ final class KafkaMetadataLogTest {
val snapshotId = new OffsetAndEpoch(1, epoch)
append(log, numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
-
+ createNewSnapshotUnckecked(log, snapshotId)
log.close()
// Create a few partial snapshots
@@ -560,27 +527,19 @@ final class KafkaMetadataLogTest {
append(log, 1, epoch - 1)
val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId1)
append(log, 1, epoch)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId2)
append(log, numberOfRecords - 2, epoch)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId3)
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
append(log, numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot
=>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, greaterSnapshotId)
log.close()
@@ -609,9 +568,7 @@ final class KafkaMetadataLogTest {
val snapshotId = new OffsetAndEpoch(numberOfRecords + 1, epoch + 1)
append(log, numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId)
log.close()
@@ -707,9 +664,7 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords,
epoch - 1)
assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
@@ -727,9 +682,8 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, epoch)
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
+
// Simulate log cleaning advancing the LSO
log.log.maybeIncrementLogStartOffset(offset,
LogStartOffsetIncrementReason.SegmentDeletion)
@@ -749,9 +703,7 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, epoch)
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
@@ -766,9 +718,7 @@ final class KafkaMetadataLogTest {
val log = buildMetadataLog(tempDir, mockTime)
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId)
log.truncateToLatestSnapshot()
@@ -790,9 +740,7 @@ final class KafkaMetadataLogTest {
val log = buildMetadataLog(tempDir, mockTime)
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId)
log.truncateToLatestSnapshot()
append(log, numOfRecords, epoch = 3)
@@ -872,16 +820,10 @@ final class KafkaMetadataLogTest {
assertFalse(log.maybeClean(), "Should not clean since no snapshots exist")
val snapshotId1 = new OffsetAndEpoch(1000, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
- append(snapshot, 100)
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId1)
val snapshotId2 = new OffsetAndEpoch(2000, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
- append(snapshot, 100)
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId2)
val lsoBefore = log.startOffset()
assertTrue(log.maybeClean(), "Expected to clean since there was at least
one snapshot")
@@ -910,10 +852,7 @@ final class KafkaMetadataLogTest {
for (offset <- Seq(100, 200, 300, 400, 500, 600)) {
val snapshotId = new OffsetAndEpoch(offset, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- append(snapshot, 10)
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId)
}
assertEquals(6, log.snapshotCount())
@@ -945,14 +884,14 @@ final class KafkaMetadataLogTest {
// Then generate two snapshots
val snapshotId1 = new OffsetAndEpoch(1000, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
+ Using.resource(log.createNewSnapshotUnchecked(snapshotId1).get()) {
snapshot =>
append(snapshot, 500)
snapshot.freeze()
}
// Then generate a snapshot
val snapshotId2 = new OffsetAndEpoch(2000, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
+ Using.resource(log.createNewSnapshotUnchecked(snapshotId2).get()) {
snapshot =>
append(snapshot, 500)
snapshot.freeze()
}
@@ -992,17 +931,14 @@ final class KafkaMetadataLogTest {
log.log.logSegments.asScala.drop(1).head.baseOffset,
1
)
- Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId1)
+
// Generate second snapshots that includes the second segment by using the
base offset of the third segment
val snapshotId2 = new OffsetAndEpoch(
log.log.logSegments.asScala.drop(2).head.baseOffset,
1
)
- Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId2)
// Sleep long enough to trigger a possible segment delete because of the
default retention
val defaultLogRetentionMs = LogConfig.DEFAULT_RETENTION_MS * 2
@@ -1074,6 +1010,18 @@ object KafkaMetadataLogTest {
log
}
+ def createNewSnapshot(log: KafkaMetadataLog, snapshotId: OffsetAndEpoch):
Unit = {
+ Using.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
+ snapshot.freeze()
+ }
+ }
+
+ def createNewSnapshotUnckecked(log: KafkaMetadataLog, snapshotId:
OffsetAndEpoch): Unit = {
+ Using.resource(log.createNewSnapshotUnchecked(snapshotId).get()) {
snapshot =>
+ snapshot.freeze()
+ }
+ }
+
def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int):
LogAppendInfo = {
log.appendAsLeader(
MemoryRecords.withRecords(
@@ -1103,4 +1051,4 @@ object KafkaMetadataLogTest {
}
dir
}
-}
+}
\ No newline at end of file
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index 767549a7b53..dee5ffd581d 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -58,7 +58,7 @@ class CoordinatorLoaderImplTest {
val serde = mock(classOf[Deserializer[(String, String)]])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -78,7 +78,7 @@ class CoordinatorLoaderImplTest {
val serde = mock(classOf[Deserializer[(String, String)]])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -99,7 +99,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -202,7 +202,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -245,7 +245,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -285,7 +285,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -311,7 +311,8 @@ class CoordinatorLoaderImplTest {
.thenThrow(new RuntimeException("Error!"))
val ex = assertFutureThrows(loader.load(tp, coordinator),
classOf[RuntimeException])
- assertEquals("Error!", ex.getMessage)
+
+ assertEquals(s"Error!", ex.getMessage)
}
}
@@ -326,7 +327,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -358,7 +359,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
val time = new MockTime()
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time,
replicaManager = replicaManager,
deserializer = serde,
@@ -413,7 +414,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -488,7 +489,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -514,7 +515,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -590,7 +591,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
diff --git
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 6b4badb388e..4b7d8821367 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -155,7 +155,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
adminClientConfig: Properties = new Properties
): Unit = {
if (isKRaftTest()) {
- Using(createAdminClient(brokers, listenerName, adminClientConfig)) {
admin =>
+ Using.resource(createAdminClient(brokers, listenerName,
adminClientConfig)) { admin =>
TestUtils.createOffsetsTopicWithAdmin(admin, brokers,
controllerServers)
}
} else {
@@ -234,7 +234,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
listenerName: ListenerName = listenerName
): Unit = {
if (isKRaftTest()) {
- Using(createAdminClient(brokers, listenerName)) { admin =>
+ Using.resource(createAdminClient(brokers, listenerName)) { admin =>
TestUtils.deleteTopicWithAdmin(
admin = admin,
topic = topic,
@@ -428,7 +428,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
def changeClientIdConfig(sanitizedClientId: String, configs: Properties):
Unit = {
if (isKRaftTest()) {
- Using(createAdminClient(brokers, listenerName)) {
+ Using.resource(createAdminClient(brokers, listenerName)) {
admin => {
admin.alterClientQuotas(Collections.singleton(
new ClientQuotaAlteration(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index ff697dad370..3e952ae400f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -112,7 +112,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
//replicate for each of the two follower brokers.
if (!leaderThrottle) throttle = throttle * 3
- Using(createAdminClient(brokers, listenerName)) { admin =>
+ Using.resource(createAdminClient(brokers, listenerName)) { admin =>
if (isKRaftTest()) {
(106 to 107).foreach(registerBroker)
}
@@ -223,7 +223,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
val expectedDuration = 4
val throttle: Long = msg.length * msgCount / expectedDuration
- Using(createAdminClient(brokers, listenerName)) { admin =>
+ Using.resource(createAdminClient(brokers, listenerName)) { admin =>
if (isKRaftTest()) {
registerBroker(101)
}
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index cb73a53c3d2..941599eda94 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -288,7 +288,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness
with Logging {
}
private def createTopic(topic: String, partitionReplicaAssignment:
collection.Map[Int, Seq[Int]]): Unit = {
- Using(createAdminClient(brokers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
+ Using.resource(createAdminClient(brokers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
TestUtils.createTopicWithAdmin(
admin = admin,
topic = topic,
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 82c2bfd03ad..72748ef67cf 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -555,7 +555,7 @@ class DumpLogSegmentsTest {
val lastContainedLogTimestamp = 10000
- Using(
+ Using.resource(
new RecordsSnapshotWriter.Builder()
.setTime(new MockTime)
.setLastContainedLogTimestamp(lastContainedLogTimestamp)