This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6659777 MINOR: Don't ignore deletion of partition metadata file and
log topic id clean-ups (#10761)
6659777 is described below
commit 6659777b820f32483d1e96e26831cb87f80a8576
Author: Ismael Juma <[email protected]>
AuthorDate: Thu May 27 13:50:51 2021 -0700
MINOR: Don't ignore deletion of partition metadata file and log topic id
clean-ups (#10761)
Log if deletion fails and don't expose log topic id for mutability outside
of `assignTopicId()`.
Also remove an unnecessary parameter in `PartitionTest`.
Reviewers: Chia-Ping Tsai <[email protected]>, Justine Olshan
<[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 18 ++++++++++++------
.../scala/kafka/server/PartitionMetadataFile.scala | 6 ++++--
.../scala/unit/kafka/cluster/PartitionLockTest.scala | 2 +-
.../test/scala/unit/kafka/cluster/PartitionTest.scala | 11 +++++------
.../scala/unit/kafka/log/LogCleanerManagerTest.scala | 2 +-
.../src/test/scala/unit/kafka/log/LogCleanerTest.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala | 10 +++++-----
core/src/test/scala/unit/kafka/log/LogTest.scala | 9 ++++-----
.../scala/unit/kafka/server/ReplicaManagerTest.scala | 3 +--
.../test/scala/unit/kafka/utils/SchedulerTest.scala | 2 +-
10 files changed, 35 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index fe60c42..0e74c28 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -270,7 +270,7 @@ class Log(@volatile private var _dir: File,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel,
- @volatile var topicId: Option[Uuid],
+ @volatile private var _topicId: Option[Uuid],
val keepPartitionMetadataFile: Boolean) extends Logging with
KafkaMetricsGroup {
import kafka.log.Log._
@@ -324,18 +324,24 @@ class Log(@volatile private var _dir: File,
if (partitionMetadataFile.exists()) {
if (keepPartitionMetadataFile) {
val fileTopicId = partitionMetadataFile.read().topicId
- if (topicId.isDefined && !topicId.contains(fileTopicId))
+ if (_topicId.isDefined && !_topicId.contains(fileTopicId))
throw new InconsistentTopicIdException(s"Tried to assign topic ID
$topicId to log for topic partition $topicPartition," +
s"but log already contained topic ID $fileTopicId")
- topicId = Some(fileTopicId)
+ _topicId = Some(fileTopicId)
} else {
- partitionMetadataFile.delete()
+ try partitionMetadataFile.delete()
+ catch {
+ case e: IOException =>
+ error(s"Error while trying to delete partition metadata file
${partitionMetadataFile}", e)
+ }
}
} else if (keepPartitionMetadataFile) {
- topicId.foreach(partitionMetadataFile.write)
+ _topicId.foreach(partitionMetadataFile.write)
}
}
+ def topicId: Option[Uuid] = _topicId
+
def dir: File = _dir
def parentDir: String = _parentDir
@@ -551,7 +557,7 @@ class Log(@volatile private var _dir: File,
/** Only used for ZK clusters when we update and start using topic IDs on
existing topics */
def assignTopicId(topicId: Uuid): Unit = {
partitionMetadataFile.write(topicId)
- this.topicId = Some(topicId)
+ _topicId = Some(topicId)
}
private def initializeLeaderEpochCache(): Unit = lock synchronized {
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 25b1ba6..c08c87d 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -138,7 +138,9 @@ class PartitionMetadataFile(val file: File,
file.exists()
}
- def delete(): Boolean = {
- file.delete()
+ def delete(): Unit = {
+ Files.delete(file.toPath)
}
+
+ override def toString: String = s"PartitionMetadataFile(path=$path)"
}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 0cbeec9..8d567ec 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -385,7 +385,7 @@ class PartitionLockTest extends Logging {
leaderEpochCache,
producerStateManager,
logDirFailureChannel,
- topicId = None,
+ _topicId = None,
keepPartitionMetadataFile = true) {
override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int,
origin: AppendOrigin, interBrokerProtocolVersion: ApiVersion): LogAppendInfo = {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7030715..b614e13 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -72,7 +72,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(17L, log.logEndOffset)
val leaderEpoch = 10
- val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch,
isLeader = true, log = log)
+ val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch,
isLeader = true)
def epochEndOffset(epoch: Int, endOffset: Long):
FetchResponseData.EpochEndOffset = {
new FetchResponseData.EpochEndOffset()
@@ -143,7 +143,7 @@ class PartitionTest extends AbstractPartitionTest {
), leaderEpoch = 5)
assertEquals(4, log.logEndOffset)
- val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch,
isLeader = true, log = log)
+ val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch,
isLeader = true)
assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset))
val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch
= Optional.of[Integer](leaderEpoch),
@@ -171,7 +171,7 @@ class PartitionTest extends AbstractPartitionTest {
), leaderEpoch = 5)
assertEquals(4, log.logEndOffset)
- val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch,
isLeader = true, log = log)
+ val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch,
isLeader = true)
assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset))
assertEquals(None, log.latestEpoch)
@@ -733,8 +733,7 @@ class PartitionTest extends AbstractPartitionTest {
}
private def setupPartitionWithMocks(leaderEpoch: Int,
- isLeader: Boolean,
- log: Log =
logManager.getOrCreateLog(topicPartition, topicId = None)): Partition = {
+ isLeader: Boolean): Partition = {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
offsetCheckpoints, None)
val controllerEpoch = 0
@@ -2046,7 +2045,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEpochCache,
producerStateManager,
logDirFailureChannel,
- topicId = None,
+ _topicId = None,
keepPartitionMetadataFile = true) {
override def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 7620348..19de8ea 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -121,7 +121,7 @@ class LogCleanerManagerTest extends Logging {
extends Log(dir, config, segments, offsets.logStartOffset,
offsets.recoveryPoint,
offsets.nextOffsetMetadata, time.scheduler, new BrokerTopicStats, time,
LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition,
leaderEpochCache,
- producerStateManager, logDirFailureChannel, topicId = None,
keepPartitionMetadataFile = true) {
+ producerStateManager, logDirFailureChannel, _topicId = None,
keepPartitionMetadataFile = true) {
// Throw an error in getFirstBatchTimestampForSegments since it is
called in grabFilthiestLog()
override def getFirstBatchTimestampForSegments(segments:
Iterable[LogSegment]): Iterable[Long] =
throw new IllegalStateException("Error!")
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 0890b03..5c91041 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -136,7 +136,7 @@ class LogCleanerTest {
leaderEpochCache = leaderEpochCache,
producerStateManager = producerStateManager,
logDirFailureChannel = logDirFailureChannel,
- topicId = None,
+ _topicId = None,
keepPartitionMetadataFile = true) {
override def replaceSegments(newSegments: Seq[LogSegment], oldSegments:
Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
deleteStartLatch.countDown()
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 4826784..9a7b627 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -284,7 +284,7 @@ class LogLoaderTest {
new Log(logDir, logConfig, interceptedLogSegments,
offsets.logStartOffset, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, brokerTopicStats,
mockTime,
LogManager.ProducerIdExpirationCheckIntervalMs, topicPartition,
leaderEpochCache,
- producerStateManager, logDirFailureChannel, topicId = None,
keepPartitionMetadataFile = true)
+ producerStateManager, logDirFailureChannel, _topicId = None,
keepPartitionMetadataFile = true)
}
// Retain snapshots for the last 2 segments
@@ -366,7 +366,7 @@ class LogLoaderTest {
leaderEpochCache = leaderEpochCache,
producerStateManager = stateManager,
logDirFailureChannel = logDirFailureChannel,
- topicId = None,
+ _topicId = None,
keepPartitionMetadataFile = true)
EasyMock.verify(stateManager)
@@ -500,7 +500,7 @@ class LogLoaderTest {
leaderEpochCache = leaderEpochCache,
producerStateManager = stateManager,
logDirFailureChannel = logDirFailureChannel,
- topicId = None,
+ _topicId = None,
keepPartitionMetadataFile = true)
EasyMock.verify(stateManager)
@@ -561,7 +561,7 @@ class LogLoaderTest {
leaderEpochCache = leaderEpochCache,
producerStateManager = stateManager,
logDirFailureChannel = logDirFailureChannel,
- topicId = None,
+ _topicId = None,
keepPartitionMetadataFile = true)
EasyMock.verify(stateManager)
@@ -624,7 +624,7 @@ class LogLoaderTest {
leaderEpochCache = leaderEpochCache,
producerStateManager = stateManager,
logDirFailureChannel = logDirFailureChannel,
- topicId = None,
+ _topicId = None,
keepPartitionMetadataFile = true)
EasyMock.verify(stateManager)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3054116..4cc88d0 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2319,9 +2319,8 @@ class LogTest {
val log = createLog(logDir, logConfig)
// Write a topic ID to the partition metadata file to ensure it is
transferred correctly.
- val id = Uuid.randomUuid()
- log.topicId = Some(id)
- log.partitionMetadataFile.write(id)
+ val topicId = Uuid.randomUuid()
+ log.assignTopicId(topicId)
log.appendAsLeader(TestUtils.records(List(new
SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.latestEpoch)
@@ -2336,8 +2335,8 @@ class LogTest {
// Check the topic ID remains in memory and was copied correctly.
assertTrue(log.topicId.isDefined)
- assertEquals(id, log.topicId.get)
- assertEquals(id, log.partitionMetadataFile.read().topicId)
+ assertEquals(topicId, log.topicId.get)
+ assertEquals(topicId, log.partitionMetadataFile.read().topicId)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 28f7be8..99928f0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1075,7 +1075,6 @@ class ReplicaManagerTest {
private def initializeLogAndTopicId(replicaManager: ReplicaManager,
topicPartition: TopicPartition, topicId: Uuid): Unit = {
val partition = replicaManager.createPartition(new TopicPartition(topic,
0))
val log = replicaManager.logManager.getOrCreateLog(topicPartition, false,
false, Some(topicId))
- log.topicId = Some(topicId)
partition.log = Some(log)
}
@@ -1517,7 +1516,7 @@ class ReplicaManagerTest {
leaderEpochCache = leaderEpochCache,
producerStateManager = producerStateManager,
logDirFailureChannel = mockLogDirFailureChannel,
- topicId = topicId,
+ _topicId = topicId,
keepPartitionMetadataFile = true) {
override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch]
= {
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index e5d2f21..b085ab0 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -143,7 +143,7 @@ class SchedulerTest {
recoveryPoint = offsets.recoveryPoint, nextOffsetMetadata =
offsets.nextOffsetMetadata, scheduler,
brokerTopicStats, mockTime,
LogManager.ProducerIdExpirationCheckIntervalMs,
topicPartition, leaderEpochCache, producerStateManager,
logDirFailureChannel,
- topicId = None, keepPartitionMetadataFile = true)
+ _topicId = None, keepPartitionMetadataFile = true)
assertTrue(scheduler.taskRunning(log.producerExpireCheck))
log.close()
assertFalse(scheduler.taskRunning(log.producerExpireCheck))