This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 972b76561a9 MINOR: Rename remaining `zkVersion` to `partitionEpoch` in
`PartitionTest` (#12147)
972b76561a9 is described below
commit 972b76561a916158e8fef30e9af116465f1f567e
Author: David Jacot <[email protected]>
AuthorDate: Tue May 17 08:58:43 2022 +0200
MINOR: Rename remaining `zkVersion` to `partitionEpoch` in `PartitionTest`
(#12147)
Reviewers: Kvicii <[email protected]>, dengziming
<[email protected]>, Jason Gustafson <[email protected]>
---
.../unit/kafka/cluster/AbstractPartitionTest.scala | 12 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 152 ++++++++++-----------
2 files changed, 82 insertions(+), 82 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 2117d87e455..969f8a2e793 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -49,8 +49,8 @@ class AbstractPartitionTest {
var logDir1: File = _
var logDir2: File = _
var logManager: LogManager = _
- var alterIsrManager: MockAlterPartitionManager = _
- var isrChangeListener: MockAlterPartitionListener = _
+ var alterPartitionManager: MockAlterPartitionManager = _
+ var alterPartitionListener: MockAlterPartitionListener = _
var logConfig: LogConfig = _
var configRepository: MockConfigRepository = _
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
@@ -73,18 +73,18 @@ class AbstractPartitionTest {
CleanerConfig(enableCleaner = false), time, interBrokerProtocolVersion)
logManager.startup(Set.empty)
- alterIsrManager = TestUtils.createAlterIsrManager()
- isrChangeListener = TestUtils.createIsrChangeListener()
+ alterPartitionManager = TestUtils.createAlterIsrManager()
+ alterPartitionListener = TestUtils.createIsrChangeListener()
partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = interBrokerProtocolVersion,
localBrokerId = brokerId,
time,
- isrChangeListener,
+ alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
- alterIsrManager)
+ alterPartitionManager)
when(offsetCheckpoints.fetch(ArgumentMatchers.anyString,
ArgumentMatchers.eq(topicPartition)))
.thenReturn(None)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 8645c4c9f72..e394118103a 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -204,11 +204,11 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
time,
- isrChangeListener,
+ alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
- alterIsrManager) {
+ alterPartitionManager) {
override def createLog(isNew: Boolean, isFutureReplica: Boolean,
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints,
None)
@@ -618,7 +618,7 @@ class PartitionTest extends AbstractPartitionTest {
updateFollowerFetchState(follower2, LogOffsetMetadata(2))
// Simulate successful ISR update
- alterIsrManager.completeIsrUpdate(2)
+ alterPartitionManager.completeIsrUpdate(2)
// At this point, the leader has gotten 5 writes, but followers have only
fetched two
assertEquals(2, partition.localLogOrException.highWatermark)
@@ -708,7 +708,7 @@ class PartitionTest extends AbstractPartitionTest {
updateFollowerFetchState(follower2, LogOffsetMetadata(5))
// Simulate successful ISR update
- alterIsrManager.completeIsrUpdate(6)
+ alterPartitionManager.completeIsrUpdate(6)
// Error goes away
fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Some(IsolationLevel.READ_UNCOMMITTED)) match {
@@ -970,7 +970,7 @@ class PartitionTest extends AbstractPartitionTest {
// Expansion does not affect the ISR
assertEquals(Set[Integer](leader, follower2),
partition.partitionState.isr, "ISR")
assertEquals(Set[Integer](leader, follower1, follower2),
partition.partitionState.maximalIsr, "ISR")
- assertEquals(alterIsrManager.isrUpdates.head.leaderAndIsr.isr.toSet,
+ assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.toSet,
Set(leader, follower1, follower2), "AlterIsr")
}
@@ -1123,20 +1123,20 @@ class PartitionTest extends AbstractPartitionTest {
// Check that the isr didn't change and alter update is scheduled
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
- assertEquals(1, alterIsrManager.isrUpdates.size)
- assertEquals(Set(brokerId, remoteBrokerId),
alterIsrManager.isrUpdates.head.leaderAndIsr.isr.toSet)
+ assertEquals(1, alterPartitionManager.isrUpdates.size)
+ assertEquals(Set(brokerId, remoteBrokerId),
alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.toSet)
// Simulate invalid request failure
- alterIsrManager.failIsrUpdate(Errors.INVALID_REQUEST)
+ alterPartitionManager.failIsrUpdate(Errors.INVALID_REQUEST)
// Still no ISR change and no retry
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
- assertEquals(0, alterIsrManager.isrUpdates.size)
+ assertEquals(0, alterPartitionManager.isrUpdates.size)
- assertEquals(0, isrChangeListener.expands.get)
- assertEquals(0, isrChangeListener.shrinks.get)
- assertEquals(1, isrChangeListener.failures.get)
+ assertEquals(0, alterPartitionListener.expands.get)
+ assertEquals(0, alterPartitionListener.shrinks.get)
+ assertEquals(1, alterPartitionListener.failures.get)
}
@Test
@@ -1188,8 +1188,8 @@ class PartitionTest extends AbstractPartitionTest {
followerFetchTimeMs = time.milliseconds(),
leaderEndOffset = 6L)
- assertEquals(alterIsrManager.isrUpdates.size, 1)
- val isrItem = alterIsrManager.isrUpdates.head
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
+ val isrItem = alterPartitionManager.isrUpdates.head
assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId))
assertEquals(Set(brokerId), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
@@ -1200,12 +1200,12 @@ class PartitionTest extends AbstractPartitionTest {
)
// Complete the ISR expansion
- alterIsrManager.completeIsrUpdate(2)
+ alterPartitionManager.completeIsrUpdate(2)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
- assertEquals(isrChangeListener.expands.get, 1)
- assertEquals(isrChangeListener.shrinks.get, 0)
- assertEquals(isrChangeListener.failures.get, 0)
+ assertEquals(alterPartitionListener.expands.get, 1)
+ assertEquals(alterPartitionListener.shrinks.get, 0)
+ assertEquals(alterPartitionListener.failures.get, 0)
}
@Test
@@ -1247,7 +1247,7 @@ class PartitionTest extends AbstractPartitionTest {
// Follower state is updated, but the ISR has not expanded
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
- assertEquals(alterIsrManager.isrUpdates.size, 1)
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
@@ -1255,15 +1255,15 @@ class PartitionTest extends AbstractPartitionTest {
)
// Simulate failure callback
- alterIsrManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
+ alterPartitionManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
// Still no ISR change and it doesn't retry
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
- assertEquals(alterIsrManager.isrUpdates.size, 0)
- assertEquals(isrChangeListener.expands.get, 0)
- assertEquals(isrChangeListener.shrinks.get, 0)
- assertEquals(isrChangeListener.failures.get, 1)
+ assertEquals(alterPartitionManager.isrUpdates.size, 0)
+ assertEquals(alterPartitionListener.expands.get, 0)
+ assertEquals(alterPartitionListener.shrinks.get, 0)
+ assertEquals(alterPartitionListener.failures.get, 1)
}
@Test
@@ -1284,7 +1284,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 1,
+ partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1294,26 +1294,26 @@ class PartitionTest extends AbstractPartitionTest {
// Try to shrink the ISR
partition.maybeShrinkIsr()
- assertEquals(alterIsrManager.isrUpdates.size, 1)
- assertEquals(alterIsrManager.isrUpdates.head.leaderAndIsr.isr,
List(brokerId))
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
+ assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr,
List(brokerId))
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
// The shrink fails and we retry
- alterIsrManager.failIsrUpdate(Errors.NETWORK_EXCEPTION)
- assertEquals(0, isrChangeListener.shrinks.get)
- assertEquals(1, isrChangeListener.failures.get)
+ alterPartitionManager.failIsrUpdate(Errors.NETWORK_EXCEPTION)
+ assertEquals(0, alterPartitionListener.shrinks.get)
+ assertEquals(1, alterPartitionListener.failures.get)
assertEquals(1, partition.getPartitionEpoch)
- assertEquals(alterIsrManager.isrUpdates.size, 1)
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
assertEquals(0L, partition.localLogOrException.highWatermark)
// The shrink succeeds after retrying
- alterIsrManager.completeIsrUpdate(newPartitionEpoch = 2)
- assertEquals(1, isrChangeListener.shrinks.get)
+ alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 2)
+ assertEquals(1, alterPartitionListener.shrinks.get)
assertEquals(2, partition.getPartitionEpoch)
- assertEquals(alterIsrManager.isrUpdates.size, 0)
+ assertEquals(alterPartitionManager.isrUpdates.size, 0)
assertEquals(Set(brokerId), partition.partitionState.isr)
assertEquals(Set(brokerId), partition.partitionState.maximalIsr)
assertEquals(log.logEndOffset, partition.localLogOrException.highWatermark)
@@ -1337,7 +1337,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 1,
+ partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1357,19 +1357,19 @@ class PartitionTest extends AbstractPartitionTest {
// Shrink the ISR
partition.maybeShrinkIsr()
- assertEquals(0, isrChangeListener.shrinks.get)
- assertEquals(alterIsrManager.isrUpdates.size, 1)
- assertEquals(alterIsrManager.isrUpdates.head.leaderAndIsr.isr,
List(brokerId))
+ assertEquals(0, alterPartitionListener.shrinks.get)
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
+ assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr,
List(brokerId))
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
assertEquals(0L, partition.localLogOrException.highWatermark)
// After the ISR shrink completes, the ISR state should be updated and the
// high watermark should be advanced
- alterIsrManager.completeIsrUpdate(newPartitionEpoch = 2)
- assertEquals(1, isrChangeListener.shrinks.get)
+ alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 2)
+ assertEquals(1, alterPartitionListener.shrinks.get)
assertEquals(2, partition.getPartitionEpoch)
- assertEquals(alterIsrManager.isrUpdates.size, 0)
+ assertEquals(alterPartitionManager.isrUpdates.size, 0)
assertEquals(Set(brokerId), partition.partitionState.isr)
assertEquals(Set(brokerId), partition.partitionState.maximalIsr)
assertEquals(log.logEndOffset, partition.localLogOrException.highWatermark)
@@ -1393,7 +1393,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 1,
+ partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1416,7 +1416,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 2,
+ partitionEpoch = 2,
isNew = false
))
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1428,7 +1428,7 @@ class PartitionTest extends AbstractPartitionTest {
assertFalse(partition.partitionState.isInflight, "ISR should still be
committed and not inflight")
// Complete the AlterIsr update and now we can make modifications again
- alterIsrManager.completeIsrUpdate(10)
+ alterPartitionManager.completeIsrUpdate(10)
partition.maybeShrinkIsr()
assertTrue(partition.partitionState.isInflight, "ISR should be pending a
shrink")
}
@@ -1451,7 +1451,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 1,
+ partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1497,7 +1497,7 @@ class PartitionTest extends AbstractPartitionTest {
// time of the first fetch.
partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
- assertEquals(alterIsrManager.isrUpdates.size, 0)
+ assertEquals(alterPartitionManager.isrUpdates.size, 0)
}
@Test
@@ -1518,7 +1518,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 1,
+ partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1548,7 +1548,7 @@ class PartitionTest extends AbstractPartitionTest {
// The ISR should not be shrunk because the follower is caught up to the
leader's log end
partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
- assertEquals(alterIsrManager.isrUpdates.size, 0)
+ assertEquals(alterPartitionManager.isrUpdates.size, 0)
}
@Test
@@ -1569,7 +1569,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 1,
+ partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1585,16 +1585,16 @@ class PartitionTest extends AbstractPartitionTest {
// Enqueue and AlterIsr that will fail
partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
- assertEquals(alterIsrManager.isrUpdates.size, 1)
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertEquals(0L, partition.localLogOrException.highWatermark)
// Simulate failure callback
- alterIsrManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
+ alterPartitionManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
// Ensure ISR hasn't changed
assertEquals(partition.partitionState.getClass, classOf[PendingShrinkIsr])
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
- assertEquals(alterIsrManager.isrUpdates.size, 0)
+ assertEquals(alterPartitionManager.isrUpdates.size, 0)
assertEquals(0L, partition.localLogOrException.highWatermark)
}
@@ -1604,7 +1604,7 @@ class PartitionTest extends AbstractPartitionTest {
(brokerId: Int, remoteBrokerId: Int, partition: Partition) => {
assertEquals(partition.partitionState.isr, Set(brokerId))
assertEquals(partition.partitionState.maximalIsr, Set(brokerId,
remoteBrokerId))
- assertEquals(alterIsrManager.isrUpdates.size, 0)
+ assertEquals(alterPartitionManager.isrUpdates.size, 0)
})
}
@@ -1614,7 +1614,7 @@ class PartitionTest extends AbstractPartitionTest {
(brokerId: Int, remoteBrokerId: Int, partition: Partition) => {
assertEquals(partition.partitionState.isr, Set(brokerId))
assertEquals(partition.partitionState.maximalIsr, Set(brokerId,
remoteBrokerId))
- assertEquals(alterIsrManager.isrUpdates.size, 0)
+ assertEquals(alterPartitionManager.isrUpdates.size, 0)
})
}
@@ -1625,7 +1625,7 @@ class PartitionTest extends AbstractPartitionTest {
// We retry these
assertEquals(partition.partitionState.isr, Set(brokerId))
assertEquals(partition.partitionState.maximalIsr, Set(brokerId,
remoteBrokerId))
- assertEquals(alterIsrManager.isrUpdates.size, 1)
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
})
}
@@ -1645,7 +1645,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 1,
+ partitionEpoch = 1,
isNew = true
))
assertEquals(10L, partition.localLogOrException.highWatermark)
@@ -1667,7 +1667,7 @@ class PartitionTest extends AbstractPartitionTest {
// Follower state is updated, but the ISR has not expanded
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
- assertEquals(alterIsrManager.isrUpdates.size, 1)
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = firstFetchTimeMs,
logStartOffset = 0L,
@@ -1675,7 +1675,7 @@ class PartitionTest extends AbstractPartitionTest {
)
// Failure
- alterIsrManager.failIsrUpdate(error)
+ alterPartitionManager.failIsrUpdate(error)
callback(brokerId, remoteBrokerId, partition)
}
@@ -1700,7 +1700,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 1,
+ partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1717,12 +1717,12 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(Set(brokerId, follower1, follower2, follower3),
partition.partitionState.maximalIsr)
// One AlterIsr request in-flight
- assertEquals(alterIsrManager.isrUpdates.size, 1)
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
// Try to modify ISR again, should do nothing
time.sleep(partition.replicaLagTimeMaxMs + 1)
partition.maybeShrinkIsr()
- assertEquals(alterIsrManager.isrUpdates.size, 1)
+ assertEquals(alterPartitionManager.isrUpdates.size, 1)
}
@Test
@@ -1744,7 +1744,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = IBP_2_6_IV0, // shouldn't matter, but set
this to a ZK isr version
localBrokerId = brokerId,
time,
- isrChangeListener,
+ alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
@@ -1770,7 +1770,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
- zkVersion = 1,
+ partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1841,11 +1841,11 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
time,
- isrChangeListener,
+ alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
- alterIsrManager)
+ alterPartitionManager)
// partition2 should not yet be associated with the log, but should be
able to get ID
assertTrue(partition2.topicId.isDefined)
@@ -1885,11 +1885,11 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
time,
- isrChangeListener,
+ alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
- alterIsrManager)
+ alterPartitionManager)
// partition2 should not yet be associated with the log, but should be
able to get ID
assertTrue(partition2.topicId.isDefined)
@@ -2010,11 +2010,11 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
time,
- isrChangeListener,
+ alterPartitionListener,
delayedOperations,
metadataCache,
spyLogManager,
- alterIsrManager)
+ alterPartitionManager)
partition.createLog(isNew = true, isFutureReplica = false,
offsetCheckpoints, topicId = None)
@@ -2048,11 +2048,11 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
time,
- isrChangeListener,
+ alterPartitionListener,
delayedOperations,
metadataCache,
spyLogManager,
- alterIsrManager)
+ alterPartitionManager)
partition.createLog(isNew = true, isFutureReplica = false,
offsetCheckpoints, topicId = None)
@@ -2089,11 +2089,11 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
time,
- isrChangeListener,
+ alterPartitionListener,
delayedOperations,
metadataCache,
spyLogManager,
- alterIsrManager)
+ alterPartitionManager)
partition.createLog(isNew = true, isFutureReplica = false,
offsetCheckpoints, topicId = None)
@@ -2310,7 +2310,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpoch: Int,
isr: Seq[Int],
replicas: Seq[Int],
- zkVersion: Int,
+ partitionEpoch: Int,
isNew: Boolean,
partition: Partition = partition
): Boolean = {
@@ -2326,7 +2326,7 @@ class PartitionTest extends AbstractPartitionTest {
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr.map(Int.box).asJava)
- .setPartitionEpoch(zkVersion)
+ .setPartitionEpoch(partitionEpoch)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(isNew),
offsetCheckpoints,
@@ -2338,7 +2338,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(isr.toSet, partition.partitionState.isr)
assertEquals(isr.toSet, partition.partitionState.maximalIsr)
- assertEquals(zkVersion, partition.getPartitionEpoch)
+ assertEquals(partitionEpoch, partition.getPartitionEpoch)
newLeader
}