Repository: kafka
Updated Branches:
  refs/heads/trunk 3fd9be49a -> fed3f1f88
MINOR: Avoid trace logging computation in `checkEnoughReplicasReachOffset`

`numAcks` is only used in the `trace` logging statement so it should be a `def` instead of a `val`. Also took the chance to improve the code and documentation a little.

Author: Ismael Juma <[email protected]>

Reviewers: Guozhang Wang <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes #1449 from ijuma/minor-avoid-trace-logging-computation-in-partition

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fed3f1f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fed3f1f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fed3f1f8

Branch: refs/heads/trunk
Commit: fed3f1f8890b219e4247fd9de1305ad18679ff99
Parents: 3fd9be4
Author: Ismael Juma <[email protected]>
Authored: Tue May 31 09:03:18 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Tue May 31 09:03:18 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala | 42 ++++++++++----------
 .../scala/kafka/server/DelayedProduce.scala | 22 ++++------
 2 files changed, 30 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/fed3f1f8/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4e79bdc..ea22e87 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -296,46 +296,48 @@ class Partition(val topic: String,
   }
 
   /*
-   * Note that this method will only be called if requiredAcks = -1
-   * and we are waiting for all replicas in ISR to be fully caught up to
-   * the (local) leader's offset corresponding to this produce request
-   * before we acknowledge the produce request.
+   * Returns a tuple where the first element is a boolean indicating whether enough replicas reached `requiredOffset`
+   * and the second element is an error (which would be `Errors.NONE` for no error).
+   *
+   * Note that this method will only be called if requiredAcks = -1 and we are waiting for all replicas in ISR to be
+   * fully caught up to the (local) leader's offset corresponding to this produce request before we acknowledge the
+   * produce request.
    */
-  def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
+  def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
     leaderReplicaIfLocal() match {
       case Some(leaderReplica) =>
         // keep the current immutable replica list reference
         val curInSyncReplicas = inSyncReplicas
-        val numAcks = curInSyncReplicas.count(r => {
+
+        def numAcks = curInSyncReplicas.count { r =>
           if (!r.isLocal)
             if (r.logEndOffset.messageOffset >= requiredOffset) {
-              trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
+              trace(s"Replica ${r.brokerId} of ${topic}-${partitionId} received offset $requiredOffset")
               true
             }
             else
               false
           else
             true /* also count the local (leader) replica */
-        })
+        }
 
-        trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
+        trace(s"$numAcks acks satisfied for ${topic}-${partitionId} with acks = -1")
 
         val minIsr = leaderReplica.log.get.config.minInSyncReplicas
 
-        if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
+        if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
           /*
-          * The topic may be configured not to accept messages if there are not enough replicas in ISR
-          * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
-          */
-          if (minIsr <= curInSyncReplicas.size) {
-            (true, Errors.NONE.code)
-          } else {
-            (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
-          }
+           * The topic may be configured not to accept messages if there are not enough replicas in ISR
+           * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
+           */
+          if (minIsr <= curInSyncReplicas.size)
+            (true, Errors.NONE)
+          else
+            (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
         } else
-          (false, Errors.NONE.code)
+          (false, Errors.NONE)
       case None =>
-        (false, Errors.NOT_LEADER_FOR_PARTITION.code)
+        (false, Errors.NOT_LEADER_FOR_PARTITION)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fed3f1f8/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index be1be4f..5a59d3b 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -82,32 +82,26 @@ class DelayedProduce(delayMs: Long,
   override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
     produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
-      trace("Checking produce satisfaction for %s, current status %s"
-        .format(topicAndPartition, status))
+      trace(s"Checking produce satisfaction for ${topicAndPartition}, current status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
-        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
-        val (hasEnough, errorCode) = partitionOpt match {
+        val (hasEnough, error) = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
           case Some(partition) =>
             partition.checkEnoughReplicasReachOffset(status.requiredOffset)
           case None =>
             // Case A
-            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         }
-        if (errorCode != Errors.NONE.code) {
-          // Case B.1
+        // Case B.1 || B.2
+        if (error != Errors.NONE || hasEnough) {
           status.acksPending = false
-          status.responseStatus.errorCode = errorCode
-        } else if (hasEnough) {
-          // Case B.2
-          status.acksPending = false
-          status.responseStatus.errorCode = Errors.NONE.code
+          status.responseStatus.errorCode = error.code
         }
       }
     }
 
-    // check if each partition has satisfied at lease one of case A and case B
-    if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
+    // check if every partition has satisfied at least one of case A or B
+    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
       forceComplete()
     else
       false
