[
https://issues.apache.org/jira/browse/KAFKA-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371154#comment-16371154
]
ASF GitHub Bot commented on KAFKA-6238:
---------------------------------------
dguy closed pull request #4583: KAFKA-6238; Fix inter-broker protocol message
format compatibility check
URL: https://github.com/apache/kafka/pull/4583
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
new file mode 100644
index 00000000000..e71ec599f41
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+public enum RecordFormat {
+ V0(0), V1(1), V2(2);
+
+ public final byte value;
+
+ RecordFormat(int value) {
+ this.value = (byte) value;
+ }
+
+ public static RecordFormat lookup(byte version) {
+ switch (version) {
+ case 0: return V0;
+ case 1: return V1;
+ case 2: return V2;
+ default: throw new IllegalArgumentException("Unknown format version: " + version);
+ }
+ }
+
+ public static RecordFormat current() {
+ return V2;
+ }
+
+}
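For reviewers skimming the diff: a minimal sketch (not part of the patch; the object name and call sites are made up) of how the new RecordFormat enum is intended to be used from Scala code such as ApiVersion:
{code}
import org.apache.kafka.common.record.RecordFormat

object RecordFormatSketch extends App {
  // Map a raw magic byte (as stored on disk) back to the enum constant.
  val fromMagic = RecordFormat.lookup(1.toByte)   // RecordFormat.V1
  // The raw magic value is still available where a Byte is required.
  val magic: Byte = RecordFormat.V2.value         // 2
  // The newest format supported by this build of the broker.
  val latest = RecordFormat.current()             // RecordFormat.V2
  println(s"lookup=$fromMagic magic=$magic latest=$latest")
}
{code}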
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index b8329c1ece2..9270a7a3b01 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
package kafka.api
-import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.RecordFormat
/**
* This class contains the different Kafka versions.
@@ -90,11 +90,23 @@ object ApiVersion {
def latestVersion = versionNameMap.values.max
+ def allVersions: Set[ApiVersion] = {
+ versionNameMap.values.toSet
+ }
+
+ def minVersionForMessageFormat(messageFormatVersion: RecordFormat): String = {
+ messageFormatVersion match {
+ case RecordFormat.V0 => "0.8.0"
+ case RecordFormat.V1 => "0.10.0"
+ case RecordFormat.V2 => "0.11.0"
+ case _ => throw new IllegalArgumentException(s"Invalid message format version $messageFormatVersion")
+ }
+ }
}
sealed trait ApiVersion extends Ordered[ApiVersion] {
val version: String
- val messageFormatVersion: Byte
+ val messageFormatVersion: RecordFormat
val id: Int
override def compare(that: ApiVersion): Int =
@@ -106,90 +118,90 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
// Keep the IDs in order of versions
case object KAFKA_0_8_0 extends ApiVersion {
val version: String = "0.8.0.X"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+ val messageFormatVersion = RecordFormat.V0
val id: Int = 0
}
case object KAFKA_0_8_1 extends ApiVersion {
val version: String = "0.8.1.X"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+ val messageFormatVersion = RecordFormat.V0
val id: Int = 1
}
case object KAFKA_0_8_2 extends ApiVersion {
val version: String = "0.8.2.X"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+ val messageFormatVersion = RecordFormat.V0
val id: Int = 2
}
case object KAFKA_0_9_0 extends ApiVersion {
val version: String = "0.9.0.X"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+ val messageFormatVersion = RecordFormat.V0
val id: Int = 3
}
case object KAFKA_0_10_0_IV0 extends ApiVersion {
val version: String = "0.10.0-IV0"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+ val messageFormatVersion = RecordFormat.V1
val id: Int = 4
}
case object KAFKA_0_10_0_IV1 extends ApiVersion {
val version: String = "0.10.0-IV1"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+ val messageFormatVersion = RecordFormat.V1
val id: Int = 5
}
case object KAFKA_0_10_1_IV0 extends ApiVersion {
val version: String = "0.10.1-IV0"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+ val messageFormatVersion = RecordFormat.V1
val id: Int = 6
}
case object KAFKA_0_10_1_IV1 extends ApiVersion {
val version: String = "0.10.1-IV1"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+ val messageFormatVersion = RecordFormat.V1
val id: Int = 7
}
case object KAFKA_0_10_1_IV2 extends ApiVersion {
val version: String = "0.10.1-IV2"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+ val messageFormatVersion = RecordFormat.V1
val id: Int = 8
}
case object KAFKA_0_10_2_IV0 extends ApiVersion {
val version: String = "0.10.2-IV0"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+ val messageFormatVersion = RecordFormat.V1
val id: Int = 9
}
case object KAFKA_0_11_0_IV0 extends ApiVersion {
val version: String = "0.11.0-IV0"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+ val messageFormatVersion = RecordFormat.V2
val id: Int = 10
}
case object KAFKA_0_11_0_IV1 extends ApiVersion {
val version: String = "0.11.0-IV1"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+ val messageFormatVersion = RecordFormat.V2
val id: Int = 11
}
case object KAFKA_0_11_0_IV2 extends ApiVersion {
val version: String = "0.11.0-IV2"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+ val messageFormatVersion = RecordFormat.V2
val id: Int = 12
}
case object KAFKA_1_0_IV0 extends ApiVersion {
val version: String = "1.0-IV0"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+ val messageFormatVersion = RecordFormat.V2
val id: Int = 13
}
case object KAFKA_1_1_IV0 extends ApiVersion {
val version: String = "1.1-IV0"
- val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+ val messageFormatVersion = RecordFormat.V2
val id: Int = 14
}
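A short sketch (illustrative only, not from the patch) of how ApiVersion and RecordFormat now fit together, including the new minVersionForMessageFormat helper that the improved KafkaConfig error message relies on:
{code}
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_1_0_IV0}
import org.apache.kafka.common.record.RecordFormat

object ApiVersionFormatSketch extends App {
  // Each ApiVersion now carries a RecordFormat instead of a raw magic byte.
  assert(KAFKA_0_10_0_IV1.messageFormatVersion == RecordFormat.V1)
  assert(KAFKA_1_0_IV0.messageFormatVersion == RecordFormat.V2)

  // The helper maps a message format to the oldest broker release that supports it.
  assert(ApiVersion.minVersionForMessageFormat(RecordFormat.V2) == "0.11.0")

  // The compatibility rule KafkaConfig enforces: the inter-broker protocol's format
  // must be at least the configured log message format.
  val ok = KAFKA_1_0_IV0.messageFormatVersion.value >= RecordFormat.V1.value
  println(s"1.0-IV0 broker protocol can serve a V1-format log: $ok")
}
{code}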
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index fec984cf5e1..257dd8f9ba4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -465,7 +465,7 @@ class Log(@volatile var dir: File,
private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
- val messageFormatVersion = config.messageFormatVersion.messageFormatVersion
+ val messageFormatVersion = config.messageFormatVersion.messageFormatVersion.value
info(s"Loading producer state from offset $lastOffset for partition $topicPartition with message " +
s"format version $messageFormatVersion")
@@ -663,7 +663,7 @@ class Log(@volatile var dir: File,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
- config.messageFormatVersion.messageFormatVersion,
+ config.messageFormatVersion.messageFormatVersion.value,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b84587f55c2..9e79afa2a5b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -59,7 +59,7 @@ import DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
-import scala.collection.{mutable, _}
+import scala.collection._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
@@ -1347,7 +1347,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if (apiVersionRequest.hasUnsupportedRequestVersion)
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
else
- ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion)
+ ApiVersionsResponse.apiVersionsResponse(requestThrottleMs,
+ config.interBrokerProtocolVersion.messageFormatVersion.value)
}
sendResponseMaybeThrottle(request, createResponseCallback)
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 529d0e639d9..8b2fb1044e0 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1330,8 +1330,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")
- require(interBrokerProtocolVersion >= logMessageFormatVersion,
- s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
+
+ val messageFormatVersion = logMessageFormatVersion.messageFormatVersion
+ require(interBrokerProtocolVersion.messageFormatVersion.value >= messageFormatVersion.value,
+ s"log.message.format.version $logMessageFormatVersionString can only be used when " +
+ "inter.broker.protocol.version is set to version " +
+ s"${ApiVersion.minVersionForMessageFormat(messageFormatVersion)} or higher")
+
val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
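To make the effect of the tightened check concrete, here is a rough sketch in the style of the unit test further down in this diff (TestUtils comes from the core test sources; the object name is invented):
{code}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils

object FormatCheckSketch extends App {
  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  // An old inter-broker protocol combined with a newer message format is rejected...
  props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.0")
  props.put(KafkaConfig.LogMessageFormatVersionProp, "1.0")
  try KafkaConfig.fromProps(props)
  catch {
    // ...and the message now names the minimum inter.broker.protocol.version
    // ("0.11.0 or higher") instead of just restating both configured values.
    case e: IllegalArgumentException => println(e.getMessage)
  }
}
{code}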
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f4bfe39c9ef..470842e9d9e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1002,7 +1002,7 @@ class ReplicaManager(val config: KafkaConfig,
}
def getMagic(topicPartition: TopicPartition): Option[Byte] =
- getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
+ getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion.value))
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 6fc69747f5d..88c9d523f9b 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -17,6 +17,7 @@
package kafka.api
+import org.apache.kafka.common.record.RecordFormat
import org.junit.Test
import org.junit.Assert._
@@ -74,4 +75,16 @@ class ApiVersionTest {
assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.1"))
}
+ @Test
+ def testMinVersionForMessageFormat(): Unit = {
+ assertEquals("0.8.0",
ApiVersion.minVersionForMessageFormat(RecordFormat.V0))
+ assertEquals("0.10.0",
ApiVersion.minVersionForMessageFormat(RecordFormat.V1))
+ assertEquals("0.11.0",
ApiVersion.minVersionForMessageFormat(RecordFormat.V2))
+
+ // Ensure that all message format versions have a defined min version so
that we remember
+ // to update the function
+ for (messageFormatVersion <- RecordFormat.values)
+
assertNotNull(ApiVersion.minVersionForMessageFormat(messageFormatVersion))
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6b263342118..0213c12814e 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -522,6 +522,30 @@ class KafkaConfigTest {
}
}
+ @Test
+ def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
+ def buildConfig(interBrokerProtocol: ApiVersion, messageFormat: ApiVersion): KafkaConfig = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
+ props.put(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
+ KafkaConfig.fromProps(props)
+ }
+
+ ApiVersion.allVersions.foreach { interBrokerVersion =>
+ ApiVersion.allVersions.foreach { messageFormatVersion =>
+ if (interBrokerVersion.messageFormatVersion.value >= messageFormatVersion.messageFormatVersion.value) {
+ val config = buildConfig(interBrokerVersion, messageFormatVersion)
+ assertEquals(messageFormatVersion, config.logMessageFormatVersion)
+ assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
+ } else {
+ intercept[IllegalArgumentException] {
+ buildConfig(interBrokerVersion, messageFormatVersion)
+ }
+ }
+ }
+ }
+ }
+
@Test
def testFromPropsInvalid() {
def getBaseProperties(): Properties = {
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b3ae68f3daf..3ac293d8498 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -36,10 +36,10 @@ <h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading from 0.8.x, 0.9.x, 0.1
<li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)</li>
</ul>
- If you are upgrading from 0.11.0.x and you have not overridden the message format, then you only need to override
+ If you are upgrading from 0.11.0.x or 1.0.x and you have not overridden the message format, then you only need to override
the inter-broker protocol format.
<ul>
- <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0).</li>
+ <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0 or 1.0).</li>
</ul>
</li>
<li> Upgrade the brokers one at a time: shut down the broker, update the
code, and restart it. </li>
@@ -106,10 +106,11 @@ <h4><a id="upgrade_1_0_0" href="#upgrade_1_0_0">Upgrading
from 0.8.x, 0.9.x, 0.1
<li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION
(See <a href="#upgrade_10_performance_impact">potential performance impact
following the upgrade</a> for the details on what this
configuration does.)</li>
</ul>
- If you are upgrading from 0.11.0.x and you have not overridden the
message format, then you only need to override
- the inter-broker protocol format.
+ If you are upgrading from 0.11.0.x and you have not overridden the
message format, you must set
+ both the message format version and the inter-broker protocol version
to 0.11.0.
<ul>
- <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g.
0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0).</li>
+ <li>inter.broker.protocol.version=0.11.0</li>
+ <li>log.message.format.version=0.11.0</li>
</ul>
</li>
<li> Upgrade the brokers one at a time: shut down the broker, update the
code, and restart it. </li>
@@ -117,9 +118,11 @@ <h4><a id="upgrade_1_0_0" href="#upgrade_1_0_0">Upgrading
from 0.8.x, 0.9.x, 0.1
<li> Restart the brokers one by one for the new protocol version to take
effect. </li>
<li> If you have overridden the message format version as instructed
above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have
been upgraded to 0.11.0 or later,
- change log.message.format.version to 1.0 on each broker and restart
them one by one. Note that the older Scala consumer
- does not support the new message format introduced in 0.11, so to
avoid the performance cost of down-conversion (or to
- take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly
once semantics</a>), the newer Java consumer must be used.</li>
+ change log.message.format.version to 1.0 on each broker and restart
them one by one. If you are upgrading from
+ 0.11.0 and log.message.format.version is set to 0.11.0, you can update
the config and skip the rolling restart.
+ Note that the older Scala consumer does not support the new message
format introduced in 0.11, so to avoid the
+ performance cost of down-conversion (or to take advantage of <a
href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>),
+ the newer Java consumer must be used.</li>
</ol>
<p><b>Additional Upgrade Notes:</b></p>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Issues with protocol version when applying a rolling upgrade to 1.0.0
> ---------------------------------------------------------------------
>
> Key: KAFKA-6238
> URL: https://issues.apache.org/jira/browse/KAFKA-6238
> Project: Kafka
> Issue Type: Bug
> Components: documentation
> Affects Versions: 1.0.0
> Reporter: Diego Louzán
> Assignee: Jason Gustafson
> Priority: Major
> Fix For: 1.1.0, 1.2.0
>
>
> Hello,
> I am trying to perform a rolling upgrade from 0.10.0.1 to 1.0.0, and
> according to the instructions in the documentation, I should only have to
> upgrade the "inter.broker.protocol.version" parameter in the first step. But
> after setting the value to "0.10.0" or "0.10.0.1" (tried both), the broker
> refuses to start with the following error:
> {code}
> [2017-11-20 08:28:46,620] FATAL (kafka.Kafka$)
> java.lang.IllegalArgumentException: requirement failed: log.message.format.version 1.0-IV0 cannot be used when inter.broker.protocol.version is set to 0.10.0.1
> at scala.Predef$.require(Predef.scala:224)
> at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1205)
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
> at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
> at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
> at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
> at kafka.Kafka$.main(Kafka.scala:82)
> at kafka.Kafka.main(Kafka.scala)
> {code}
> I checked the instructions for rolling upgrades to previous versions (namely
> 0.11.0.0), and there it's stated that it is also necessary to upgrade the
> "log.message.format.version" parameter in two stages. I have tried that and
> the upgrade worked. It seems this still applies to version 1.0.0, so I'm not
> sure whether this is wrong documentation or an actual issue with Kafka, since
> it should work as stated in the docs.
> Regards,
> Diego Louzán
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)