This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e9e181b3269a8ff60aadf940378be52f5539c17f Author: Jason Gustafson <[email protected]> AuthorDate: Sat Jul 10 10:47:46 2021 -0700 KAFKA-13056; Do not rely on broker for snapshots if controller is co-resident (#11013) When a node is serving as both broker and controller, we should only rely on the controller to write new snapshots. Reviewers: Luke Chen <[email protected]>, Colin P. McCabe <[email protected]> --- .../src/main/scala/kafka/server/BrokerServer.scala | 22 +++-- .../server/metadata/BrokerMetadataListener.scala | 19 ++-- .../metadata/BrokerMetadataListenerTest.scala | 101 ++++++++++++++------- 3 files changed, 94 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index a29e759..16856f8 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -31,6 +31,7 @@ import kafka.metrics.KafkaYammerMetrics import kafka.network.SocketServer import kafka.raft.RaftManager import kafka.security.CredentialProvider +import kafka.server.KafkaRaftServer.ControllerRole import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder} import kafka.utils.{CoreUtils, KafkaScheduler} import org.apache.kafka.snapshot.SnapshotWriter @@ -145,7 +146,7 @@ class BrokerServer( val clusterId: String = metaProps.clusterId - var metadataSnapshotter: BrokerMetadataSnapshotter = null + var metadataSnapshotter: Option[BrokerMetadataSnapshotter] = None var metadataListener: BrokerMetadataListener = null @@ -289,10 +290,16 @@ class BrokerServer( ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, None), ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)) - metadataSnapshotter = new BrokerMetadataSnapshotter(config.nodeId, - time, - threadNamePrefix, - new BrokerSnapshotWriterBuilder(raftManager.client)) + if (!config.processRoles.contains(ControllerRole)) { + // If no controller is defined, we rely on the broker to generate snapshots. + metadataSnapshotter = Some(new BrokerMetadataSnapshotter( + config.nodeId, + time, + threadNamePrefix, + new BrokerSnapshotWriterBuilder(raftManager.client) + )) + } + metadataListener = new BrokerMetadataListener(config.nodeId, time, threadNamePrefix, @@ -437,9 +444,8 @@ class BrokerServer( if (metadataListener != null) { CoreUtils.swallow(metadataListener.close(), this) } - if (metadataSnapshotter != null) { - CoreUtils.swallow(metadataSnapshotter.close(), this) - } + metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this)) + if (transactionCoordinator != null) CoreUtils.swallow(transactionCoordinator.shutdown(), this) if (groupCoordinator != null) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 9369384..c4c027a 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -39,7 +39,7 @@ class BrokerMetadataListener( time: Time, threadNamePrefix: Option[String], val maxBytesBetweenSnapshots: Long, - val snapshotter: MetadataSnapshotter + val snapshotter: Option[MetadataSnapshotter] ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup { private val logContext = new LogContext(s"[BrokerMetadataListener id=${brokerId}] ") private val log = logContext.logger(classOf[BrokerMetadataListener]) @@ -121,14 +121,17 @@ class BrokerMetadataListener( } finally { reader.close() } - _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes _publisher.foreach(publish(_, results.highestMetadataOffset)) - if (shouldSnapshot()) { - if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset, - _highestEpoch, - _highestTimestamp, - _delta.apply())) { - _bytesSinceLastSnapshot = 0L + + snapshotter.foreach { snapshotter => + _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes + if (shouldSnapshot()) { + if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset, + _highestEpoch, + _highestTimestamp, + _delta.apply())) { + _bytesSinceLastSnapshot = 0L + } } } } diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala index 81a22e1..735c779 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala @@ -30,19 +30,20 @@ import org.apache.kafka.server.common.ApiMessageAndVersion import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.Test +import scala.jdk.CollectionConverters._ class BrokerMetadataListenerTest { @Test def testCreateAndClose(): Unit = { val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L, - (_, _, _, _) => throw new UnsupportedOperationException()) + snapshotter = None) listener.close() } @Test def testPublish(): Unit = { val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L, - (_, _, _, _) => throw new UnsupportedOperationException()) + snapshotter = None) try { listener.handleCommit(RecordTestUtils.mockBatchReader(100L, util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord(). @@ -50,7 +51,7 @@ class BrokerMetadataListenerTest { setBrokerEpoch(100L). setFenced(false). setRack(null). - setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 1)))); + setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 0.toShort)))) val imageRecords = listener.getImageRecords().get() assertEquals(0, imageRecords.size()) assertEquals(100L, listener.highestMetadataOffset()) @@ -60,7 +61,7 @@ class BrokerMetadataListenerTest { setBrokerEpoch(200L). setFenced(true). setRack(null). - setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 1)))); + setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 0.toShort)))) listener.startPublishing(new MetadataPublisher { override def publish(newHighestMetadataOffset: Long, delta: MetadataDelta, @@ -142,37 +143,39 @@ class BrokerMetadataListenerTest { } @Test + def testHandleCommitsWithNoSnapshotterDefined(): Unit = { + val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, + snapshotter = None) + try { + val brokerIds = 0 to 3 + + registerBrokers(listener, brokerIds, endOffset = 100L) + createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L) + listener.getImageRecords().get() + assertEquals(200L, listener.highestMetadataOffset()) + + generateManyRecords(listener, endOffset = 1000L) + assertEquals(1000L, listener.highestMetadataOffset()) + } finally { + listener.close() + } + } + + @Test def testCreateSnapshot(): Unit = { val snapshotter = new MockMetadataSnapshotter() - val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, snapshotter) + val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, Some(snapshotter)) try { - (0 to 3).foreach { - id => listener.handleCommit(RecordTestUtils.mockBatchReader(100L, - util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord(). - setBrokerId(id). - setBrokerEpoch(100L). - setFenced(false). - setRack(null). - setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + id)), 1)))) - } - listener.handleCommit(RecordTestUtils.mockBatchReader(200L, - util.Arrays.asList(new ApiMessageAndVersion(new TopicRecord(). - setName("foo"). - setTopicId(FOO_ID), 1.toShort), - new ApiMessageAndVersion(new PartitionRecord(). - setPartitionId(0). - setTopicId(FOO_ID). - setIsr(util.Arrays.asList(0, 1, 2)). - setLeader(0). - setReplicas(util.Arrays.asList(0, 1, 2)). - setRemovingReplicas(util.Arrays.asList(0, 1, 2)). - setAddingReplicas(util.Arrays.asList(0, 1, 2)), 1.toShort)))) + val brokerIds = 0 to 3 + + registerBrokers(listener, brokerIds, endOffset = 100L) + createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L) listener.getImageRecords().get() assertEquals(200L, listener.highestMetadataOffset()) // Check that we generate at least one snapshot once we see enough records. assertEquals(-1L, snapshotter.prevCommittedOffset) - generateManyRecords(listener, 1000L); + generateManyRecords(listener, 1000L) assertEquals(1000L, snapshotter.prevCommittedOffset) assertEquals(1000L, snapshotter.activeSnapshotOffset) snapshotter.activeSnapshotOffset = -1L @@ -180,18 +183,18 @@ class BrokerMetadataListenerTest { // Test creating a new snapshot after publishing it. val publisher = new MockMetadataPublisher() listener.startPublishing(publisher).get() - generateManyRecords(listener, 2000L); + generateManyRecords(listener, 2000L) listener.getImageRecords().get() assertEquals(2000L, snapshotter.activeSnapshotOffset) assertEquals(2000L, snapshotter.prevCommittedOffset) // Test how we handle the snapshotter returning false. - generateManyRecords(listener, 3000L); + generateManyRecords(listener, 3000L) assertEquals(2000L, snapshotter.activeSnapshotOffset) - generateManyRecords(listener, 4000L); + generateManyRecords(listener, 4000L) assertEquals(2000L, snapshotter.activeSnapshotOffset) snapshotter.activeSnapshotOffset = -1L - generateManyRecords(listener, 5000L); + generateManyRecords(listener, 5000L) assertEquals(5000L, snapshotter.activeSnapshotOffset) assertEquals(null, snapshotter.failure.get()) } finally { @@ -199,5 +202,39 @@ class BrokerMetadataListenerTest { } } -} + private def registerBrokers( + listener: BrokerMetadataListener, + brokerIds: Iterable[Int], + endOffset: Long + ): Unit = { + brokerIds.foreach { brokerId => + listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset, + util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord(). + setBrokerId(brokerId). + setBrokerEpoch(100L). + setFenced(false). + setRack(null). + setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + brokerId)), 0.toShort)))) + } + } + private def createTopicWithOnePartition( + listener: BrokerMetadataListener, + replicas: Seq[Int], + endOffset: Long + ): Unit = { + listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset, + util.Arrays.asList( + new ApiMessageAndVersion(new TopicRecord(). + setName("foo"). + setTopicId(FOO_ID), 0.toShort), + new ApiMessageAndVersion(new PartitionRecord(). + setPartitionId(0). + setTopicId(FOO_ID). + setIsr(replicas.map(Int.box).asJava). + setLeader(0). + setReplicas(replicas.map(Int.box).asJava), 0.toShort))) + ) + } + +}
