Repository: kafka

Updated Branches:
  refs/heads/1.0.0 5792f2fb3 -> 0222a35db

KAFKA-5767; Kafka server should halt if IBP < 1.0.0 and there is log directory failure

Author: Dong Lin <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes #3718 from lindong28/KAFKA-5767

(cherry picked from commit 20d9adb173a0cde010a2502da7415ecda9bcaa80)
Signed-off-by: Jun Rao <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0222a35d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0222a35d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0222a35d

Branch: refs/heads/1.0.0
Commit: 0222a35db1ad9399e16b7f3f5453755eceb6c256
Parents: 5792f2f
Author: Dong Lin <[email protected]>
Authored: Wed Oct 4 17:12:53 2017 -0700
Committer: Jun Rao <[email protected]>
Committed: Wed Oct 4 17:13:02 2017 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherThread.scala  |   3 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   7 +-
 .../kafka/server/AbstractFetcherManager.scala   |   5 +-
 .../kafka/server/AbstractFetcherThread.scala    |   4 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../scala/kafka/server/ReplicaManager.scala     |   4 +
 core/src/main/scala/kafka/utils/Exit.scala      |   2 +-
 .../scala/kafka/utils/ShutdownableThread.scala  |   6 +-
 .../kafka/api/LogDirFailureTest.scala           | 187 ----------------
 .../integration/UncleanLeaderElectionTest.scala |  14 +-
 .../unit/kafka/server/LogDirFailureTest.scala   | 216 +++++++++++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala |   4 +-
 13 files changed, 251 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 4c7c227..d5e084e 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -20,7 +20,7 @@ package kafka.consumer
 import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request}
 import kafka.cluster.BrokerEndPoint
 import kafka.message.ByteBufferMessageSet
-import kafka.server.{AbstractFetcherThread, PartitionFetchState}
+import kafka.server.{AbstractFetcherThread, PartitionFetchState, ResultWithPartitions}
 import kafka.common.{ErrorMapping, TopicAndPartition}
 
 import scala.collection.Map
@@ -30,6 +30,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.EpochEndOffset
 
+
 @deprecated("This class has been deprecated and will be removed in a future release. " +
   "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0")
 class ConsumerFetcherThread(name: String,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index f4bd8a2..102b1e5 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -131,8 +131,11 @@ class LogManager(logDirs: Array[File],
 
     val liveLogDirs = new ConcurrentLinkedQueue[File]()
 
-    for (dir <- dirs if !initialOfflineDirs.contains(dir)) {
+    for (dir <- dirs) {
       try {
+        if (initialOfflineDirs.contains(dir))
+          throw new IOException(s"Failed to load ${dir.getAbsolutePath} during broker startup")
+
         if (!dir.exists) {
           info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
           val created = dir.mkdirs()
@@ -144,7 +147,7 @@ class LogManager(logDirs: Array[File],
         liveLogDirs.add(dir)
       } catch {
         case e: IOException =>
-          error(s"Failed to create or validate data directory $dir.getAbsolutePath", e)
+          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Failed to create or validate data directory ${dir.getAbsolutePath}", e)
       }
     }
     if (liveLogDirs.isEmpty) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 0d7806c..a8316b4 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -29,8 +29,9 @@ import org.apache.kafka.common.utils.Utils
 
 abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
   extends Logging with KafkaMetricsGroup {
-  // map of (source broker_id, fetcher_id per source broker) => fetcher
-  val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread]
+  // map of (source broker_id, fetcher_id per source broker) => fetcher.
+  // package private for test
+  private[server] val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
 
   this.logIdent = "[" + name + "] "

http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index e772ac3..5df6732 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -82,8 +82,6 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[REQ]
 
-  case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
-
   protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)]
 
   override def shutdown(){
@@ -420,3 +418,5 @@ case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem, truncating
 
   override def toString = "offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, isReadyForFetch, truncatingLog)
 }
+
+case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])

http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f8af7a2..f2d4b16 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -137,7 +137,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   var zkUtils: ZkUtils = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
-  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
+  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
 
   private var _clusterId: String = null
   private var _brokerTopicStats: BrokerTopicStats = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b90e9e8..d422112 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -240,7 +240,9 @@ class ReplicaFetcherThread(name: String,
         val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
         requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
       } catch {
-        case e: KafkaStorageException =>
+        case _: KafkaStorageException =>
+          // The replica has already been marked offline due to log directory failure and the original failure should have already been logged.
+          // This partition should be removed from ReplicaFetcherThread soon by ReplicaManager.handleLogDirFailure()
           partitionsWithError += topicPartition
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 98a4be1..f4f3672 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -318,6 +318,10 @@ class ReplicaManager(val config: KafkaConfig,
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
     scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
     scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
+
+    // If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0, which does not include the isNew field.
+    // In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
+    // Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
     val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
     logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
     logDirFailureHandler.start()

http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/main/scala/kafka/utils/Exit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala
index 3e29ddd..5819e97 100644
--- a/core/src/main/scala/kafka/utils/Exit.scala
+++ b/core/src/main/scala/kafka/utils/Exit.scala
@@ -38,7 +38,7 @@ object Exit {
     JExit.setExitProcedure(functionToProcedure(exitProcedure))
 
   def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit =
-    JExit.setExitProcedure(functionToProcedure(haltProcedure))
+    JExit.setHaltProcedure(functionToProcedure(haltProcedure))
 
   def resetExitProcedure(): Unit =
     JExit.resetExitProcedure()

http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/main/scala/kafka/utils/ShutdownableThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 6ed0968..0922d15 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -27,13 +27,17 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   this.setDaemon(false)
   this.logIdent = "[" + name + "]: "
   val isRunning: AtomicBoolean = new AtomicBoolean(true)
-  val shutdownLatch = new CountDownLatch(1)
+  private val shutdownLatch = new CountDownLatch(1)
 
   def shutdown(): Unit = {
     initiateShutdown()
     awaitShutdown()
   }
 
+  def isShutdownComplete: Boolean = {
+    shutdownLatch.getCount == 0
+  }
+
   def initiateShutdown(): Boolean = {
     if (isRunning.compareAndSet(true, false)) {
       info("Shutting down")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
----------------------------------------------------------------------
diff
--git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala deleted file mode 100644 index b1ac47b..0000000 --- a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.api - -import java.util.Collections -import java.util.concurrent.{ExecutionException, TimeUnit} - -import kafka.controller.{OfflineReplica, PartitionAndReplica} -import kafka.server.KafkaConfig -import kafka.utils.{CoreUtils, TestUtils, ZkUtils} -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException} -import org.junit.{Before, Test} -import org.junit.Assert.assertTrue -import org.junit.Assert.assertEquals - -import scala.collection.JavaConverters._ - -/** - * Test whether clients can producer and consume when there is log directory failure - */ -class LogDirFailureTest extends IntegrationTestHarness { - - import kafka.api.LogDirFailureTest._ - - val producerCount: Int = 1 - val consumerCount: Int = 1 - val serverCount: Int = 2 - private val topic = "topic" - private val partitionNum = 12 - - this.logDirCount = 3 - this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0") - this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") - this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000") - this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1") - - @Before - override def setUp() { - super.setUp() - TestUtils.createTopic(zkUtils, topic, partitionNum, serverCount, servers = servers) - } - - @Test - def testIOExceptionDuringLogRoll() { - testProduceAfterLogDirFailureOnLeader(Roll) - } - - @Test - def testIOExceptionDuringCheckpoint() { - testProduceAfterLogDirFailureOnLeader(Checkpoint) - } - - @Test - def testReplicaFetcherThreadAfterLogDirFailureOnFollower() { - val producer = producers.head - val partition = new TopicPartition(topic, 0) - - val partitionInfo = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get - val leaderServerId = partitionInfo.leader().id() - val leaderServer = servers.find(_.config.brokerId == leaderServerId).get - val followerServerId = partitionInfo.replicas().map(_.id()).find(_ != leaderServerId).get - val followerServer = servers.find(_.config.brokerId == followerServerId).get - - followerServer.replicaManager.markPartitionOffline(partition) - // Send a message to another partition whose leader is the same as 
partition 0 - // so that ReplicaFetcherThread on the follower will get response from leader immediately - val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i => - leaderServer.replicaManager.getPartition(new TopicPartition(topic, i)).flatMap(_.leaderReplicaIfLocal).isDefined - }.get - val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes) - // When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower - // has fetched from the leader and attempts to append to the offline replica. - producer.send(record).get - - assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size) - followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread => - assertTrue("ReplicaFetcherThread should still be working if its partition count > 0", thread.shutdownLatch.getCount > 0) - } - } - - def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType) { - val consumer = consumers.head - subscribeAndWaitForAssignment(topic, consumer) - val producer = producers.head - val partition = new TopicPartition(topic, 0) - val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes) - - val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() - val leaderServer = servers.find(_.config.brokerId == leaderServerId).get - - // The first send() should succeed - producer.send(record).get() - TestUtils.waitUntilTrue(() => { - consumer.poll(0).count() == 1 - }, "Expected the first message", 3000L) - - // Make log directory of the partition on the leader broker inaccessible by replacing it with a file - val replica = leaderServer.replicaManager.getReplicaOrException(partition) - val logDir = replica.log.get.dir.getParentFile - CoreUtils.swallow(Utils.delete(logDir)) - logDir.createNewFile() - assertTrue(logDir.isFile) - - if (failureType == Roll) { - try { - leaderServer.replicaManager.getLog(partition).get.roll() - fail("Log rolling should fail with KafkaStorageException") - } catch { - case e: KafkaStorageException => // This is expected - } - } else if (failureType == Checkpoint) { - leaderServer.replicaManager.checkpointHighWatermarks() - } - - // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline - TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L) - assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty) - - // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException - try { - producer.send(record).get(6000, TimeUnit.MILLISECONDS) - fail("send() should fail with either KafkaStorageException or NotLeaderForPartitionException") - } catch { - case e: ExecutionException => - e.getCause match { - case t: KafkaStorageException => - case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3 - case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}") - } - case e: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${e.toString}") - } - - // Wait for producer to update metadata for the partition - TestUtils.waitUntilTrue(() => { - 
// ProduceResponse may contain KafkaStorageException and trigger metadata update - producer.send(record) - producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != leaderServerId - }, "Expected new leader for the partition", 6000L) - - // Consumer should receive some messages - TestUtils.waitUntilTrue(() => { - consumer.poll(0).count() > 0 - }, "Expected some messages", 3000L) - - // There should be no remaining LogDirEventNotification znode - assertTrue(zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).isEmpty) - - // The controller should have marked the replica on the original leader as offline - val controllerServer = servers.find(_.kafkaController.isActive).get - val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica) - assertTrue(offlineReplicas.contains(PartitionAndReplica(topic, 0, leaderServerId))) - } - - private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { - consumer.subscribe(Collections.singletonList(topic)) - TestUtils.waitUntilTrue(() => { - consumer.poll(0) - !consumer.assignment.isEmpty - }, "Expected non-empty assignment") - } - -} - -object LogDirFailureTest { - sealed trait LogDirFailureType - case object Roll extends LogDirFailureType - case object Checkpoint extends LogDirFailureType -} - http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 0af0a04..24421d0 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -182,14 +182,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(servers, topic, null, "first") + produceMessage(servers, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(servers, topic, null, "second") + produceMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -199,7 +199,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { // wait until new leader is (uncleanly) elected waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId)) - produceMessage(servers, topic, null, "third") + produceMessage(servers, topic, "third") // second message was lost due to unclean election assertEquals(List("first", "third"), consumeAllMessages(topic)) @@ -215,14 +215,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(servers, topic, null, "first") + produceMessage(servers, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), 
consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(servers, topic, null, "second") + produceMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -234,7 +234,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { // message production and consumption should both fail while leader is down try { - produceMessage(servers, topic, null, "third") + produceMessage(servers, topic, "third") fail("Message produced while leader is down should fail, but it succeeded") } catch { case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected @@ -246,7 +246,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup()) waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(leaderId)) - produceMessage(servers, topic, null, "third") + produceMessage(servers, topic, "third") waitUntilMetadataIsPropagated(servers, topic, partitionId) servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala new file mode 100644 index 0000000..438d736 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import java.io.File +import java.util.Collections +import java.util.concurrent.{ExecutionException, TimeUnit} + +import kafka.server.LogDirFailureTest._ +import kafka.api.IntegrationTestHarness +import kafka.controller.{OfflineReplica, PartitionAndReplica} +import kafka.utils.{CoreUtils, Exit, TestUtils, ZkUtils} +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException} +import org.junit.{Before, Test} +import org.junit.Assert.{assertTrue, assertFalse, assertEquals} + +import scala.collection.JavaConverters._ + +/** + * Test whether clients can producer and consume when there is log directory failure + */ +class LogDirFailureTest extends IntegrationTestHarness { + + val producerCount: Int = 1 + val consumerCount: Int = 1 + val serverCount: Int = 2 + private val topic = "topic" + private val partitionNum = 12 + + this.logDirCount = 3 + this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0") + this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") + this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000") + this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1") + + @Before + override def setUp() { + super.setUp() + TestUtils.createTopic(zkUtils, topic, partitionNum, serverCount, servers = servers) + } + + @Test + def testIOExceptionDuringLogRoll() { + testProduceAfterLogDirFailureOnLeader(Roll) + } + + @Test + // Broker should halt on any log directory failure if inter-broker protocol < 1.0 + def brokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure() { + @volatile var statusCodeOption: Option[Int] = None + Exit.setHaltProcedure { (statusCode, _) => + statusCodeOption = Some(statusCode) + throw new IllegalArgumentException + } + + var server: KafkaServer = null + try { + val props = TestUtils.createBrokerConfig(serverCount, zkConnect, logDirCount = 3) + props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.11.0") + props.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0") + val kafkaConfig = KafkaConfig.fromProps(props) + val logDir = new File(kafkaConfig.logDirs.head) + // Make log directory of the partition on the leader broker inaccessible by replacing it with a file + CoreUtils.swallow(Utils.delete(logDir)) + logDir.createNewFile() + assertTrue(logDir.isFile) + + server = TestUtils.createServer(kafkaConfig) + TestUtils.waitUntilTrue(() => statusCodeOption.contains(1), "timed out waiting for broker to halt") + } finally { + Exit.resetHaltProcedure() + if (server != null) + TestUtils.shutdownServers(List(server)) + } + } + + @Test + def testIOExceptionDuringCheckpoint() { + testProduceAfterLogDirFailureOnLeader(Checkpoint) + } + + @Test + def testReplicaFetcherThreadAfterLogDirFailureOnFollower() { + val producer = producers.head + val partition = new TopicPartition(topic, 0) + + val partitionInfo = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get + val leaderServerId = partitionInfo.leader().id() + val leaderServer = servers.find(_.config.brokerId == leaderServerId).get + val followerServerId = partitionInfo.replicas().map(_.id()).find(_ != leaderServerId).get + val followerServer = servers.find(_.config.brokerId == followerServerId).get + + 
followerServer.replicaManager.markPartitionOffline(partition) + // Send a message to another partition whose leader is the same as partition 0 + // so that ReplicaFetcherThread on the follower will get response from leader immediately + val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i => + leaderServer.replicaManager.getPartition(new TopicPartition(topic, i)).flatMap(_.leaderReplicaIfLocal).isDefined + }.get + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes) + // When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower + // has fetched from the leader and attempts to append to the offline replica. + producer.send(record).get + + assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size) + followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread => + assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete) + } + } + + def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType) { + val consumer = consumers.head + subscribeAndWaitForAssignment(topic, consumer) + val producer = producers.head + val partition = new TopicPartition(topic, 0) + val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes) + + val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() + val leaderServer = servers.find(_.config.brokerId == leaderServerId).get + + // The first send() should succeed + producer.send(record).get() + TestUtils.waitUntilTrue(() => { + consumer.poll(0).count() == 1 + }, "Expected the first message", 3000L) + + // Make log directory of the partition on the leader broker inaccessible by replacing it with a file + val replica = leaderServer.replicaManager.getReplicaOrException(partition) + val logDir = replica.log.get.dir.getParentFile + CoreUtils.swallow(Utils.delete(logDir)) + logDir.createNewFile() + assertTrue(logDir.isFile) + + if (failureType == Roll) { + try { + leaderServer.replicaManager.getLog(partition).get.roll() + fail("Log rolling should fail with KafkaStorageException") + } catch { + case e: KafkaStorageException => // This is expected + } + } else if (failureType == Checkpoint) { + leaderServer.replicaManager.checkpointHighWatermarks() + } + + // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline + TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L) + assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty) + + // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException + try { + producer.send(record).get(6000, TimeUnit.MILLISECONDS) + fail("send() should fail with either KafkaStorageException or NotLeaderForPartitionException") + } catch { + case e: ExecutionException => + e.getCause match { + case t: KafkaStorageException => + case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3 + case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}") + } + case e: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException 
instead of ${e.toString}") + } + + // Wait for producer to update metadata for the partition + TestUtils.waitUntilTrue(() => { + // ProduceResponse may contain KafkaStorageException and trigger metadata update + producer.send(record) + producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != leaderServerId + }, "Expected new leader for the partition", 6000L) + + // Consumer should receive some messages + TestUtils.waitUntilTrue(() => { + consumer.poll(0).count() > 0 + }, "Expected some messages", 3000L) + + // There should be no remaining LogDirEventNotification znode + assertTrue(zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).isEmpty) + + // The controller should have marked the replica on the original leader as offline + val controllerServer = servers.find(_.kafkaController.isActive).get + val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica) + assertTrue(offlineReplicas.contains(PartitionAndReplica(topic, 0, leaderServerId))) + } + + private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { + consumer.subscribe(Collections.singletonList(topic)) + TestUtils.waitUntilTrue(() => { + consumer.poll(0) + !consumer.assignment.isEmpty + }, "Expected non-empty assignment") + } + +} + +object LogDirFailureTest { + sealed trait LogDirFailureType + case object Roll extends LogDirFailureType + case object Checkpoint extends LogDirFailureType +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/0222a35d/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 687307a..1149db4 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1109,13 +1109,13 @@ object TestUtils extends Logging { values } - def produceMessage(servers: Seq[KafkaServer], topic: String, partition: Integer, message: String) { + def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) { val producer = createNewProducer( TestUtils.getBrokerListStrFromServers(servers), retries = 5, requestTimeoutMs = 2000 ) - producer.send(new ProducerRecord(topic, partition, topic.getBytes, message.getBytes)).get + producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get producer.close() }
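
Editor's note: the comment added to ReplicaManager.startup() carries the core reasoning of this change, so the following standalone Scala sketch restates that decision in isolation. It is an illustration only, not code from the commit: InterBrokerProtocol, onOfflineLogDir, halt and markOffline are invented stand-ins for kafka.api.ApiVersion, LogDirFailureHandler, Exit.halt and LogManager's offline-directory handling.

object LogDirFailureHandlingSketch {

  // Toy stand-in for kafka.api.ApiVersion: a higher ordinal means a newer inter-broker protocol.
  final case class InterBrokerProtocol(name: String, ordinal: Int)

  val KAFKA_0_11_0_IV2 = InterBrokerProtocol("0.11.0-IV2", 11)
  val KAFKA_1_0_IV0 = InterBrokerProtocol("1.0-IV0", 12)

  // Mirrors `val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0`:
  // below 1.0 the controller sends LeaderAndIsrRequest V0, which has no isNew flag, so a broker
  // with a failed log directory cannot tell whether creating a partition locally is safe.
  def haltBrokerOnFailure(ibp: InterBrokerProtocol): Boolean =
    ibp.ordinal < KAFKA_1_0_IV0.ordinal

  // Rough shape of what the failure handler does once a log directory is reported offline.
  def onOfflineLogDir(ibp: InterBrokerProtocol, dir: String,
                      halt: Int => Unit, markOffline: String => Unit): Unit =
    if (haltBrokerOnFailure(ibp)) halt(1)   // old IBP: stop the whole broker
    else markOffline(dir)                   // IBP >= 1.0: only this directory goes offline

  def main(args: Array[String]): Unit = {
    println(haltBrokerOnFailure(KAFKA_0_11_0_IV2)) // true  -> broker halts
    println(haltBrokerOnFailure(KAFKA_1_0_IV0))    // false -> directory is marked offline only
  }
}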

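Editor's note: the one-line fix in Exit.scala matters for the new test above. Before the fix, setHaltProcedure appears to have registered the supplied handler as the exit procedure, so a broker halt would still fall through to the default halt and terminate the test JVM. The sketch below shows the resulting test pattern under that reading; the HaltInterceptionSketch wrapper is the editor's invention, and only the Exit calls that appear in the diff (setHaltProcedure, resetHaltProcedure) are used.

object HaltInterceptionSketch {
  import kafka.utils.Exit

  @volatile private var statusCodeOption: Option[Int] = None

  // Installs a halt procedure that records the status code instead of killing the JVM, runs
  // `body`, restores the default procedure, and returns the recorded code (if any). This mirrors
  // brokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure in the new LogDirFailureTest.
  def withInterceptedHalt(body: => Unit): Option[Int] = {
    statusCodeOption = None
    Exit.setHaltProcedure { (statusCode, _) =>
      statusCodeOption = Some(statusCode)
      throw new IllegalArgumentException // unwind the halting thread, as the new test does
    }
    try body finally Exit.resetHaltProcedure()
    statusCodeOption
  }
}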