Repository: kafka Updated Branches: refs/heads/trunk 0eedd6d80 -> a420d20c0
KAFKA-5099; Replica Deletion Regression from KIP-101. Replica deletion regressed from KIP-101. Replica deletion happens when a broker receives a StopReplicaRequest with delete=true. Ever since KAFKA-1911, replica deletion has been async, meaning the broker responds with a StopReplicaResponse as soon as it has marked the replica directory as staged for deletion. This marking happens by moving a data log directory and its contents, such as /tmp/kafka-logs1/t1-0, to a marked directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, acting as a soft-delete. A scheduled thread later actually deletes the data. The regression occurs while the scheduled thread is trying to delete the data, by which point the controller already considers operations such as partition reassignment and topic deletion complete. But if you look at the log4j logs and data logs, you'll find that the soft-deleted data logs never actually get deleted. The bug is that upon log deletion, we attempt to flush the LeaderEpochFileCache to the original file location instead of the moved file location. Restarting the broker allows the soft-deleted directories to get deleted. This patch avoids the issue by simply not flushing the LeaderEpochFileCache upon log deletion. 
Author: Onur Karaman <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #2986 from onurkaraman/KAFKA-5099 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a420d20c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a420d20c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a420d20c Branch: refs/heads/trunk Commit: a420d20c0e3da2c9637820f7d1a344706fd835fa Parents: 0eedd6d Author: Onur Karaman <[email protected]> Authored: Wed May 10 13:45:32 2017 -0700 Committer: Jun Rao <[email protected]> Committed: Wed May 10 13:45:32 2017 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 10 +++---- .../server/epoch/LeaderEpochFileCache.scala | 17 +++++++---- .../unit/kafka/admin/DeleteTopicTest.scala | 1 - .../src/test/scala/unit/kafka/log/LogTest.scala | 2 +- .../server/epoch/LeaderEpochFileCacheTest.scala | 30 ++++++++++---------- .../test/scala/unit/kafka/utils/TestUtils.scala | 19 +++++++++++++ 6 files changed, 52 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 5722a43..d3ea251 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -162,12 +162,12 @@ class Log(@volatile var dir: File, nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset, activeSegment.size.toInt) - leaderEpochCache.clearLatest(nextOffsetMetadata.messageOffset) + leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset) logStartOffset = math.max(logStartOffset, 
segments.firstEntry().getValue.baseOffset) // The earliest leader epoch may not be flushed during a hard failure. Recover it here. - leaderEpochCache.clearEarliest(logStartOffset) + leaderEpochCache.clearAndFlushEarliest(logStartOffset) loadProducerState(logEndOffset) @@ -997,7 +997,7 @@ class Log(@volatile var dir: File, // remove the segments for lookups deletable.foreach(deleteSegment) logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset) - leaderEpochCache.clearEarliest(logStartOffset) + leaderEpochCache.clearAndFlushEarliest(logStartOffset) producerStateManager.evictUnretainedProducers(logStartOffset) updateFirstUnstableOffset() } @@ -1282,7 +1282,7 @@ class Log(@volatile var dir: File, updateLogEndOffset(targetOffset) this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) this.logStartOffset = math.min(targetOffset, this.logStartOffset) - leaderEpochCache.clearLatest(targetOffset) + leaderEpochCache.clearAndFlushLatest(targetOffset) loadProducerState(targetOffset) } } @@ -1308,7 +1308,7 @@ class Log(@volatile var dir: File, initFileSize = initFileSize, preallocate = config.preallocate)) updateLogEndOffset(newOffset) - leaderEpochCache.clear() + leaderEpochCache.clearAndFlush() producerStateManager.truncate() producerStateManager.updateMapEndOffset(newOffset) http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index 56b1e55..2b1ecc7 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -30,8 +30,9 @@ trait LeaderEpochCache { def assign(leaderEpoch: Int, offset: Long) def latestEpoch(): Int def endOffsetFor(epoch: Int): Long - def 
clearLatest(offset: Long) - def clearEarliest(offset: Long) + def clearAndFlushLatest(offset: Long) + def clearAndFlushEarliest(offset: Long) + def clearAndFlush() def clear() } @@ -111,7 +112,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM * * @param offset */ - override def clearLatest(offset: Long): Unit = { + override def clearAndFlushLatest(offset: Long): Unit = { inWriteLock(lock) { val before = epochs if (offset >= 0 && offset <= latestOffset()) { @@ -130,7 +131,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM * * @param offset the offset to clear up to */ - override def clearEarliest(offset: Long): Unit = { + override def clearAndFlushEarliest(offset: Long): Unit = { inWriteLock(lock) { val before = epochs if (offset >= 0 && earliestOffset() < offset) { @@ -150,13 +151,19 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM /** * Delete all entries. */ - override def clear() = { + override def clearAndFlush() = { inWriteLock(lock) { epochs.clear() flush() } } + override def clear() = { + inWriteLock(lock) { + epochs.clear() + } + } + def epochEntries(): ListBuffer[EpochEntry] = { epochs } http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index caac222..2085d2d 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -234,7 +234,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness { brokerConfigs.head.setProperty("log.cleaner.enable","true") brokerConfigs.head.setProperty("log.cleanup.policy","compact") brokerConfigs.head.setProperty("log.segment.bytes","100") - 
brokerConfigs.head.setProperty("log.segment.delete.delay.ms","1000") brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577") val servers = createTestTopicAndCluster(topic,brokerConfigs) http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index b11c94b..b4fe9fb 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -2035,7 +2035,7 @@ class LogTest { assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries) // deliberately remove some of the epoch entries - leaderEpochCache.clearLatest(2) + leaderEpochCache.clearAndFlushLatest(2) assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries) log.close() http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index afd1f35..8460fe4 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -401,7 +401,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When clear latest on epoch boundary - cache.clearLatest(offset = 8) + cache.clearAndFlushLatest(offset = 8) //Then should remove two latest epochs (remove is inclusive) assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries) @@ -418,7 +418,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, 
offset = 11) //When reset to offset ON epoch boundary - cache.clearEarliest(offset = 8) + cache.clearAndFlushEarliest(offset = 8) //Then should preserve (3, 8) assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries) @@ -435,7 +435,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When reset to offset BETWEEN epoch boundaries - cache.clearEarliest(offset = 9) + cache.clearAndFlushEarliest(offset = 9) //Then we should retain epoch 3, but update it's offset to 9 as 8 has been removed assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries) @@ -452,7 +452,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When reset to offset before first epoch offset - cache.clearEarliest(offset = 1) + cache.clearAndFlushEarliest(offset = 1) //Then nothing should change assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries) @@ -469,7 +469,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When reset to offset on earliest epoch boundary - cache.clearEarliest(offset = 6) + cache.clearAndFlushEarliest(offset = 6) //Then nothing should change assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries) @@ -486,7 +486,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When - cache.clearEarliest(offset = 11) + cache.clearAndFlushEarliest(offset = 11) //Then retain the last assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries) @@ -503,7 +503,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When we clear from a postition between offset 8 & offset 11 - cache.clearEarliest(offset = 9) + cache.clearAndFlushEarliest(offset = 9) //Then we should update the middle epoch entry's offset assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries) @@ -520,7 +520,7 @@ class LeaderEpochFileCacheTest { 
cache.assign(epoch = 2, offset = 10) //When we clear from a postition between offset 0 & offset 7 - cache.clearEarliest(offset = 5) + cache.clearAndFlushEarliest(offset = 5) //Then we should keeep epoch 0 but update the offset appropriately assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries) @@ -537,7 +537,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When reset to offset beyond last epoch - cache.clearEarliest(offset = 15) + cache.clearAndFlushEarliest(offset = 15) //Then update the last assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries) @@ -554,7 +554,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When reset to offset BETWEEN epoch boundaries - cache.clearLatest(offset = 9) + cache.clearAndFlushLatest(offset = 9) //Then should keep the preceding epochs assertEquals(3, cache.latestEpoch()) @@ -572,7 +572,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When - cache.clear() + cache.clearAndFlush() //Then assertEquals(0, cache.epochEntries.size) @@ -589,7 +589,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When reset to offset on epoch boundary - cache.clearLatest(offset = UNDEFINED_EPOCH_OFFSET) + cache.clearAndFlushLatest(offset = UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -606,7 +606,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 11) //When reset to offset on epoch boundary - cache.clearEarliest(offset = UNDEFINED_EPOCH_OFFSET) + cache.clearAndFlushEarliest(offset = UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -645,7 +645,7 @@ class LeaderEpochFileCacheTest { val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) //Then - cache.clearEarliest(7) + cache.clearAndFlushEarliest(7) } @Test @@ -657,7 +657,7 @@ class LeaderEpochFileCacheTest { val cache = new 
LeaderEpochFileCache(tp, () => leoFinder, checkpoint) //Then - cache.clearLatest(7) + cache.clearAndFlushLatest(7) } @Before http://git-wip-us.apache.org/repos/asf/kafka/blob/a420d20c/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 5d9e7c1..05d9686 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -217,6 +217,7 @@ object TestUtils extends Logging { props.put(KafkaConfig.ControllerSocketTimeoutMsProp, "1500") props.put(KafkaConfig.ControlledShutdownEnableProp, enableControlledShutdown.toString) props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString) + props.put(KafkaConfig.LogDeleteDelayMsProp, "1000") props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100") props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152") props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString) @@ -1105,6 +1106,24 @@ object TestUtils extends Logging { } checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp)) }), "Cleaner offset for deleted partition should have been removed") + import scala.collection.JavaConverters._ + TestUtils.waitUntilTrue(() => servers.forall(server => + server.config.logDirs.forall { logDir => + topicPartitions.forall { tp => + !new File(logDir, tp.topic + "-" + tp.partition).exists() + } + } + ), "Failed to soft-delete the data to a delete directory") + TestUtils.waitUntilTrue(() => servers.forall(server => + server.config.logDirs.forall { logDir => + topicPartitions.forall { tp => + !java.util.Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryName => + partitionDirectoryName.startsWith(tp.topic + "-" + tp.partition) && + partitionDirectoryName.endsWith(Log.DeleteDirSuffix) + } + } 
+ } + ), "Failed to hard-delete the delete directory") } /**
