This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5ef962b KAFKA-13056; Do not rely on broker for snapshots if controller is co-resident (#11013)
5ef962b is described below
commit 5ef962ba064efd5e073310aa39ab6a9b6ae8b7c3
Author: Jason Gustafson <[email protected]>
AuthorDate: Sat Jul 10 10:47:46 2021 -0700
KAFKA-13056; Do not rely on broker for snapshots if controller is
co-resident (#11013)
When a node is serving as both broker and controller, we should only rely
on the controller to write new snapshots.
Reviewers: Luke Chen <[email protected]>, Colin P. McCabe
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 22 +++--
.../server/metadata/BrokerMetadataListener.scala | 19 ++--
.../metadata/BrokerMetadataListenerTest.scala | 101 ++++++++++++++-------
3 files changed, 94 insertions(+), 48 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index a29e759..16856f8 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -31,6 +31,7 @@ import kafka.metrics.KafkaYammerMetrics
import kafka.network.SocketServer
import kafka.raft.RaftManager
import kafka.security.CredentialProvider
+import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher,
BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache,
SnapshotWriterBuilder}
import kafka.utils.{CoreUtils, KafkaScheduler}
import org.apache.kafka.snapshot.SnapshotWriter
@@ -145,7 +146,7 @@ class BrokerServer(
val clusterId: String = metaProps.clusterId
- var metadataSnapshotter: BrokerMetadataSnapshotter = null
+ var metadataSnapshotter: Option[BrokerMetadataSnapshotter] = None
var metadataListener: BrokerMetadataListener = null
@@ -289,10 +290,16 @@ class BrokerServer(
ConfigType.Topic -> new TopicConfigHandler(logManager, config,
quotaManagers, None),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
- metadataSnapshotter = new BrokerMetadataSnapshotter(config.nodeId,
- time,
- threadNamePrefix,
- new BrokerSnapshotWriterBuilder(raftManager.client))
+ if (!config.processRoles.contains(ControllerRole)) {
+ // If no controller is defined, we rely on the broker to generate snapshots.
+ metadataSnapshotter = Some(new BrokerMetadataSnapshotter(
+ config.nodeId,
+ time,
+ threadNamePrefix,
+ new BrokerSnapshotWriterBuilder(raftManager.client)
+ ))
+ }
+
metadataListener = new BrokerMetadataListener(config.nodeId,
time,
threadNamePrefix,
@@ -437,9 +444,8 @@ class BrokerServer(
if (metadataListener != null) {
CoreUtils.swallow(metadataListener.close(), this)
}
- if (metadataSnapshotter != null) {
- CoreUtils.swallow(metadataSnapshotter.close(), this)
- }
+ metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
+
if (transactionCoordinator != null)
CoreUtils.swallow(transactionCoordinator.shutdown(), this)
if (groupCoordinator != null)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 9369384..c4c027a 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -39,7 +39,7 @@ class BrokerMetadataListener(
time: Time,
threadNamePrefix: Option[String],
val maxBytesBetweenSnapshots: Long,
- val snapshotter: MetadataSnapshotter
+ val snapshotter: Option[MetadataSnapshotter]
) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
private val logContext = new LogContext(s"[BrokerMetadataListener id=${brokerId}] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
@@ -121,14 +121,17 @@ class BrokerMetadataListener(
} finally {
reader.close()
}
- _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
_publisher.foreach(publish(_, results.highestMetadataOffset))
- if (shouldSnapshot()) {
- if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset,
- _highestEpoch,
- _highestTimestamp,
- _delta.apply())) {
- _bytesSinceLastSnapshot = 0L
+
+ snapshotter.foreach { snapshotter =>
+ _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+ if (shouldSnapshot()) {
+ if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset,
+ _highestEpoch,
+ _highestTimestamp,
+ _delta.apply())) {
+ _bytesSinceLastSnapshot = 0L
+ }
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 81a22e1..735c779 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -30,19 +30,20 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
+import scala.jdk.CollectionConverters._
class BrokerMetadataListenerTest {
@Test
def testCreateAndClose(): Unit = {
val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
- (_, _, _, _) => throw new UnsupportedOperationException())
+ snapshotter = None)
listener.close()
}
@Test
def testPublish(): Unit = {
val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
- (_, _, _, _) => throw new UnsupportedOperationException())
+ snapshotter = None)
try {
listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
@@ -50,7 +51,7 @@ class BrokerMetadataListenerTest {
setBrokerEpoch(100L).
setFenced(false).
setRack(null).
- setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 1))));
+ setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 0.toShort))))
val imageRecords = listener.getImageRecords().get()
assertEquals(0, imageRecords.size())
assertEquals(100L, listener.highestMetadataOffset())
@@ -60,7 +61,7 @@ class BrokerMetadataListenerTest {
setBrokerEpoch(200L).
setFenced(true).
setRack(null).
- setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 1))));
+ setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 0.toShort))))
listener.startPublishing(new MetadataPublisher {
override def publish(newHighestMetadataOffset: Long,
delta: MetadataDelta,
@@ -142,37 +143,39 @@ class BrokerMetadataListenerTest {
}
@Test
+ def testHandleCommitsWithNoSnapshotterDefined(): Unit = {
+ val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L,
+ snapshotter = None)
+ try {
+ val brokerIds = 0 to 3
+
+ registerBrokers(listener, brokerIds, endOffset = 100L)
+ createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
+ listener.getImageRecords().get()
+ assertEquals(200L, listener.highestMetadataOffset())
+
+ generateManyRecords(listener, endOffset = 1000L)
+ assertEquals(1000L, listener.highestMetadataOffset())
+ } finally {
+ listener.close()
+ }
+ }
+
+ @Test
def testCreateSnapshot(): Unit = {
val snapshotter = new MockMetadataSnapshotter()
- val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, snapshotter)
+ val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, Some(snapshotter))
try {
- (0 to 3).foreach {
- id => listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
- util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
- setBrokerId(id).
- setBrokerEpoch(100L).
- setFenced(false).
- setRack(null).
- setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + id)), 1))))
- }
- listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
- util.Arrays.asList(new ApiMessageAndVersion(new TopicRecord().
- setName("foo").
- setTopicId(FOO_ID), 1.toShort),
- new ApiMessageAndVersion(new PartitionRecord().
- setPartitionId(0).
- setTopicId(FOO_ID).
- setIsr(util.Arrays.asList(0, 1, 2)).
- setLeader(0).
- setReplicas(util.Arrays.asList(0, 1, 2)).
- setRemovingReplicas(util.Arrays.asList(0, 1, 2)).
- setAddingReplicas(util.Arrays.asList(0, 1, 2)), 1.toShort))))
+ val brokerIds = 0 to 3
+
+ registerBrokers(listener, brokerIds, endOffset = 100L)
+ createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
listener.getImageRecords().get()
assertEquals(200L, listener.highestMetadataOffset())
// Check that we generate at least one snapshot once we see enough records.
assertEquals(-1L, snapshotter.prevCommittedOffset)
- generateManyRecords(listener, 1000L);
+ generateManyRecords(listener, 1000L)
assertEquals(1000L, snapshotter.prevCommittedOffset)
assertEquals(1000L, snapshotter.activeSnapshotOffset)
snapshotter.activeSnapshotOffset = -1L
@@ -180,18 +183,18 @@ class BrokerMetadataListenerTest {
// Test creating a new snapshot after publishing it.
val publisher = new MockMetadataPublisher()
listener.startPublishing(publisher).get()
- generateManyRecords(listener, 2000L);
+ generateManyRecords(listener, 2000L)
listener.getImageRecords().get()
assertEquals(2000L, snapshotter.activeSnapshotOffset)
assertEquals(2000L, snapshotter.prevCommittedOffset)
// Test how we handle the snapshotter returning false.
- generateManyRecords(listener, 3000L);
+ generateManyRecords(listener, 3000L)
assertEquals(2000L, snapshotter.activeSnapshotOffset)
- generateManyRecords(listener, 4000L);
+ generateManyRecords(listener, 4000L)
assertEquals(2000L, snapshotter.activeSnapshotOffset)
snapshotter.activeSnapshotOffset = -1L
- generateManyRecords(listener, 5000L);
+ generateManyRecords(listener, 5000L)
assertEquals(5000L, snapshotter.activeSnapshotOffset)
assertEquals(null, snapshotter.failure.get())
} finally {
@@ -199,5 +202,39 @@ class BrokerMetadataListenerTest {
}
}
-}
+ private def registerBrokers(
+ listener: BrokerMetadataListener,
+ brokerIds: Iterable[Int],
+ endOffset: Long
+ ): Unit = {
+ brokerIds.foreach { brokerId =>
+ listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
+ util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+ setBrokerId(brokerId).
+ setBrokerEpoch(100L).
+ setFenced(false).
+ setRack(null).
+ setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + brokerId)), 0.toShort))))
+ }
+ }
+ private def createTopicWithOnePartition(
+ listener: BrokerMetadataListener,
+ replicas: Seq[Int],
+ endOffset: Long
+ ): Unit = {
+ listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
+ util.Arrays.asList(
+ new ApiMessageAndVersion(new TopicRecord().
+ setName("foo").
+ setTopicId(FOO_ID), 0.toShort),
+ new ApiMessageAndVersion(new PartitionRecord().
+ setPartitionId(0).
+ setTopicId(FOO_ID).
+ setIsr(replicas.map(Int.box).asJava).
+ setLeader(0).
+ setReplicas(replicas.map(Int.box).asJava), 0.toShort)))
+ )
+ }
+
+}