Repository: kafka
Updated Branches:
  refs/heads/trunk b1cc72510 -> d9ab917dc


kafka-1992; checkEnoughReplicasReachOffset doesn't need to get requiredAcks; patched by Gwen Shapira; reviewed by Jeff Holoman, Jiangjie Qin and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9ab917d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9ab917d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9ab917d

Branch: refs/heads/trunk
Commit: d9ab917dcc972aa3ef3644aeb4c96094bd9e54d8
Parents: b1cc725
Author: Gwen Shapira <csh...@gmail.com>
Authored: Tue Apr 7 15:10:47 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Tue Apr 7 15:10:47 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 34 ++++++++++++--------
 .../scala/kafka/server/DelayedProduce.scala     |  4 +--
 2 files changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ab917d/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3fb549c..122b1db 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -292,31 +292,37 @@ class Partition(val topic: String,
     }
   }
 
-  def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
+  /*
+   * Note that this method will only be called if requiredAcks = -1
+   * and we are waiting for all replicas in ISR to be fully caught up to
+   * the (local) leader's offset corresponding to this produce request
+   * before we acknowledge the produce request.
+   */
+  def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
     leaderReplicaIfLocal() match {
       case Some(leaderReplica) =>
         // keep the current immutable replica list reference
         val curInSyncReplicas = inSyncReplicas
         val numAcks = curInSyncReplicas.count(r => {
           if (!r.isLocal)
-            r.logEndOffset.messageOffset >= requiredOffset
+            if (r.logEndOffset.messageOffset >= requiredOffset) {
+              trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
+              true
+            }
+            else
+              false
           else
             true /* also count the local (leader) replica */
         })
-        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
 
-        trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
+        trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
+
+        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
 
-        if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
+        if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
           /*
-          * requiredAcks < 0 means acknowledge after all replicas in ISR
-          * are fully caught up to the (local) leader's offset
-          * corresponding to this produce request.
-          *
-          * minIsr means that the topic is configured not to accept messages
-          * if there are not enough replicas in ISR
-          * in this scenario the request was already appended locally and
-          * then added to the purgatory before the ISR was shrunk
+          * The topic may be configured not to accept messages if there are not enough replicas in ISR
+          * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
           */
           if (minIsr <= curInSyncReplicas.size) {
             (true, ErrorMapping.NoError)
@@ -412,7 +418,7 @@ class Partition(val topic: String,
       // Avoid writing to leader if there are not enough insync replicas to make it safe
       if (inSyncSize < minIsr && requiredAcks == -1) {
         throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
-          .format(topic,partitionId,minIsr,inSyncSize))
+          .format(topic, partitionId, inSyncSize, minIsr))
       }
 
       val info = log.append(messages, assignOffsets = true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ab917d/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 4d763bf..05078b2 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -89,9 +89,7 @@ class DelayedProduce(delayMs: Long,
           val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
           val (hasEnough, errorCode) = partitionOpt match {
             case Some(partition) =>
-              partition.checkEnoughReplicasReachOffset(
-                status.requiredOffset,
-                produceMetadata.produceRequiredAcks)
+              partition.checkEnoughReplicasReachOffset(status.requiredOffset)
             case None =>
               // Case A
               (false, ErrorMapping.UnknownTopicOrPartitionCode)