Repository: kafka
Updated Branches:
  refs/heads/0.10.1 dc5fc239e -> a859cedf0
KAFKA-3590; Handle not-enough-replicas errors when writing to offsets topic

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>

Closes #1859 from hachikuji/KAFKA-3590

(cherry picked from commit 6a13a3dbaddf99850b2583007577fa2a6e1e6d3a)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a859cedf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a859cedf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a859cedf

Branch: refs/heads/0.10.1
Commit: a859cedf0b9d3b91dd411623d769961d245cc7af
Parents: dc5fc23
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Sep 23 13:13:29 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Sep 23 13:13:49 2016 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    | 24 ++---
 .../coordinator/GroupMetadataManager.scala      | 92 ++++++++++++--------
 .../coordinator/GroupMetadataManagerTest.scala  | 71 ++++++++++++---
 3 files changed, 127 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a859cedf/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 726426a..48efe39 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -267,14 +267,14 @@ class GroupCoordinator(val brokerId: Int,
           val missing = group.allMembers -- groupAssignment.keySet
           val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap

-          delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (errorCode: Short) => {
+          delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
             group synchronized {
               // another member may have joined the group while we were awaiting this callback,
               // so we must ensure we are still in the AwaitingSync state and the same generation
               // when it gets invoked. if we have transitioned to another state, then do nothing
               if (group.is(AwaitingSync) && generationId == group.generationId) {
-                if (errorCode != Errors.NONE.code) {
-                  resetAndPropagateAssignmentError(group, errorCode)
+                if (error != Errors.NONE) {
+                  resetAndPropagateAssignmentError(group, error)
                   maybePrepareRebalance(group)
                 } else {
                   setAndPropagateAssignment(group, assignment)
@@ -549,19 +549,19 @@ class GroupCoordinator(val brokerId: Int,
   private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
     assert(group.is(AwaitingSync))
     group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
-    propagateAssignment(group, Errors.NONE.code)
+    propagateAssignment(group, Errors.NONE)
   }

-  private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
+  private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) {
     assert(group.is(AwaitingSync))
     group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
-    propagateAssignment(group, errorCode)
+    propagateAssignment(group, error)
   }

-  private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
+  private def propagateAssignment(group: GroupMetadata, error: Errors) {
     for (member <- group.allMemberMetadata) {
       if (member.awaitingSyncCallback != null) {
-        member.awaitingSyncCallback(member.assignment, errorCode)
+        member.awaitingSyncCallback(member.assignment, error.code)
         member.awaitingSyncCallback = null

         // reset the session timeout for members after propagating the member's assignment.
@@ -645,7 +645,7 @@ class GroupCoordinator(val brokerId: Int,
   private def prepareRebalance(group: GroupMetadata) {
     // if any members are awaiting sync, cancel their request and have them rejoin
     if (group.is(AwaitingSync))
-      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
+      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

     group.transitionTo(PreparingRebalance)
     info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
@@ -692,12 +692,12 @@ class GroupCoordinator(val brokerId: Int,
         if (group.is(Empty)) {
           info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")

-          delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, errorCode => {
-            if (errorCode != Errors.NONE.code) {
+          delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, error => {
+            if (error != Errors.NONE) {
               // we failed to write the empty group metadata. If the broker fails before another rebalance,
               // the previous generation written to the log will become active again (and most likely timeout).
               // This should be safe since there are no active members in an empty generation, so we just warn.
- warn(s"Failed to write empty metadata for group ${group.groupId}: ${Errors.forCode(errorCode).message()}") + warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}") } })) } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/a859cedf/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index 1dc2a49..79d4411 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -175,7 +175,7 @@ class GroupMetadataManager(val brokerId: Int, def prepareStoreGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], - responseCallback: Short => Unit): DelayedStore = { + responseCallback: Errors => Unit): DelayedStore = { val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId)) val groupMetadataValueVersion = if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort else GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION @@ -202,36 +202,45 @@ class GroupMetadataManager(val brokerId: Int, // construct the error status in the propagated assignment response // in the cache val status = responseStatus(groupMetadataPartition) + val statusError = Errors.forCode(status.errorCode) - var responseCode = Errors.NONE.code - if (status.errorCode != Errors.NONE.code) { - debug("Metadata from group %s with generation %d failed when appending to log due to %s" - .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName)) + val responseError = if (statusError == Errors.NONE) { + Errors.NONE + } else { + debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " + + s"due to ${statusError.exceptionName}") // transform the log append error code to the corresponding the commit status error code - responseCode = if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) { - Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code - } else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) { - Errors.NOT_COORDINATOR_FOR_GROUP.code - } else if (status.errorCode == Errors.REQUEST_TIMED_OUT.code) { - Errors.REBALANCE_IN_PROGRESS.code - } else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code - || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code - || status.errorCode == Errors.INVALID_FETCH_SIZE.code) { - - error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client" - .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName)) - - Errors.UNKNOWN.code - } else { - error("Appending metadata message for group %s generation %d failed due to unexpected error: %s" - .format(group.groupId, generationId, status.errorCode)) + statusError match { + case Errors.UNKNOWN_TOPIC_OR_PARTITION + | Errors.NOT_ENOUGH_REPLICAS + | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => + Errors.GROUP_COORDINATOR_NOT_AVAILABLE + + case Errors.NOT_LEADER_FOR_PARTITION => + Errors.NOT_COORDINATOR_FOR_GROUP + + case Errors.REQUEST_TIMED_OUT => + Errors.REBALANCE_IN_PROGRESS + + case Errors.MESSAGE_TOO_LARGE + | Errors.RECORD_LIST_TOO_LARGE + | Errors.INVALID_FETCH_SIZE => + + error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " + + 
s"${statusError.exceptionName}, returning UNKNOWN error code to the client") + + Errors.UNKNOWN - status.errorCode + case other => + error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " + + s"due to unexpected error: ${statusError.exceptionName}") + + other } } - responseCallback(responseCode) + responseCallback(responseError) } DelayedStore(groupMetadataMessageSet, putCacheCallback) @@ -286,10 +295,11 @@ class GroupMetadataManager(val brokerId: Int, // construct the commit response status and insert // the offset and metadata to cache if the append status has no error val status = responseStatus(offsetTopicPartition) + val statusError = Errors.forCode(status.errorCode) val responseCode = group synchronized { - if (status.errorCode == Errors.NONE.code) { + if (statusError == Errors.NONE) { if (!group.is(Dead)) { filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) => group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata) @@ -303,20 +313,28 @@ class GroupMetadataManager(val brokerId: Int, } } - debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s" - .format(filteredOffsetMetadata, group.groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName)) + debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " + + s"with generation $generationId failed when appending to log due to ${statusError.exceptionName}") // transform the log append error code to the corresponding the commit status error code - if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code - else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) - Errors.NOT_COORDINATOR_FOR_GROUP.code - else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code - || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code - || status.errorCode == Errors.INVALID_FETCH_SIZE.code) - Errors.INVALID_COMMIT_OFFSET_SIZE.code - else - status.errorCode + val responseError = statusError match { + case Errors.UNKNOWN_TOPIC_OR_PARTITION + | Errors.NOT_ENOUGH_REPLICAS + | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => + Errors.GROUP_COORDINATOR_NOT_AVAILABLE + + case Errors.NOT_LEADER_FOR_PARTITION => + Errors.NOT_COORDINATOR_FOR_GROUP + + case Errors.MESSAGE_TOO_LARGE + | Errors.RECORD_LIST_TOO_LARGE + | Errors.INVALID_FETCH_SIZE => + Errors.INVALID_COMMIT_OFFSET_SIZE + + case other => other + } + + responseError.code } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a859cedf/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala index b4f9ba3..0a1032f 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala @@ -101,14 +101,48 @@ class GroupMetadataManagerTest { expectAppendMessage(Errors.NONE) EasyMock.replay(replicaManager) - var errorCode: Option[Short] = None - def callback(error: Short) { - errorCode = Some(error) + var maybeError: Option[Errors] = None + def callback(error: Errors) { + maybeError = Some(error) } val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback) groupMetadataManager.store(delayedStore) 
-    assertEquals(Errors.NONE.code, errorCode.get)
+    assertEquals(Some(Errors.NONE), maybeError)
+  }
+
+  @Test
+  def testStoreGroupErrorMapping() {
+    assertStoreGroupErrorMapping(Errors.NONE, Errors.NONE)
+    assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+    assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN)
+    assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN)
+    assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN)
+    assertStoreGroupErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
+  }
+
+  private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) {
+    EasyMock.reset(replicaManager)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    expectAppendMessage(appendError)
+    EasyMock.replay(replicaManager)
+
+    var maybeError: Option[Errors] = None
+    def callback(error: Errors) {
+      maybeError = Some(error)
+    }
+
+    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
+    groupMetadataManager.store(delayedStore)
+    assertEquals(Some(expectedError), maybeError)
+
+    EasyMock.verify(replicaManager)
   }

   @Test
@@ -130,14 +164,14 @@ class GroupMetadataManagerTest {
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)

-    var errorCode: Option[Short] = None
-    def callback(error: Short) {
-      errorCode = Some(error)
+    var maybeError: Option[Errors] = None
+    def callback(error: Errors) {
+      maybeError = Some(error)
     }

     val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
     groupMetadataManager.store(delayedStore)
-    assertEquals(Errors.NONE.code, errorCode.get)
+    assertEquals(Some(Errors.NONE), maybeError)
   }

   @Test
@@ -183,6 +217,19 @@ class GroupMetadataManagerTest {

   @Test
   def testCommitOffsetFailure() {
+    assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+    assertCommitOffsetErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
+    assertCommitOffsetErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
+    assertCommitOffsetErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE)
+    assertCommitOffsetErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
+  }
+
+  private def assertCommitOffsetErrorMapping(appendError: Errors, expectedError: Errors): Unit = {
+    EasyMock.reset(replicaManager)
+
     val memberId = ""
     val generationId = -1
     val topicPartition = new TopicPartition("foo", 0)
@@ -195,7 +242,7 @@ class GroupMetadataManagerTest {

     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))

-    expectAppendMessage(Errors.NOT_LEADER_FOR_PARTITION)
+    expectAppendMessage(appendError)
     EasyMock.replay(replicaManager)

     var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
@@ -210,11 +257,13 @@ class GroupMetadataManagerTest {

     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP.code), maybeError)
+    assertEquals(Some(expectedError.code), maybeError)
     assertFalse(group.hasOffsets)

     val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
+
+    EasyMock.verify(replicaManager)
   }

   @Test
@@ -400,7 +449,7 @@ class GroupMetadataManagerTest {
         new PartitionResponse(error.code, 0L, Record.NO_TIMESTAMP)
       )
     )})
-    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andStubReturn(Some(Message.MagicValue_V1))
  }
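----------------------------------------------------------------------

The heart of the change is the mapping from offsets-topic log append errors to the errors
propagated back to clients: under-replicated appends (NOT_ENOUGH_REPLICAS and
NOT_ENOUGH_REPLICAS_AFTER_APPEND) are now translated to GROUP_COORDINATOR_NOT_AVAILABLE,
which clients treat as retriable, instead of leaking the raw append error. A rough
standalone sketch of the two mappings follows; the object and method names are
illustrative only, not part of the patch:

import org.apache.kafka.common.protocol.Errors

object OffsetsTopicErrorMapping {

  // Sketch of the mapping applied when storing group metadata (prepareStoreGroup):
  // replication shortfalls surface as a retriable coordinator error.
  def mapStoreGroupError(appendError: Errors): Errors = appendError match {
    case Errors.UNKNOWN_TOPIC_OR_PARTITION
       | Errors.NOT_ENOUGH_REPLICAS
       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => Errors.GROUP_COORDINATOR_NOT_AVAILABLE
    case Errors.NOT_LEADER_FOR_PARTITION => Errors.NOT_COORDINATOR_FOR_GROUP
    case Errors.REQUEST_TIMED_OUT => Errors.REBALANCE_IN_PROGRESS
    case Errors.MESSAGE_TOO_LARGE
       | Errors.RECORD_LIST_TOO_LARGE
       | Errors.INVALID_FETCH_SIZE => Errors.UNKNOWN
    case other => other
  }

  // Sketch of the mapping applied when committing offsets (prepareStoreOffsets):
  // same replication handling, but oversized appends map to INVALID_COMMIT_OFFSET_SIZE
  // and REQUEST_TIMED_OUT is passed through unchanged.
  def mapCommitOffsetError(appendError: Errors): Errors = appendError match {
    case Errors.UNKNOWN_TOPIC_OR_PARTITION
       | Errors.NOT_ENOUGH_REPLICAS
       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => Errors.GROUP_COORDINATOR_NOT_AVAILABLE
    case Errors.NOT_LEADER_FOR_PARTITION => Errors.NOT_COORDINATOR_FOR_GROUP
    case Errors.MESSAGE_TOO_LARGE
       | Errors.RECORD_LIST_TOO_LARGE
       | Errors.INVALID_FETCH_SIZE => Errors.INVALID_COMMIT_OFFSET_SIZE
    case other => other
  }
}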