This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 74d5c9165423c545b3ce08629074afadbeb767b7
Author: Anna Povzner <[email protected]>
AuthorDate: Tue Aug 28 10:45:00 2018 -0700

    KAFKA-7128; Follower has to catch up to offset within current leader epoch to join ISR (#5557)

    If follower is not in ISR, it has to fetch up to start offset of the
    current leader epoch. Otherwise we risk losing committed data.

    Added unit test to verify this behavior.

    Reviewers: Jason Gustafson <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala | 25 ++++--
 .../scala/unit/kafka/cluster/PartitionTest.scala  | 89 +++++++++++++++++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala    |  4 +-
 3 files changed, 107 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f953722..e3a8186 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -63,6 +63,9 @@ class Partition(val topic: String,
   private val leaderIsrUpdateLock = new ReentrantReadWriteLock
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+  // start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
+  // defined when this broker is leader for partition
+  @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
   @volatile var leaderReplicaIdOpt: Option[Int] = None
   @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
 
@@ -241,6 +244,7 @@ class Partition(val topic: String,
       allReplicasMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
+      leaderEpochStartOffsetOpt = None
       removePartitionMetrics()
       logManager.asyncDelete(topicPartition)
       logManager.asyncDelete(topicPartition, isFuture = true)
@@ -265,17 +269,19 @@ class Partition(val topic: String,
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
+      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
 
-      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
+      val leaderReplica = getReplica().get
+      val leaderEpochStartOffset = leaderReplica.logEndOffset.messageOffset
+      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from " +
+        s"offset $leaderEpochStartOffset. Previous Leader Epoch was: $leaderEpoch")
 
       //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
-      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
-
+      leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
       val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
-      val leaderReplica = getReplica().get
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
@@ -321,6 +327,7 @@ class Partition(val topic: String,
       (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
+      leaderEpochStartOffsetOpt = None
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
 
       // If the leader is unchanged and the epochs are no more than one change apart, indicate that no follower changes are required
@@ -365,7 +372,11 @@ class Partition(val topic: String,
 
   /**
    * Check and maybe expand the ISR of the partition.
-   * A replica will be added to ISR if its LEO >= current hw of the partition.
+   * A replica will be added to ISR if its LEO >= current hw of the partition and it is caught up to
+   * an offset within the current leader epoch. A replica must be caught up to the current leader
+   * epoch before it can join ISR, because otherwise, if there is committed data between current
+   * leader's HW and LEO, the replica may become the leader before it fetches the committed data
+   * and the data will be lost.
    *
    * Technically, a replica shouldn't be in ISR if it hasn't caught up for longer than replicaLagTimeMaxMs,
    * even if its log end offset is >= HW. However, to be consistent with how the follower determines
@@ -382,9 +393,11 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
+          val fetchOffset = logReadResult.info.fetchOffsetMetadata.messageOffset
           if (!inSyncReplicas.contains(replica) &&
              assignedReplicas.map(_.brokerId).contains(replicaId) &&
-             replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
+             replica.logEndOffset.offsetDiff(leaderHW) >= 0 &&
+             leaderEpochStartOffsetOpt.exists(fetchOffset >= _)) {
             val newInSyncReplicas = inSyncReplicas + replica
             info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
               s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 6de8e55..b238b8e 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -22,10 +22,10 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.common.UnexpectedAppendOffsetException
-import kafka.log.{Log, LogConfig, LogManager, CleanerConfig}
+import kafka.log.{LogConfig, LogManager, CleanerConfig}
 import kafka.server._
 import kafka.utils.{MockTime, TestUtils, MockScheduler}
-import kafka.utils.timer.MockTimer
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import org.apache.kafka.common.metrics.Metrics
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import org.scalatest.Assertions.assertThrows
+import org.easymock.EasyMock
 
 import scala.collection.JavaConverters._
 
@@ -69,10 +70,16 @@ class PartitionTest {
     val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     brokerProps.put("log.dir", logDir.getAbsolutePath)
     val brokerConfig = KafkaConfig.fromProps(brokerProps)
+    val kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     replicaManager = new ReplicaManager(
-      config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
+      config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time),
       logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
       brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
+
+    EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(logProps).anyTimes()
+    EasyMock.expect(kafkaZkClient.conditionalUpdatePath(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+      .andReturn((true, 0)).anyTimes()
+    EasyMock.replay(kafkaZkClient)
   }
 
   @After
@@ -185,6 +192,82 @@ class PartitionTest {
     assertFalse(partition.makeFollower(0, partitionStateInfo, 2))
   }
 
+  @Test
+  def testFollowerDoesNotJoinISRUntilCaughtUpToOffsetWithinCurrentLeaderEpoch(): Unit = {
+    val controllerEpoch = 3
+    val leader = brokerId
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val controllerId = brokerId + 3
+    val replicas = List[Integer](leader, follower1, follower2).asJava
+    val isr = List[Integer](leader, follower2).asJava
+    val leaderEpoch = 8
+    val batch1 = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k2".getBytes, "v2".getBytes)))
+    val batch2 = TestUtils.records(records = List(new SimpleRecord("k3".getBytes, "v1".getBytes),
+      new SimpleRecord("k4".getBytes, "v2".getBytes),
+      new SimpleRecord("k5".getBytes, "v3".getBytes)))
+    val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes),
+      new SimpleRecord("k7".getBytes, "v2".getBytes)))
+
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    assertTrue("Expected first makeLeader() to return 'leader changed'",
+      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0))
+    assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
+
+    // after makeLeader(() call, partition should know about all the replicas
+    val leaderReplica = partition.getReplica(leader).get
+    val follower1Replica = partition.getReplica(follower1).get
+    val follower2Replica = partition.getReplica(follower2).get
+
+    // append records with initial leader epoch
+    val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, isFromClient = true).lastOffset
+    partition.appendRecordsToLeader(batch2, isFromClient = true)
+    assertEquals("Expected leader's HW not move", leaderReplica.logStartOffset, leaderReplica.highWatermark.messageOffset)
+
+    // let the follower in ISR move leader's HW to move further but below LEO
+    def readResult(fetchInfo: FetchDataInfo, leaderReplica: Replica): LogReadResult = {
+      LogReadResult(info = fetchInfo,
+                    highWatermark = leaderReplica.highWatermark.messageOffset,
+                    leaderLogStartOffset = leaderReplica.logStartOffset,
+                    leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset,
+                    followerLogStartOffset = 0,
+                    fetchTimeMs = time.milliseconds,
+                    readSize = 10240,
+                    lastStableOffset = None)
+    }
+    partition.updateReplicaLogReadResult(
+      follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
+    partition.updateReplicaLogReadResult(
+      follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(lastOffsetOfFirstBatch), batch2), leaderReplica))
+    assertEquals("Expected leader's HW", lastOffsetOfFirstBatch, leaderReplica.highWatermark.messageOffset)
+
+    // current leader becomes follower and then leader again (without any new records appended)
+    partition.makeFollower(
+      controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1)
+    assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()",
+      partition.makeLeader(controllerEpoch, new LeaderAndIsrRequest.PartitionState(
+        controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas, false), 2))
+    val currentLeaderEpochStartOffset = leaderReplica.logEndOffset.messageOffset
+
+    // append records with the latest leader epoch
+    partition.appendRecordsToLeader(batch3, isFromClient = true)
+
+    // fetch from follower not in ISR from log start offset should not add this follower to ISR
+    partition.updateReplicaLogReadResult(follower1Replica,
+      readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
+    partition.updateReplicaLogReadResult(follower1Replica,
+      readResult(FetchDataInfo(LogOffsetMetadata(lastOffsetOfFirstBatch), batch2), leaderReplica))
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
+
+    // fetch from the follower not in ISR from start offset of the current leader epoch should
+    // add this follower to ISR
+    partition.updateReplicaLogReadResult(follower1Replica,
+      readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
+    assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId))
+  }
+
   def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 171bcf3..3be33a2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -627,7 +627,7 @@ class ReplicaManagerTest {
     val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
     EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
-      .andReturn((leaderEpochFromLeader, localLogOffset))
+      .andReturn(localLogOffset)
     EasyMock.replay(mockLeaderEpochCache)
     val mockLog = new Log(
       dir = new File(new File(config.logDirs.head), s"$topic-0"),
@@ -682,7 +682,7 @@ class ReplicaManagerTest {
     // Mock network client to show leader offset of 5
     val quota = QuotaFactory.instantiate(config, metrics, time, "")
     val blockingSend = new ReplicaFetcherMockBlockingSend(Map(new TopicPartition(topic, topicPartition) ->
-      new EpochEndOffset(leaderEpochFromLeader, offsetFromLeader)).asJava, BrokerEndPoint(1, "host1" ,1), time)
+      new EpochEndOffset(offsetFromLeader)).asJava, BrokerEndPoint(1, "host1" ,1), time)
     val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr,
       new AtomicBoolean(false), quota, mockBrokerTopicStats, metadataCache, mockLogDirFailureChannel,
       mockProducePurgatory, mockFetchPurgatory,
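
The essence of the patch is the extra guard in the ISR-expansion check of Partition.scala (the "@@ -382,9 +393,11" hunk above): a follower whose log end offset has reached the leader's high watermark is no longer added to the ISR unless its fetch offset has also reached the start offset of the current leader epoch, that is, the leader's log end offset at the moment it became leader. The following is a minimal, self-contained sketch of that rule, not the actual Partition code; the object, type, and parameter names (IsrExpansionSketch, FollowerState, mayJoinIsr) and the sample offsets are hypothetical and exist only to illustrate the condition.

// Minimal sketch (NOT the actual Partition.scala code) of the ISR-expansion rule added by
// this commit. All names here are hypothetical and chosen for illustration only.
object IsrExpansionSketch {

  // Trimmed-down, hypothetical view of the per-follower state the broker tracks.
  final case class FollowerState(logEndOffset: Long, lastFetchOffset: Long)

  def mayJoinIsr(follower: FollowerState,
                 alreadyInIsr: Boolean,
                 isAssignedReplica: Boolean,
                 leaderHighWatermark: Long,
                 leaderEpochStartOffsetOpt: Option[Long]): Boolean = {
    !alreadyInIsr &&
      isAssignedReplica &&
      // pre-patch condition: the follower's LEO has reached the leader's high watermark
      follower.logEndOffset >= leaderHighWatermark &&
      // condition added by this commit: the follower must also have fetched up to the start
      // offset of the current leader epoch, otherwise data between the leader's HW and LEO
      // (committed under a previous leader) could be lost if this follower became leader
      leaderEpochStartOffsetOpt.exists(follower.lastFetchOffset >= _)
  }

  def main(args: Array[String]): Unit = {
    val leaderEpochStartOffset = Some(100L) // leader's LEO when it most recently became leader
    val leaderHighWatermark = 80L           // HW has not yet advanced past the epoch start offset

    // Caught up to the HW but not to the current epoch's start offset: stays out of the ISR.
    println(mayJoinIsr(
      follower = FollowerState(logEndOffset = 90L, lastFetchOffset = 90L),
      alreadyInIsr = false,
      isAssignedReplica = true,
      leaderHighWatermark = leaderHighWatermark,
      leaderEpochStartOffsetOpt = leaderEpochStartOffset)) // prints: false

    // Fetched past the current epoch's start offset: may now be added to the ISR.
    println(mayJoinIsr(
      follower = FollowerState(logEndOffset = 120L, lastFetchOffset = 120L),
      alreadyInIsr = false,
      isAssignedReplica = true,
      leaderHighWatermark = leaderHighWatermark,
      leaderEpochStartOffsetOpt = leaderEpochStartOffset)) // prints: true
  }
}

In the first call the follower satisfies only the pre-patch condition and must stay out of the ISR; in the second call it has also reached the epoch start offset and may join, which mirrors the two fetch sequences exercised by testFollowerDoesNotJoinISRUntilCaughtUpToOffsetWithinCurrentLeaderEpoch in the diff above.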
