Repository: kafka Updated Branches: refs/heads/trunk dd8cdb79d -> 2cc8f48ae
KAFKA-5308; TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response Return UNSUPPORTED_MESSAGE_FORMAT in handleWriteTxnMarkers when a topic is not the correct message format. Remove any TopicPartitions that have same error from those waiting for markers Author: Damian Guy <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Guozhang Wang <[email protected]> Closes #3152 from dguy/kafka-5308 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2cc8f48a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2cc8f48a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2cc8f48a Branch: refs/heads/trunk Commit: 2cc8f48ae5358087381fa5f79233789499d6b939 Parents: dd8cdb7 Author: Damian Guy <[email protected]> Authored: Tue May 30 23:38:50 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue May 30 23:38:50 2017 -0700 ---------------------------------------------------------------------- .../common/requests/ByteBufferChannel.java | 71 ++++++++++ .../common/requests/RequestResponseTest.java | 52 +------ ...nsactionMarkerRequestCompletionHandler.scala | 9 ++ .../src/main/scala/kafka/server/KafkaApis.scala | 59 +++++--- ...tionMarkerRequestCompletionHandlerTest.scala | 6 + .../scala/unit/kafka/server/KafkaApisTest.scala | 142 ++++++++++++++++++- 6 files changed, 266 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/clients/src/test/java/org/apache/kafka/common/requests/ByteBufferChannel.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ByteBufferChannel.java b/clients/src/test/java/org/apache/kafka/common/requests/ByteBufferChannel.java new file mode 100644 index 0000000..0446df9 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/ByteBufferChannel.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; + +public class ByteBufferChannel implements GatheringByteChannel { + private final ByteBuffer buf; + private boolean closed = false; + + public ByteBufferChannel(long size) { + if (size > Integer.MAX_VALUE) + throw new IllegalArgumentException("size should be not be greater than Integer.MAX_VALUE"); + this.buf = ByteBuffer.allocate((int) size); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + int position = buf.position(); + for (int i = 0; i < length; i++) { + ByteBuffer src = srcs[i].duplicate(); + if (i == 0) + src.position(offset); + buf.put(src); + } + return buf.position() - position; + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException { + return write(srcs, 0, srcs.length); + } + + @Override + public int write(ByteBuffer src) throws IOException { + int position = buf.position(); + buf.put(src); + return buf.position() - position; + } + + @Override + public boolean isOpen() { + return !closed; + } + + @Override + public void close() throws IOException { + buf.flip(); + closed = true; + } + + public ByteBuffer buffer() { + return buf; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 2e9a688..e0f48bf 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -53,11 +53,9 @@ import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; import org.apache.kafka.common.utils.Utils; import org.junit.Test; -import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; -import java.nio.channels.GatheringByteChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -479,14 +477,14 @@ public class RequestResponseTest { send.writeTo(channel); channel.close(); - ByteBuffer buf = channel.buf; + ByteBuffer buf = channel.buffer(); // read the size int size = buf.getInt(); assertTrue(size > 0); // read the header - ResponseHeader responseHeader = ResponseHeader.parse(channel.buf); + ResponseHeader responseHeader = ResponseHeader.parse(channel.buffer()); assertEquals(header.correlationId(), responseHeader.correlationId()); // read the body @@ -1048,52 +1046,6 @@ public class RequestResponseTest { return new DeleteAclsResponse(0, responses); } - private static class ByteBufferChannel implements GatheringByteChannel { - private final ByteBuffer buf; - private boolean closed = false; - - private ByteBufferChannel(long size) { - if (size > Integer.MAX_VALUE) - throw new IllegalArgumentException("size should be not be greater than Integer.MAX_VALUE"); - this.buf = ByteBuffer.allocate((int) size); - } - - @Override - public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { - int position = buf.position(); - for (int i = 0; i < length; i++) { - ByteBuffer src = srcs[i].duplicate(); - if (i == 0) - src.position(offset); - buf.put(src); - } - return buf.position() - position; - } - - @Override - public long write(ByteBuffer[] srcs) throws IOException { - return write(srcs, 0, srcs.length); - } - - @Override - public int write(ByteBuffer src) throws IOException { - int position = buf.position(); - buf.put(src); - return buf.position() - position; - } - - @Override - public boolean isOpen() { - return !closed; - } - - @Override - public void close() throws IOException { - buf.flip(); - closed = true; - } - } - private DescribeConfigsRequest createDescribeConfigsRequest() { return new DescribeConfigsRequest.Builder(asList( new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index 7c1c356..5fa6035 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -155,6 +155,15 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, txnMarkerChannelManager.removeMarkersForTxnId(transactionalId) abortSending = true + case Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT | + Errors.UNSUPPORTED_VERSION => + // The producer would have failed to send data to the failed topic so we can safely remove the partition + // from the set waiting for markers + info(s"Sending $transactionalId's transaction marker from partition $topicPartition has failed with " + + s" ${error.name}. This partition will be removed from the set of partitions" + + s" waiting for completion") + txnMetadata.removePartition(topicPartition) + case other => throw new IllegalStateException(s"Unexpected error ${other.exceptionName} while sending txn marker for $transactionalId") } http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 459cb27..d7f6773 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1492,8 +1492,11 @@ class KafkaApis(val requestChannel: RequestChannel, def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus") - errors.put(producerId, responseStatus.mapValues(_.error).asJava) - + val partitionErrors = responseStatus.mapValues(_.error).asJava + val previous = errors.putIfAbsent(producerId, partitionErrors) + if (previous != null) + previous.putAll(partitionErrors) + val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) => topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionResponse.error == Errors.NONE }.keys @@ -1519,25 +1522,49 @@ class KafkaApis(val requestChannel: RequestChannel, // be nice to have only one append to the log. This requires pushing the building of the control records // into Log so that we only append those having a valid producer epoch, and exposing a new appendControlRecord // API in ReplicaManager. For now, we've done the simpler approach + var skippedMarkers = 0 for (marker <- markers.asScala) { val producerId = marker.producerId - val controlRecords = marker.partitions.asScala.map { partition => - val controlRecordType = marker.transactionResult match { - case TransactionResult.COMMIT => ControlRecordType.COMMIT - case TransactionResult.ABORT => ControlRecordType.ABORT + val (goodPartitions, partitionsWithIncorrectMessageFormat) = marker.partitions.asScala.partition { partition => + replicaManager.getMagic(partition) match { + case Some(magic) if magic >= RecordBatch.MAGIC_VALUE_V2 => true + case _ => false } - val endTxnMarker = new EndTransactionMarker(controlRecordType, marker.coordinatorEpoch) - partition -> MemoryRecords.withEndTransactionMarker(producerId, marker.producerEpoch, endTxnMarker) - }.toMap + } - replicaManager.appendRecords( - timeout = config.requestTimeoutMs.toLong, - requiredAcks = -1, - internalTopicsAllowed = true, - isFromClient = false, - entriesPerPartition = controlRecords, - responseCallback = sendResponseCallback(producerId, marker.transactionResult)) + if (partitionsWithIncorrectMessageFormat.nonEmpty) { + val partitionErrors = new util.HashMap[TopicPartition, Errors]() + partitionsWithIncorrectMessageFormat.foreach { partition => partitionErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) } + errors.put(producerId, partitionErrors) + } + + if (goodPartitions.isEmpty) { + numAppends.decrementAndGet() + skippedMarkers += 1 + } else { + val controlRecords = goodPartitions.map { partition => + val controlRecordType = marker.transactionResult match { + case TransactionResult.COMMIT => ControlRecordType.COMMIT + case TransactionResult.ABORT => ControlRecordType.ABORT + } + val endTxnMarker = new EndTransactionMarker(controlRecordType, marker.coordinatorEpoch) + partition -> MemoryRecords.withEndTransactionMarker(producerId, marker.producerEpoch, endTxnMarker) + }.toMap + + replicaManager.appendRecords( + timeout = config.requestTimeoutMs.toLong, + requiredAcks = -1, + internalTopicsAllowed = true, + isFromClient = false, + entriesPerPartition = controlRecords, + responseCallback = sendResponseCallback(producerId, marker.transactionResult)) + } } + + // No log appends were written as all partitions had incorrect log format + // so we need to send the error response + if (skippedMarkers == markers.size()) + sendResponseExemptThrottle(request, () => sendResponse(request, new WriteTxnMarkersResponse(errors))) } def ensureInterBrokerVersion(version: ApiVersion): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala index a255830..e3e67b7 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala @@ -187,6 +187,12 @@ class TransactionMarkerRequestCompletionHandlerTest { verifyRetriesPartitionOnError(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND) } + @Test + def shouldRemoveTopicPartitionFromWaitingSetOnUnsupportedForMessageFormat(): Unit = { + mockCache() + verifyCompleteDelayedOperationOnError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) + } + private def verifyRetriesPartitionOnError(error: Errors) = { mockCache() http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index d22a5a0..fb17543 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -18,11 +18,15 @@ package unit.kafka.server +import java.net.InetAddress +import java.util + import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_0_11_0_IV0} import kafka.controller.KafkaController import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator import kafka.network.RequestChannel +import kafka.network.RequestChannel.Session import kafka.security.auth.Authorizer import kafka.server.QuotaFactory.QuotaManagers import kafka.server._ @@ -30,14 +34,19 @@ import kafka.utils.{MockTime, TestUtils, ZkUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.ApiKeys -import org.apache.kafka.common.requests.{AbstractRequestResponse, AddPartitionsToTxnRequest, RequestHeader} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry +import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Utils -import org.easymock.EasyMock -import org.junit.{Before, Test} +import org.easymock.{Capture, EasyMock, IAnswer} +import org.junit.{Assert, Before, Test} -import scala.collection.JavaConverters +import scala.collection.JavaConverters._ +import scala.collection.Map class KafkaApisTest { @@ -53,7 +62,10 @@ class KafkaApisTest { private val metrics = new Metrics() private val brokerId = 1 private val authorizer: Option[Authorizer] = None - private val quotas = EasyMock.createNiceMock(classOf[QuotaManagers]) + private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager]) + private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager]) + private val replicaQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager]) + private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager, replicaQuotaManager, replicaQuotaManager) private val brokerTopicStats = new BrokerTopicStats private val clusterId = "clusterId" private val time = new MockTime @@ -107,5 +119,121 @@ class KafkaApisTest { def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = { createKafkaApis(KAFKA_0_10_2_IV0).handleWriteTxnMarkersRequest(null) } - + + @Test + def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = { + val topicPartition = new TopicPartition("t", 0) + val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition)) + val expectedErrors = Map(topicPartition -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT).asJava + val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + + EasyMock.expect(replicaManager.getMagic(topicPartition)) + .andReturn(Some(RecordBatch.MAGIC_VALUE_V1)) + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel) + + createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request) + + val send = capturedResponse.getValue.responseSend.get + val channel = new ByteBufferChannel(send.size()) + send.writeTo(channel) + channel.close() + + // read the size + channel.buffer.getInt() + + val responseHeader = ResponseHeader.parse(channel.buffer) + val struct = ApiKeys.WRITE_TXN_MARKERS.responseSchema(writeTxnMarkersRequest.version()).read(channel.buffer) + + val markersResponse = new WriteTxnMarkersResponse(struct) + Assert.assertEquals(expectedErrors, markersResponse.errors(1)) + } + + @Test + def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = { + val tp1 = new TopicPartition("t", 0) + val tp2 = new TopicPartition("t1", 0) + val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(tp1, tp2)) + val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava + + val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + + EasyMock.expect(replicaManager.getMagic(tp1)) + .andReturn(Some(RecordBatch.MAGIC_VALUE_V1)) + EasyMock.expect(replicaManager.getMagic(tp2)) + .andReturn(Some(RecordBatch.MAGIC_VALUE_V2)) + + + EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), + EasyMock.anyShort(), + EasyMock.eq(true), + EasyMock.eq(false), + EasyMock.anyObject(), + EasyMock.capture(responseCallback), + EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { + override def answer(): Unit = { + responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))) + } + }) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel) + + + createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request) + + val send = capturedResponse.getValue.responseSend.get + val channel = new ByteBufferChannel(send.size()) + send.writeTo(channel) + channel.close() + + // read the size + channel.buffer.getInt() + + val responseHeader = ResponseHeader.parse(channel.buffer) + val struct = ApiKeys.WRITE_TXN_MARKERS.responseSchema(writeTxnMarkersRequest.version()).read(channel.buffer) + + val markersResponse = new WriteTxnMarkersResponse(struct) + Assert.assertEquals(expectedErrors, markersResponse.errors(1)) + EasyMock.verify(replicaManager) + } + + @Test + def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = { + val topicPartition = new TopicPartition("t", 0) + val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition)) + EasyMock.expect(replicaManager.getMagic(topicPartition)) + .andReturn(Some(RecordBatch.MAGIC_VALUE_V2)) + + EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), + EasyMock.anyShort(), + EasyMock.eq(true), + EasyMock.eq(false), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject())) + + EasyMock.replay(replicaManager) + + createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request) + EasyMock.verify(replicaManager) + } + + private def createWriteTxnMarkersRequest(partitions: util.List[TopicPartition]) = { + val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(Utils.mkList( + new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions))).build() + val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS.id, writeTxnMarkersRequest.version(), "", 0) + val byteBuffer = writeTxnMarkersRequest.serialize(header) + + val request = RequestChannel.Request(1, "1", + Session(KafkaPrincipal.ANONYMOUS, + InetAddress.getLocalHost), + byteBuffer, 0, + new ListenerName(""), + SecurityProtocol.PLAINTEXT) + (writeTxnMarkersRequest, request) + } + + }
