Repository: kafka
Updated Branches:
  refs/heads/trunk b1cc72510 -> d9ab917dc


kafka-1992; checkEnoughReplicasReachOffset doesn't need to get requiredAcks; 
patched by Gwen Shapira; reviewed by Jeff Holoman, Jiangjie Qin and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9ab917d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9ab917d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9ab917d

Branch: refs/heads/trunk
Commit: d9ab917dcc972aa3ef3644aeb4c96094bd9e54d8
Parents: b1cc725
Author: Gwen Shapira <csh...@gmail.com>
Authored: Tue Apr 7 15:10:47 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Tue Apr 7 15:10:47 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 34 ++++++++++++--------
 .../scala/kafka/server/DelayedProduce.scala     |  4 +--
 2 files changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ab917d/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3fb549c..122b1db 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -292,31 +292,37 @@ class Partition(val topic: String,
     }
   }
 
-  def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): 
(Boolean, Short) = {
+  /*
+   * Note that this method will only be called if requiredAcks = -1
+   * and we are waiting for all replicas in ISR to be fully caught up to
+   * the (local) leader's offset corresponding to this produce request
+   * before we acknowledge the produce request.
+   */
+  def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = 
{
     leaderReplicaIfLocal() match {
       case Some(leaderReplica) =>
         // keep the current immutable replica list reference
         val curInSyncReplicas = inSyncReplicas
         val numAcks = curInSyncReplicas.count(r => {
           if (!r.isLocal)
-            r.logEndOffset.messageOffset >= requiredOffset
+            if (r.logEndOffset.messageOffset >= requiredOffset) {
+              trace("Replica %d of %s-%d received offset 
%d".format(r.brokerId, topic, partitionId, requiredOffset))
+              true
+            }
+            else
+              false
           else
             true /* also count the local (leader) replica */
         })
-        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
 
-        trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, 
topic, partitionId))
+        trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, 
topic, partitionId))
+
+        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
 
-        if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= 
requiredOffset ) {
+        if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
           /*
-          * requiredAcks < 0 means acknowledge after all replicas in ISR
-          * are fully caught up to the (local) leader's offset
-          * corresponding to this produce request.
-          *
-          * minIsr means that the topic is configured not to accept messages
-          * if there are not enough replicas in ISR
-          * in this scenario the request was already appended locally and
-          * then added to the purgatory before the ISR was shrunk
+          * The topic may be configured not to accept messages if there are 
not enough replicas in ISR
+          * in this scenario the request was already appended locally and then 
added to the purgatory before the ISR was shrunk
           */
           if (minIsr <= curInSyncReplicas.size) {
             (true, ErrorMapping.NoError)
@@ -412,7 +418,7 @@ class Partition(val topic: String,
           // Avoid writing to leader if there are not enough insync replicas 
to make it safe
           if (inSyncSize < minIsr && requiredAcks == -1) {
             throw new NotEnoughReplicasException("Number of insync replicas 
for partition [%s,%d] is [%d], below required minimum [%d]"
-              .format(topic,partitionId,minIsr,inSyncSize))
+              .format(topic, partitionId, inSyncSize, minIsr))
           }
 
           val info = log.append(messages, assignOffsets = true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ab917d/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 4d763bf..05078b2 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -89,9 +89,7 @@ class DelayedProduce(delayMs: Long,
         val partitionOpt = 
replicaManager.getPartition(topicAndPartition.topic, 
topicAndPartition.partition)
         val (hasEnough, errorCode) = partitionOpt match {
           case Some(partition) =>
-            partition.checkEnoughReplicasReachOffset(
-              status.requiredOffset,
-              produceMetadata.produceRequiredAcks)
+            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
           case None =>
             // Case A
             (false, ErrorMapping.UnknownTopicOrPartitionCode)

Reply via email to