Include controllerId in all requests sent by controller; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-793
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/19ae1959 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/19ae1959 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/19ae1959 Branch: refs/heads/trunk Commit: 19ae1959b091df44475243e3b199d6121ddedc72 Parents: 771760c Author: Swapnil Ghike <[email protected]> Authored: Thu Mar 7 18:20:33 2013 -0800 Committer: Jun Rao <[email protected]> Committed: Thu Mar 7 18:20:33 2013 -0800 ---------------------------------------------------------------------- .../PreferredReplicaLeaderElectionCommand.scala | 8 +++--- .../main/scala/kafka/api/StopReplicaRequest.scala | 16 +++++++++---- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- .../controller/ControllerChannelManager.scala | 18 +++++--------- .../scala/kafka/controller/KafkaController.scala | 1 + .../kafka/controller/PartitionStateMachine.scala | 2 +- .../kafka/controller/ReplicaStateMachine.scala | 2 +- .../main/scala/kafka/server/ReplicaManager.scala | 2 +- .../api/RequestResponseSerializationTest.scala | 3 +- 9 files changed, 29 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index e59d5af..7405c5a 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -35,7 +35,6 @@ object PreferredReplicaLeaderElectionCommand extends Logging { .withRequiredArg .describedAs("list of partitions for which preferred replica leader election needs to be triggered") .ofType(classOf[String]) - .defaultsTo("") val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + "form host:port. Multiple URLS can be given to allow fail-over.") .withRequiredArg @@ -46,15 +45,16 @@ object PreferredReplicaLeaderElectionCommand extends Logging { CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) - val jsonFile = options.valueOf(jsonFileOpt) val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = if(jsonFile != "") Utils.readFileAsString(jsonFile) else "" var zkClient: ZkClient = null try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) val partitionsForPreferredReplicaElection = - if(jsonFile == "") ZkUtils.getAllPartitions(zkClient) else parsePreferredReplicaJsonData(jsonString) + if (!options.has(jsonFileOpt)) + ZkUtils.getAllPartitions(zkClient) + else + parsePreferredReplicaJsonData(Utils.readFileAsString(options.valueOf(jsonFileOpt))) val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection) preferredReplicaElectionCommand.moveLeaderToPreferredReplica() http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/api/StopReplicaRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 5107488..cd55db4 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -36,6 +36,7 @@ object StopReplicaRequest extends Logging { val correlationId = buffer.getInt val clientId = readShortString(buffer) val ackTimeoutMs = buffer.getInt + val controllerId = buffer.getInt val controllerEpoch = buffer.getInt val deletePartitions = buffer.get match { case 1 => true @@ -48,7 +49,8 @@ object StopReplicaRequest extends Logging { (1 to topicPartitionPairCount) foreach { _ => topicPartitionPairSet.add(readShortString(buffer), buffer.getInt) } - StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch) + StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch, + deletePartitions, topicPartitionPairSet.toSet) } } @@ -56,14 +58,15 @@ case class StopReplicaRequest(versionId: Short, override val correlationId: Int, clientId: String, ackTimeoutMs: Int, + controllerId: Int, + controllerEpoch: Int, deletePartitions: Boolean, - partitions: Set[(String, Int)], - controllerEpoch: Int) + partitions: Set[(String, Int)]) extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) { - def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = { + def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerId: Int, controllerEpoch: Int, correlationId: Int) = { this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, - deletePartitions, partitions, controllerEpoch) + controllerId, controllerEpoch, deletePartitions, partitions) } def writeTo(buffer: ByteBuffer) { @@ -71,6 +74,7 @@ case class StopReplicaRequest(versionId: Short, buffer.putInt(correlationId) writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) + buffer.putInt(controllerId) buffer.putInt(controllerEpoch) buffer.put(if (deletePartitions) 1.toByte else 0.toByte) buffer.putInt(partitions.size) @@ -86,6 +90,7 @@ case class StopReplicaRequest(versionId: Short, 4 + /* correlation id */ ApiUtils.shortStringLength(clientId) + 4 + /* ackTimeoutMs */ + 4 + /* controller id*/ 4 + /* controller epoch */ 1 + /* deletePartitions */ 4 /* partition count */ @@ -104,6 +109,7 @@ case class StopReplicaRequest(versionId: Short, stopReplicaRequest.append("; ClientId: " + clientId) stopReplicaRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") stopReplicaRequest.append("; DeletePartitions: " + deletePartitions) + stopReplicaRequest.append("; ControllerId: " + controllerId) stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch) stopReplicaRequest.append("; Partitions: " + partitions.mkString(",")) stopReplicaRequest.toString() http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 39266b5..824e394 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -53,7 +53,7 @@ class Partition(val topic: String, * each partition. */ private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId) - private val stateChangeLogger = Logger.getLogger("state.change.logger") + private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 2e50b8d..6e563d2 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -26,17 +26,12 @@ import collection.mutable import kafka.api._ import org.apache.log4j.Logger -class ControllerChannelManager private (config: KafkaConfig) extends Logging { - private var controllerContext: ControllerContext = null +class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging { private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " - def this(controllerContext: ControllerContext, config : KafkaConfig) { - this(config) - this.controllerContext = controllerContext - controllerContext.liveBrokers.foreach(addNewBroker(_)) - } + controllerContext.liveBrokers.foreach(addNewBroker(_)) def startup() = { brokerLock synchronized { @@ -114,7 +109,7 @@ class RequestSendThread(val controllerId: Int, val channel: BlockingChannel) extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) { private val lock = new Object() - private val stateChangeLogger = Logger.getLogger("state.change.logger") + private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) override def doWork(): Unit = { val queueItem = queue.take() @@ -154,7 +149,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]] val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]] val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]] - private val stateChangeLogger = Logger.getLogger("state.change.logger") + private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) def newBatch() { // raise error if the previous batch is not empty @@ -212,8 +207,9 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques if (replicas.size > 0) { debug("The stop replica request (delete = %s) sent to broker %d is %s" .format(deletePartitions, broker, replicas.mkString(","))) - sendRequest(broker, new StopReplicaRequest(deletePartitions, - Set.empty[(String, Int)] ++ replicas, controllerEpoch, correlationId), null) + val stopReplicaRequest = new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas, controllerId, + controllerEpoch, correlationId) + sendRequest(broker, stopReplicaRequest, null) } } m.clear() http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index e18ab07..25a8cfe 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -74,6 +74,7 @@ trait KafkaControllerMBean { object KafkaController { val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps" + val stateChangeLogger = "state.change.logger" val InitialControllerEpoch = 1 val InitialControllerEpochZkVersion = 1 } http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index ce4b9e8..b25e9f4 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -47,7 +47,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext) private val isShuttingDown = new AtomicBoolean(false) this.logIdent = "[Partition state machine on Controller " + controllerId + "]: " - private val stateChangeLogger = Logger.getLogger("state.change.logger") + private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) /** * Invoked on successful controller election. First registers a topic change listener since that triggers all http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 43d60cf..88058ec 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -45,7 +45,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId) private val isShuttingDown = new AtomicBoolean(false) this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: " - private val stateChangeLogger = Logger.getLogger("state.change.logger") + private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) /** * Invoked on successful controller election. First registers a broker change listener since that triggers all http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index c10cbde..e73e529 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -52,7 +52,7 @@ class ReplicaManager(val config: KafkaConfig, val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap private var hwThreadInitialized = false this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " - private val stateChangeLogger = Logger.getLogger("state.change.logger") + private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) newGauge( "LeaderCount", http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 4c209f1..02ff81f 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -99,7 +99,8 @@ object SerializationTestUtils{ } def createTestStopReplicaRequest() : StopReplicaRequest = { - new StopReplicaRequest(controllerEpoch = 1, correlationId = 0, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0))) + new StopReplicaRequest(controllerId = 0, controllerEpoch = 1, correlationId = 0, deletePartitions = true, + partitions = collection.immutable.Set((topic1, 0), (topic2, 0))) } def createTestStopReplicaResponse() : StopReplicaResponse = {
