This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9a71bfb KAFKA-7030; Add configuration to disable message down-conversion (KIP-283) (#5192) 9a71bfb is described below commit 9a71bfb9d64dae5d0296d162a01a62d8c13324da Author: Dhruvil Shah <dhru...@confluent.io> AuthorDate: Wed Jun 13 21:23:23 2018 -0700 KAFKA-7030; Add configuration to disable message down-conversion (KIP-283) (#5192) Add support for the topic-level `message.downconversion.enable` config as part of KIP-283. --- .../apache/kafka/common/config/TopicConfig.java | 7 + core/src/main/scala/kafka/log/LogConfig.scala | 9 +- core/src/main/scala/kafka/server/KafkaApis.scala | 90 +++++------ core/src/main/scala/kafka/server/KafkaConfig.scala | 5 + core/src/main/scala/kafka/server/KafkaServer.scala | 1 + .../main/scala/kafka/server/ReplicaManager.scala | 7 +- .../server/DynamicBrokerReconfigurationTest.scala | 1 + .../FetchRequestDownConversionConfigTest.scala | 165 +++++++++++++++++++++ .../scala/unit/kafka/server/FetchRequestTest.scala | 2 +- 9 files changed, 238 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index d6b7003..fb2208c 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -165,4 +165,11 @@ public class TopicConfig { "the timestamp when a broker receives a message and the timestamp specified in the message. If " + "message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp " + "exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; + + public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable"; + public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " + + "down-conversion of message formats is enabled to satisfy consume requests. When set to <code>false</code>, " + + "broker will not perform down-conversion for consumers expecting an older message format. The broker responds " + + "with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration" + + "does not apply to any message format conversion that might be required for replication to followers."; } diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index c827121..bd4768e 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -63,6 +63,7 @@ object Defaults { val LeaderReplicationThrottledReplicas = Collections.emptyList[String]() val FollowerReplicationThrottledReplicas = Collections.emptyList[String]() val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots + val MessageDownConversionEnable = kafka.server.Defaults.MessageDownConversionEnable } case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] = Set.empty) @@ -96,6 +97,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp) val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp) + val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp) def randomSegmentJitter: Long = if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) @@ -131,6 +133,7 @@ object LogConfig { val MessageFormatVersionProp = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG val MessageTimestampTypeProp = TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG val MessageTimestampDifferenceMaxMsProp = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG + val MessageDownConversionEnableProp = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG // Leave these out of TopicConfig for now as they are replication quota configs val LeaderReplicationThrottledReplicasProp = "leader.replication.throttled.replicas" @@ -158,6 +161,7 @@ object LogConfig { val MessageFormatVersionDoc = TopicConfig.MESSAGE_FORMAT_VERSION_DOC val MessageTimestampTypeDoc = TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC val MessageTimestampDifferenceMaxMsDoc = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC + val MessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on " + "the leader side. The list should describe a set of replicas in the form " + @@ -262,6 +266,8 @@ object LogConfig { LeaderReplicationThrottledReplicasDoc, LeaderReplicationThrottledReplicasProp) .define(FollowerReplicationThrottledReplicasProp, LIST, Defaults.FollowerReplicationThrottledReplicas, ThrottledReplicaListValidator, MEDIUM, FollowerReplicationThrottledReplicasDoc, FollowerReplicationThrottledReplicasProp) + .define(MessageDownConversionEnableProp, BOOLEAN, Defaults.MessageDownConversionEnable, LOW, + MessageDownConversionEnableDoc, KafkaConfig.LogMessageDownConversionEnableProp) } def apply(): LogConfig = LogConfig(new Properties()) @@ -325,7 +331,8 @@ object LogConfig { PreAllocateEnableProp -> KafkaConfig.LogPreAllocateProp, MessageFormatVersionProp -> KafkaConfig.LogMessageFormatVersionProp, MessageTimestampTypeProp -> KafkaConfig.LogMessageTimestampTypeProp, - MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp + MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, + MessageDownConversionEnableProp -> KafkaConfig.LogMessageDownConversionEnableProp ) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index cdd0d72..37a11bd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -43,7 +43,6 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} -import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -507,44 +506,41 @@ class KafkaApis(val requestChannel: RequestChannel, fetchRequest.toForget(), fetchRequest.isFromFollower()) + def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = { + new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, + FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) + } + val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData[Records])]() val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower()) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { - fetchContext.foreachPartition((topicPartition, data) => { - if (!metadataCache.contains(topicPartition)) { - erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, - FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) - } else { + fetchContext.foreachPartition { (topicPartition, data) => + if (!metadataCache.contains(topicPartition)) + erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) + else interesting += (topicPartition -> data) - } - }) + } } else { - fetchContext.foreachPartition((part, _) => { - erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) - }) + fetchContext.foreachPartition { (part, _) => + erroneous += part -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED) + } } } else { // Regular Kafka consumers need READ permission on each partition they are fetching. - fetchContext.foreachPartition((topicPartition, data) => { + fetchContext.foreachPartition { (topicPartition, data) => if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL))) - erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) + erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED) else if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, - FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) + erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) else interesting += (topicPartition -> data) - }) + } } - def convertRecords(tp: TopicPartition, unconvertedRecords: Records): BaseRecords = { + def maybeConvertFetchedData(tp: TopicPartition, + partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = { // Down-conversion of the fetched records is needed when the stored magic version is // greater than that supported by the client (as indicated by the fetch request version). If the // configured magic version for the topic is less than or equal to that supported by the version of the @@ -552,8 +548,10 @@ class KafkaApis(val requestChannel: RequestChannel, // know it must be supported. However, if the magic version is changed from a higher version back to a // lower version, this check will no longer be valid and we will fail to down-convert the messages // which were written in the new format prior to the version downgrade. - replicaManager.getMagic(tp).flatMap { magic => - val downConvertMagic = { + val unconvertedRecords = partitionData.records + val logConfig = replicaManager.getLogConfig(tp) + val downConvertMagic = + logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic => if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0)) Some(RecordBatch.MAGIC_VALUE_V0) else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) @@ -562,28 +560,36 @@ class KafkaApis(val requestChannel: RequestChannel, None } - downConvertMagic.map { magic => - trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId") - - // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much - // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked - // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the - // client. - new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time) - } - }.getOrElse(unconvertedRecords) + // For fetch requests from clients, check if down-conversion is disabled for the particular partition + if (downConvertMagic.isDefined && !fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) { + trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.") + errorResponse(Errors.UNSUPPORTED_VERSION) + } else { + val convertedRecords = + downConvertMagic.map { magic => + trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId") + // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much + // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked + // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the + // client. + new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time) + }.getOrElse(unconvertedRecords) + new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark, + FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions, + convertedRecords) + } } // the callback for process a fetch response, invoked before throttling def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = { val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] - responsePartitionData.foreach{ case (tp, data) => + responsePartitionData.foreach { case (tp, data) => val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) partitions.put(tp, new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset, data.logStartOffset, abortedTransactions, data.records)) } - erroneous.foreach{case (tp, data) => partitions.put(tp, data)} + erroneous.foreach { case (tp, data) => partitions.put(tp, data) } // When this callback is triggered, the remote API call has completed. // Record time before any byte-rate throttling. @@ -598,14 +604,10 @@ class KafkaApis(val requestChannel: RequestChannel, if (unconvertedPartitionData.error != Errors.NONE) debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " + s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}") - val convertedRecords = convertRecords(tp, unconvertedPartitionData.records) - val convertedPartitionData = new FetchResponse.PartitionData[BaseRecords](unconvertedPartitionData.error, - unconvertedPartitionData.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, unconvertedPartitionData.logStartOffset, - unconvertedPartitionData.abortedTransactions, convertedRecords) - convertedData.put(tp, convertedPartitionData) + convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData)) } - // Prepare fetch resopnse from converted data + // Prepare fetch response from converted data val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData, throttleTimeMs, unconvertedFetchResponse.sessionId()) response.responseData.asScala.foreach { case (topicPartition, data) => @@ -1455,7 +1457,7 @@ class KafkaApis(val requestChannel: RequestChannel, duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap } else Map.empty - val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) + val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults sendResponseCallback(completeResults) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ecbb790..2760def 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -115,6 +115,7 @@ object Defaults { val NumRecoveryThreadsPerDataDir = 1 val AutoCreateTopicsEnable = true val MinInSyncReplicas = 1 + val MessageDownConversionEnable = true /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMs = RequestTimeoutMs @@ -329,6 +330,7 @@ object KafkaConfig { val MinInSyncReplicasProp = "min.insync.replicas" val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name" val AlterConfigPolicyClassNameProp = "alter.config.policy.class.name" + val LogMessageDownConversionEnableProp = LogConfigPrefix + "message.downconversion.enable" /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" val DefaultReplicationFactorProp = "default.replication.factor" @@ -598,6 +600,7 @@ object KafkaConfig { "implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface." val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " + "implement the <code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface." + val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC; /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels" @@ -861,6 +864,7 @@ object KafkaConfig { .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc) .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc) .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc) + .define(LogMessageDownConversionEnableProp, BOOLEAN, Defaults.MessageDownConversionEnable, LOW, LogMessageDownConversionEnableDoc) /** ********* Replication configuration ***********/ .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc) @@ -1134,6 +1138,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString) def logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp)) def logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp) + def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp) /** ********* Replication configuration ***********/ val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 57bca69..f73ede6 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -79,6 +79,7 @@ object KafkaServer { logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version) logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name) logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long) + logProps.put(LogConfig.MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean) logProps } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 24f3235..965595b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -25,7 +25,7 @@ import com.yammer.metrics.core.Gauge import kafka.api._ import kafka.cluster.{BrokerEndPoint, Partition, Replica} import kafka.controller.{KafkaController, StateChangeLogger} -import kafka.log.{Log, LogAppendInfo, LogManager} +import kafka.log.{Log, LogAppendInfo, LogConfig, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.server.checkpoints.OffsetCheckpointFile @@ -995,8 +995,9 @@ class ReplicaManager(val config: KafkaConfig, quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync } - def getMagic(topicPartition: TopicPartition): Option[Byte] = - getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.recordVersion.value)) + def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = getReplica(topicPartition).flatMap(_.log.map(_.config)) + + def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_.messageFormatVersion.recordVersion.value) def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = { replicaStateChangeLock synchronized { diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 45b3fdc..69ca317 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -369,6 +369,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props.put(KafkaConfig.LogPreAllocateProp, true.toString) props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.toString) props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000") + props.put(KafkaConfig.LogMessageDownConversionEnableProp, "false") reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogSegmentBytesProp, "4000")) // Verify that all broker defaults have been updated diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala new file mode 100644 index 0000000..e5ef985 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server + +import java.util +import java.util.Properties + +import kafka.log.LogConfig +import kafka.server.{BaseRequestTest, KafkaConfig} +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.Assert._ +import org.junit.Test + +class FetchRequestDownConversionConfigTest extends BaseRequestTest { + private var producer: KafkaProducer[String, String] = null + override def numBrokers: Int = 1 + + override def setUp(): Unit = { + super.setUp() + initProducer() + } + + override def tearDown(): Unit = { + if (producer != null) + producer.close() + super.tearDown() + } + + override protected def propertyOverrides(properties: Properties): Unit = { + super.propertyOverrides(properties) + properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false") + } + + private def initProducer(): Unit = { + producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), + retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer) + } + + private def createTopics(numTopics: Int, numPartitions: Int, + configs: Map[String, String] = Map.empty, topicSuffixStart: Int = 0): Map[TopicPartition, Int] = { + val topics = (0 until numTopics).map(t => s"topic${t + topicSuffixStart}") + val topicConfig = new Properties + topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 1.toString) + configs.foreach { case (k, v) => topicConfig.setProperty(k, v) } + topics.flatMap { topic => + val partitionToLeader = createTopic(topic, numPartitions = numPartitions, replicationFactor = 1, + topicConfig = topicConfig) + partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader } + }.toMap + } + + private def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition], + offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = { + val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + topicPartitions.foreach { tp => + partitionMap.put(tp, new FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), 0L, maxPartitionBytes)) + } + partitionMap + } + + private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse[MemoryRecords] = { + val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId)) + FetchResponse.parse(response, request.version) + } + + /** + * Tests that fetch request that require down-conversion returns with an error response when down-conversion is disabled on broker. + */ + @Test + def testV1FetchWithDownConversionDisabled(): Unit = { + val topicMap = createTopics(numTopics = 5, numPartitions = 1) + val topicPartitions = topicMap.keySet.toSeq + topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get()) + val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024, + topicPartitions)).build(1) + val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest) + topicPartitions.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION, fetchResponse.responseData().get(tp).error)) + } + + /** + * Tests that "message.downconversion.enable" has no effect when down-conversion is not required. + */ + @Test + def testLatestFetchWithDownConversionDisabled(): Unit = { + val topicMap = createTopics(numTopics = 5, numPartitions = 1) + val topicPartitions = topicMap.keySet.toSeq + topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get()) + val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024, + topicPartitions)).build() + val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest) + topicPartitions.foreach(tp => assertEquals(Errors.NONE, fetchResponse.responseData().get(tp).error)) + } + + /** + * Tests that "message.downconversion.enable" can be set at topic level, and its configuration is obeyed for client + * fetch requests. + */ + @Test + def testV1FetchWithTopicLevelOverrides(): Unit = { + // create topics with default down-conversion configuration (i.e. conversion disabled) + val conversionDisabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicSuffixStart = 0) + val conversionDisabledTopicPartitions = conversionDisabledTopicsMap.keySet.toSeq + + // create topics with down-conversion configuration enabled + val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true") + val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicConfig, topicSuffixStart = 5) + val conversionEnabledTopicPartitions = conversionEnabledTopicsMap.keySet.toSeq + + val allTopics = conversionDisabledTopicPartitions ++ conversionEnabledTopicPartitions + val leaderId = conversionDisabledTopicsMap.head._2 + + allTopics.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get()) + val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024, + allTopics)).build(1) + val fetchResponse = sendFetchRequest(leaderId, fetchRequest) + + conversionDisabledTopicPartitions.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION, fetchResponse.responseData().get(tp).error)) + conversionEnabledTopicPartitions.foreach(tp => assertEquals(Errors.NONE, fetchResponse.responseData().get(tp).error)) + } + + /** + * Tests that "message.downconversion.enable" has no effect on fetch requests from replicas. + */ + @Test + def testV1FetchFromReplica(): Unit = { + // create topics with default down-conversion configuration (i.e. conversion disabled) + val conversionDisabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicSuffixStart = 0) + val conversionDisabledTopicPartitions = conversionDisabledTopicsMap.keySet.toSeq + + // create topics with down-conversion configuration enabled + val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true") + val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicConfig, topicSuffixStart = 5) + val conversionEnabledTopicPartitions = conversionEnabledTopicsMap.keySet.toSeq + + val allTopicPartitions = conversionDisabledTopicPartitions ++ conversionEnabledTopicPartitions + val leaderId = conversionDisabledTopicsMap.head._2 + + allTopicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get()) + val fetchRequest = FetchRequest.Builder.forReplica(1, 1, Int.MaxValue, 0, + createPartitionMap(1024, allTopicPartitions)).build() + val fetchResponse = sendFetchRequest(leaderId, fetchRequest) + + allTopicPartitions.foreach(tp => assertEquals(Errors.NONE, fetchResponse.responseData().get(tp).error)) + } +} diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 63e23b2..06ff2d9 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -426,7 +426,7 @@ class FetchRequestTest extends BaseRequestTest { } private def createTopics(numTopics: Int, numPartitions: Int, configs: Map[String, String] = Map.empty): Map[TopicPartition, Int] = { - val topics = (0 until numPartitions).map(t => s"topic$t") + val topics = (0 until numTopics).map(t => s"topic$t") val topicConfig = new Properties topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString) configs.foreach { case (k, v) => topicConfig.setProperty(k, v) } -- To stop receiving notification emails like this one, please contact j...@apache.org.