Repository: kafka
Updated Branches:
  refs/heads/0.10.1 dc5fc239e -> a859cedf0


KAFKA-3590; Handle not-enough-replicas errors when writing to offsets topic

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>

Closes #1859 from hachikuji/KAFKA-3590

(cherry picked from commit 6a13a3dbaddf99850b2583007577fa2a6e1e6d3a)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a859cedf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a859cedf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a859cedf

Branch: refs/heads/0.10.1
Commit: a859cedf0b9d3b91dd411623d769961d245cc7af
Parents: dc5fc23
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Sep 23 13:13:29 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Sep 23 13:13:49 2016 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    | 24 ++---
 .../coordinator/GroupMetadataManager.scala      | 92 ++++++++++++--------
 .../coordinator/GroupMetadataManagerTest.scala  | 71 ++++++++++++---
 3 files changed, 127 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
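
Background for this patch: when the __consumer_offsets topic does not satisfy its min.insync.replicas setting, appends of group metadata and offset commits fail with NOT_ENOUGH_REPLICAS or NOT_ENOUGH_REPLICAS_AFTER_APPEND. Previously those raw replication errors were returned straight to clients, which do not expect them; with this change they are mapped to GROUP_COORDINATOR_NOT_AVAILABLE, which clients treat as retriable. The sketch below condenses the new translation for group metadata writes; it is distilled from the GroupMetadataManager diff further down, and the method name is illustrative only, not part of the patch.

    import org.apache.kafka.common.protocol.Errors

    // Condensed view of the translation applied when a group metadata append fails
    // (see the GroupMetadataManager.scala hunk below for the actual code).
    def mapStoreGroupError(appendError: Errors): Errors = appendError match {
      case Errors.UNKNOWN_TOPIC_OR_PARTITION
         | Errors.NOT_ENOUGH_REPLICAS
         | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
        Errors.GROUP_COORDINATOR_NOT_AVAILABLE  // retriable from the client's perspective
      case Errors.NOT_LEADER_FOR_PARTITION =>
        Errors.NOT_COORDINATOR_FOR_GROUP
      case Errors.REQUEST_TIMED_OUT =>
        Errors.REBALANCE_IN_PROGRESS
      case Errors.MESSAGE_TOO_LARGE | Errors.RECORD_LIST_TOO_LARGE | Errors.INVALID_FETCH_SIZE =>
        Errors.UNKNOWN
      case other =>
        other
    }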


http://git-wip-us.apache.org/repos/asf/kafka/blob/a859cedf/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 726426a..48efe39 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -267,14 +267,14 @@ class GroupCoordinator(val brokerId: Int,
               val missing = group.allMembers -- groupAssignment.keySet
               val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
 
-              delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (errorCode: Short) => {
+              delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
                 group synchronized {
                   // another member may have joined the group while we were awaiting this callback,
                   // so we must ensure we are still in the AwaitingSync state and the same generation
                   // when it gets invoked. if we have transitioned to another state, then do nothing
                   if (group.is(AwaitingSync) && generationId == group.generationId) {
-                    if (errorCode != Errors.NONE.code) {
-                      resetAndPropagateAssignmentError(group, errorCode)
+                    if (error != Errors.NONE) {
+                      resetAndPropagateAssignmentError(group, error)
                       maybePrepareRebalance(group)
                     } else {
                       setAndPropagateAssignment(group, assignment)
@@ -549,19 +549,19 @@ class GroupCoordinator(val brokerId: Int,
   private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
     assert(group.is(AwaitingSync))
     group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
-    propagateAssignment(group, Errors.NONE.code)
+    propagateAssignment(group, Errors.NONE)
   }
 
-  private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
+  private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) {
     assert(group.is(AwaitingSync))
     group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
-    propagateAssignment(group, errorCode)
+    propagateAssignment(group, error)
   }
 
-  private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
+  private def propagateAssignment(group: GroupMetadata, error: Errors) {
     for (member <- group.allMemberMetadata) {
       if (member.awaitingSyncCallback != null) {
-        member.awaitingSyncCallback(member.assignment, errorCode)
+        member.awaitingSyncCallback(member.assignment, error.code)
         member.awaitingSyncCallback = null
 
         // reset the session timeout for members after propagating the member's assignment.
@@ -645,7 +645,7 @@ class GroupCoordinator(val brokerId: Int,
   private def prepareRebalance(group: GroupMetadata) {
     // if any members are awaiting sync, cancel their request and have them rejoin
     if (group.is(AwaitingSync))
-      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
+      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
 
     group.transitionTo(PreparingRebalance)
     info("Preparing to restabilize group %s with old generation 
%s".format(group.groupId, group.generationId))
@@ -692,12 +692,12 @@ class GroupCoordinator(val brokerId: Int,
         if (group.is(Empty)) {
           info(s"Group ${group.groupId} with generation ${group.generationId} 
is now empty")
 
-          delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, errorCode => {
-            if (errorCode != Errors.NONE.code) {
+          delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, error => {
+            if (error != Errors.NONE) {
+            if (error != Errors.NONE) {
               // we failed to write the empty group metadata. If the broker fails before another rebalance,
               // the previous generation written to the log will become active again (and most likely timeout).
               // This should be safe since there are no active members in an empty generation, so we just warn.
-              warn(s"Failed to write empty metadata for group ${group.groupId}: ${Errors.forCode(errorCode).message()}")
+              warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
             }
           }))
         } else {
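
A note on the shape of the GroupCoordinator changes above: prepareStoreGroup's callback now receives a typed org.apache.kafka.common.protocol.Errors value rather than a raw Short, and the coordinator converts back to the wire-level code only when invoking a member's sync callback (member.awaitingSyncCallback(member.assignment, error.code)). A minimal sketch of that boundary, using hypothetical names (handleStoreResult, completeSync) that are not part of the patch:

    import org.apache.kafka.common.protocol.Errors

    // Internal paths now carry the typed error ...
    def handleStoreResult(error: Errors): Unit =
      if (error != Errors.NONE)
        println(s"group store failed: ${error.message}")

    // ... while the client-facing sync callback still takes the protocol's Short code,
    // so the conversion happens only at this edge.
    def completeSync(syncCallback: (Array[Byte], Short) => Unit,
                     assignment: Array[Byte],
                     error: Errors): Unit =
      syncCallback(assignment, error.code)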

http://git-wip-us.apache.org/repos/asf/kafka/blob/a859cedf/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 1dc2a49..79d4411 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -175,7 +175,7 @@ class GroupMetadataManager(val brokerId: Int,
 
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
-                        responseCallback: Short => Unit): DelayedStore = {
+                        responseCallback: Errors => Unit): DelayedStore = {
     val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
     val groupMetadataValueVersion = if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort else GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
 
@@ -202,36 +202,45 @@ class GroupMetadataManager(val brokerId: Int,
       // construct the error status in the propagated assignment response
       // in the cache
       val status = responseStatus(groupMetadataPartition)
+      val statusError = Errors.forCode(status.errorCode)
 
-      var responseCode = Errors.NONE.code
-      if (status.errorCode != Errors.NONE.code) {
-        debug("Metadata from group %s with generation %d failed when appending to log due to %s"
-          .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
+      val responseError = if (statusError == Errors.NONE) {
+        Errors.NONE
+      } else {
+        debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
+          s"due to ${statusError.exceptionName}")
 
         // transform the log append error code to the corresponding the commit status error code
-        responseCode = if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
-          Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
-        } else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) {
-          Errors.NOT_COORDINATOR_FOR_GROUP.code
-        } else if (status.errorCode == Errors.REQUEST_TIMED_OUT.code) {
-          Errors.REBALANCE_IN_PROGRESS.code
-        } else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
-          || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
-          || status.errorCode == Errors.INVALID_FETCH_SIZE.code) {
-
-          error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client"
-            .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
-
-          Errors.UNKNOWN.code
-        } else {
-          error("Appending metadata message for group %s generation %d failed due to unexpected error: %s"
-            .format(group.groupId, generationId, status.errorCode))
+        statusError match {
+          case Errors.UNKNOWN_TOPIC_OR_PARTITION
+            | Errors.NOT_ENOUGH_REPLICAS
+            | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+            Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+
+          case Errors.NOT_LEADER_FOR_PARTITION =>
+            Errors.NOT_COORDINATOR_FOR_GROUP
+
+          case Errors.REQUEST_TIMED_OUT =>
+            Errors.REBALANCE_IN_PROGRESS
+
+          case Errors.MESSAGE_TOO_LARGE
+            | Errors.RECORD_LIST_TOO_LARGE
+            | Errors.INVALID_FETCH_SIZE =>
+
+            error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
+              s"${statusError.exceptionName}, returning UNKNOWN error code to the client")
+
+            Errors.UNKNOWN
 
-          status.errorCode
+          case other =>
+            error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
+              s"due to unexpected error: ${statusError.exceptionName}")
+
+            other
         }
       }
 
-      responseCallback(responseCode)
+      responseCallback(responseError)
     }
 
     DelayedStore(groupMetadataMessageSet, putCacheCallback)
@@ -286,10 +295,11 @@ class GroupMetadataManager(val brokerId: Int,
       // construct the commit response status and insert
       // the offset and metadata to cache if the append status has no error
       val status = responseStatus(offsetTopicPartition)
+      val statusError = Errors.forCode(status.errorCode)
 
       val responseCode =
         group synchronized {
-          if (status.errorCode == Errors.NONE.code) {
+          if (statusError == Errors.NONE) {
             if (!group.is(Dead)) {
               filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
                 group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
@@ -303,20 +313,28 @@ class GroupMetadataManager(val brokerId: Int,
               }
             }
 
-            debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
-              .format(filteredOffsetMetadata, group.groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))
+            debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
+              s"with generation $generationId failed when appending to log due to ${statusError.exceptionName}")
 
             // transform the log append error code to the corresponding the commit status error code
-            if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-              Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
-            else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code)
-              Errors.NOT_COORDINATOR_FOR_GROUP.code
-            else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
-              || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
-              || status.errorCode == Errors.INVALID_FETCH_SIZE.code)
-              Errors.INVALID_COMMIT_OFFSET_SIZE.code
-            else
-              status.errorCode
+            val responseError = statusError match {
+              case Errors.UNKNOWN_TOPIC_OR_PARTITION
+                   | Errors.NOT_ENOUGH_REPLICAS
+                   | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+                Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+
+              case Errors.NOT_LEADER_FOR_PARTITION =>
+                Errors.NOT_COORDINATOR_FOR_GROUP
+
+              case Errors.MESSAGE_TOO_LARGE
+                   | Errors.RECORD_LIST_TOO_LARGE
+                   | Errors.INVALID_FETCH_SIZE =>
+                Errors.INVALID_COMMIT_OFFSET_SIZE
+
+              case other => other
+            }
+
+            responseError.code
           }
         }
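
The offset commit path above applies the same treatment to replication-related append failures, but keeps its own mapping for oversized writes (INVALID_COMMIT_OFFSET_SIZE rather than UNKNOWN). A condensed sketch of that mapping, distilled from the hunk above with an illustrative method name that is not part of the patch:

    import org.apache.kafka.common.protocol.Errors

    // An under-replicated __consumer_offsets partition now surfaces to committers as the
    // retriable GROUP_COORDINATOR_NOT_AVAILABLE instead of a raw replication error.
    def mapCommitOffsetError(appendError: Errors): Errors = appendError match {
      case Errors.UNKNOWN_TOPIC_OR_PARTITION
         | Errors.NOT_ENOUGH_REPLICAS
         | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
        Errors.GROUP_COORDINATOR_NOT_AVAILABLE
      case Errors.NOT_LEADER_FOR_PARTITION =>
        Errors.NOT_COORDINATOR_FOR_GROUP
      case Errors.MESSAGE_TOO_LARGE | Errors.RECORD_LIST_TOO_LARGE | Errors.INVALID_FETCH_SIZE =>
        Errors.INVALID_COMMIT_OFFSET_SIZE
      case other =>
        other
    }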
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a859cedf/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index b4f9ba3..0a1032f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -101,14 +101,48 @@ class GroupMetadataManagerTest {
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
-    var errorCode: Option[Short] = None
-    def callback(error: Short) {
-      errorCode = Some(error)
+    var maybeError: Option[Errors] = None
+    def callback(error: Errors) {
+      maybeError = Some(error)
     }
 
     val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
     groupMetadataManager.store(delayedStore)
-    assertEquals(Errors.NONE.code, errorCode.get)
+    assertEquals(Some(Errors.NONE), maybeError)
+  }
+
+  @Test
+  def testStoreGroupErrorMapping() {
+    assertStoreGroupErrorMapping(Errors.NONE, Errors.NONE)
+    assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+    assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN)
+    assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN)
+    assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN)
+    assertStoreGroupErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
+  }
+
+  private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) {
+    EasyMock.reset(replicaManager)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    expectAppendMessage(appendError)
+    EasyMock.replay(replicaManager)
+
+    var maybeError: Option[Errors] = None
+    def callback(error: Errors) {
+      maybeError = Some(error)
+    }
+
+    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
+    groupMetadataManager.store(delayedStore)
+    assertEquals(Some(expectedError), maybeError)
+
+    EasyMock.verify(replicaManager)
   }
 
   @Test
@@ -130,14 +164,14 @@ class GroupMetadataManagerTest {
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
-    var errorCode: Option[Short] = None
-    def callback(error: Short) {
-      errorCode = Some(error)
+    var maybeError: Option[Errors] = None
+    def callback(error: Errors) {
+      maybeError = Some(error)
     }
 
     val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
     groupMetadataManager.store(delayedStore)
-    assertEquals(Errors.NONE.code, errorCode.get)
+    assertEquals(Some(Errors.NONE), maybeError)
   }
 
   @Test
@@ -183,6 +217,19 @@ class GroupMetadataManagerTest {
 
   @Test
   def testCommitOffsetFailure() {
+    assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+    assertCommitOffsetErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
+    assertCommitOffsetErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
+    assertCommitOffsetErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE)
+    assertCommitOffsetErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
+  }
+
+  private def assertCommitOffsetErrorMapping(appendError: Errors, expectedError: Errors): Unit = {
+    EasyMock.reset(replicaManager)
+
     val memberId = ""
     val generationId = -1
     val topicPartition = new TopicPartition("foo", 0)
@@ -195,7 +242,7 @@ class GroupMetadataManagerTest {
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
 
-    expectAppendMessage(Errors.NOT_LEADER_FOR_PARTITION)
+    expectAppendMessage(appendError)
     EasyMock.replay(replicaManager)
 
     var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
@@ -210,11 +257,13 @@ class GroupMetadataManagerTest {
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP.code), maybeError)
+    assertEquals(Some(expectedError.code), maybeError)
     assertFalse(group.hasOffsets)
 
     val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
+
+    EasyMock.verify(replicaManager)
   }
 
   @Test
@@ -400,7 +449,7 @@ class GroupMetadataManagerTest {
           new PartitionResponse(error.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
-    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andStubReturn(Some(Message.MagicValue_V1))
   }
 
 
