[ 
https://issues.apache.org/jira/browse/KAFKA-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371154#comment-16371154
 ] 

ASF GitHub Bot commented on KAFKA-6238:
---------------------------------------

dguy closed pull request #4583: KAFKA-6238; Fix inter-broker protocol message 
format compatibility check
URL: https://github.com/apache/kafka/pull/4583
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java 
b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
new file mode 100644
index 00000000000..e71ec599f41
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+public enum RecordFormat {
+    V0(0), V1(1), V2(2);
+
+    public final byte value;
+
+    RecordFormat(int value) {
+        this.value = (byte) value;
+    }
+
+    public static RecordFormat lookup(byte version) {
+        switch (version) {
+            case 0: return V0;
+            case 1: return V1;
+            case 2: return V2;
+            default: throw new IllegalArgumentException("Unknown format 
version: " + version);
+        }
+    }
+
+    public static RecordFormat current() {
+        return V2;
+    }
+
+}
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala 
b/core/src/main/scala/kafka/api/ApiVersion.scala
index b8329c1ece2..9270a7a3b01 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.RecordFormat
 
 /**
  * This class contains the different Kafka versions.
@@ -90,11 +90,23 @@ object ApiVersion {
 
   def latestVersion = versionNameMap.values.max
 
+  def allVersions: Set[ApiVersion] = {
+    versionNameMap.values.toSet
+  }
+
+  def minVersionForMessageFormat(messageFormatVersion: RecordFormat): String = 
{
+    messageFormatVersion match {
+      case RecordFormat.V0 => "0.8.0"
+      case RecordFormat.V1 => "0.10.0"
+      case RecordFormat.V2 => "0.11.0"
+      case _ => throw new IllegalArgumentException(s"Invalid message format 
version $messageFormatVersion")
+    }
+  }
 }
 
 sealed trait ApiVersion extends Ordered[ApiVersion] {
   val version: String
-  val messageFormatVersion: Byte
+  val messageFormatVersion: RecordFormat
   val id: Int
 
   override def compare(that: ApiVersion): Int =
@@ -106,90 +118,90 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
 // Keep the IDs in order of versions
 case object KAFKA_0_8_0 extends ApiVersion {
   val version: String = "0.8.0.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 0
 }
 
 case object KAFKA_0_8_1 extends ApiVersion {
   val version: String = "0.8.1.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 1
 }
 
 case object KAFKA_0_8_2 extends ApiVersion {
   val version: String = "0.8.2.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 2
 }
 
 case object KAFKA_0_9_0 extends ApiVersion {
   val version: String = "0.9.0.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 3
 }
 
 case object KAFKA_0_10_0_IV0 extends ApiVersion {
   val version: String = "0.10.0-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 4
 }
 
 case object KAFKA_0_10_0_IV1 extends ApiVersion {
   val version: String = "0.10.0-IV1"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 5
 }
 
 case object KAFKA_0_10_1_IV0 extends ApiVersion {
   val version: String = "0.10.1-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 6
 }
 
 case object KAFKA_0_10_1_IV1 extends ApiVersion {
   val version: String = "0.10.1-IV1"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 7
 }
 
 case object KAFKA_0_10_1_IV2 extends ApiVersion {
   val version: String = "0.10.1-IV2"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 8
 }
 
 case object KAFKA_0_10_2_IV0 extends ApiVersion {
   val version: String = "0.10.2-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 9
 }
 
 case object KAFKA_0_11_0_IV0 extends ApiVersion {
   val version: String = "0.11.0-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 10
 }
 
 case object KAFKA_0_11_0_IV1 extends ApiVersion {
   val version: String = "0.11.0-IV1"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 11
 }
 
 case object KAFKA_0_11_0_IV2 extends ApiVersion {
   val version: String = "0.11.0-IV2"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 12
 }
 
 case object KAFKA_1_0_IV0 extends ApiVersion {
   val version: String = "1.0-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 13
 }
 
 case object KAFKA_1_1_IV0 extends ApiVersion {
   val version: String = "1.1-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 14
 }
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index fec984cf5e1..257dd8f9ba4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -465,7 +465,7 @@ class Log(@volatile var dir: File,
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: 
Boolean): Unit = lock synchronized {
     checkIfMemoryMappedBufferClosed()
-    val messageFormatVersion = config.messageFormatVersion.messageFormatVersion
+    val messageFormatVersion = 
config.messageFormatVersion.messageFormatVersion.value
     info(s"Loading producer state from offset $lastOffset for partition 
$topicPartition with message " +
       s"format version $messageFormatVersion")
 
@@ -663,7 +663,7 @@ class Log(@volatile var dir: File,
               appendInfo.sourceCodec,
               appendInfo.targetCodec,
               config.compact,
-              config.messageFormatVersion.messageFormatVersion,
+              config.messageFormatVersion.messageFormatVersion.value,
               config.messageTimestampType,
               config.messageTimestampDifferenceMaxMs,
               leaderEpoch,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b84587f55c2..9e79afa2a5b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -59,7 +59,7 @@ import DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
 
-import scala.collection.{mutable, _}
+import scala.collection._
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
@@ -1347,7 +1347,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (apiVersionRequest.hasUnsupportedRequestVersion)
         apiVersionRequest.getErrorResponse(requestThrottleMs, 
Errors.UNSUPPORTED_VERSION.exception)
       else
-        ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, 
config.interBrokerProtocolVersion.messageFormatVersion)
+        ApiVersionsResponse.apiVersionsResponse(requestThrottleMs,
+          config.interBrokerProtocolVersion.messageFormatVersion.value)
     }
     sendResponseMaybeThrottle(request, createResponseCallback)
   }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 529d0e639d9..8b2fb1044e0 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1330,8 +1330,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
     require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
       s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable 
meta-address 0.0.0.0. "+
       s"Use a routable IP address.")
-    require(interBrokerProtocolVersion >= logMessageFormatVersion,
-      s"log.message.format.version $logMessageFormatVersionString cannot be 
used when inter.broker.protocol.version is set to 
$interBrokerProtocolVersionString")
+
+    val messageFormatVersion = logMessageFormatVersion.messageFormatVersion
+    require(interBrokerProtocolVersion.messageFormatVersion.value >= 
messageFormatVersion.value,
+      s"log.message.format.version $logMessageFormatVersionString can only be 
used when " +
+        "inter.broker.protocol.version is set to version " +
+        s"${ApiVersion.minVersionForMessageFormat(messageFormatVersion)} or 
higher")
+
     val interBrokerUsesSasl = interBrokerSecurityProtocol == 
SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == 
SecurityProtocol.SASL_SSL
     require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || 
saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
       s"Only GSSAPI mechanism is supported for inter-broker communication with 
SASL when inter.broker.protocol.version is set to 
$interBrokerProtocolVersionString")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f4bfe39c9ef..470842e9d9e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1002,7 +1002,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getMagic(topicPartition: TopicPartition): Option[Byte] =
-    
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
+    
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion.value))
 
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: 
UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala 
b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 6fc69747f5d..88c9d523f9b 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import org.apache.kafka.common.record.RecordFormat
 import org.junit.Test
 import org.junit.Assert._
 
@@ -74,4 +75,16 @@ class ApiVersionTest {
     assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.1"))
   }
 
+  @Test
+  def testMinVersionForMessageFormat(): Unit = {
+    assertEquals("0.8.0", 
ApiVersion.minVersionForMessageFormat(RecordFormat.V0))
+    assertEquals("0.10.0", 
ApiVersion.minVersionForMessageFormat(RecordFormat.V1))
+    assertEquals("0.11.0", 
ApiVersion.minVersionForMessageFormat(RecordFormat.V2))
+
+    // Ensure that all message format versions have a defined min version so 
that we remember
+    // to update the function
+    for (messageFormatVersion <- RecordFormat.values)
+      
assertNotNull(ApiVersion.minVersionForMessageFormat(messageFormatVersion))
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6b263342118..0213c12814e 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -522,6 +522,30 @@ class KafkaConfigTest {
     }
   }
 
+  @Test
+  def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
+    def buildConfig(interBrokerProtocol: ApiVersion, messageFormat: 
ApiVersion): KafkaConfig = {
+      val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+      props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
interBrokerProtocol.version)
+      props.put(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
+      KafkaConfig.fromProps(props)
+    }
+
+    ApiVersion.allVersions.foreach { interBrokerVersion =>
+      ApiVersion.allVersions.foreach { messageFormatVersion =>
+        if (interBrokerVersion.messageFormatVersion.value >= 
messageFormatVersion.messageFormatVersion.value) {
+          val config = buildConfig(interBrokerVersion, messageFormatVersion)
+          assertEquals(messageFormatVersion, config.logMessageFormatVersion)
+          assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
+        } else {
+          intercept[IllegalArgumentException] {
+            buildConfig(interBrokerVersion, messageFormatVersion)
+          }
+        }
+      }
+    }
+  }
+
   @Test
   def testFromPropsInvalid() {
     def getBaseProperties(): Properties = {
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b3ae68f3daf..3ac293d8498 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -36,10 +36,10 @@ <h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading 
from 0.8.x, 0.9.x, 0.1
             <li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  
(See <a href="#upgrade_10_performance_impact">potential performance impact
                 following the upgrade</a> for the details on what this 
configuration does.)</li>
         </ul>
-        If you are upgrading from 0.11.0.x and you have not overridden the 
message format, then you only need to override
+        If you are upgrading from 0.11.0.x or 1.0.x and you have not 
overridden the message format, then you only need to override
         the inter-broker protocol format.
         <ul>
-            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0).</li>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0 or 
1.0).</li>
         </ul>
     </li>
     <li> Upgrade the brokers one at a time: shut down the broker, update the 
code, and restart it. </li>
@@ -106,10 +106,11 @@ <h4><a id="upgrade_1_0_0" href="#upgrade_1_0_0">Upgrading 
from 0.8.x, 0.9.x, 0.1
             <li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  
(See <a href="#upgrade_10_performance_impact">potential performance impact
                following the upgrade</a> for the details on what this 
configuration does.)</li>
         </ul>
-       If you are upgrading from 0.11.0.x and you have not overridden the 
message format, then you only need to override
-       the inter-broker protocol format.
+       If you are upgrading from 0.11.0.x and you have not overridden the 
message format, you must set
+       both the message format version and the inter-broker protocol version 
to 0.11.0.
         <ul>
-            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0).</li>
+            <li>inter.broker.protocol.version=0.11.0</li>
+            <li>log.message.format.version=0.11.0</li>
         </ul>
     </li>
     <li> Upgrade the brokers one at a time: shut down the broker, update the 
code, and restart it. </li>
@@ -117,9 +118,11 @@ <h4><a id="upgrade_1_0_0" href="#upgrade_1_0_0">Upgrading 
from 0.8.x, 0.9.x, 0.1
     <li> Restart the brokers one by one for the new protocol version to take 
effect. </li>
     <li> If you have overridden the message format version as instructed 
above, then you need to do one more rolling restart to
         upgrade it to its latest version. Once all (or most) consumers have 
been upgraded to 0.11.0 or later,
-        change log.message.format.version to 1.0 on each broker and restart 
them one by one. Note that the older Scala consumer
-        does not support the new message format introduced in 0.11, so to 
avoid the performance cost of down-conversion (or to
-        take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly 
once semantics</a>), the newer Java consumer must be used.</li>
+        change log.message.format.version to 1.0 on each broker and restart 
them one by one. If you are upgrading from
+        0.11.0 and log.message.format.version is set to 0.11.0, you can update 
the config and skip the rolling restart.
+        Note that the older Scala consumer does not support the new message 
format introduced in 0.11, so to avoid the
+        performance cost of down-conversion (or to take advantage of <a 
href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>),
+        the newer Java consumer must be used.</li>
 </ol>
 
 <p><b>Additional Upgrade Notes:</b></p>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Issues with protocol version when applying a rolling upgrade to 1.0.0
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-6238
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6238
>             Project: Kafka
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 1.0.0
>            Reporter: Diego Louzán
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 1.1.0, 1.2.0
>
>
> Hello,
> I am trying to perform a rolling upgrade from 0.10.0.1 to 1.0.0, and 
> according to the instructions in the documentation, I should only have to 
> upgrade the "inter.broker.protocol.version" parameter in the first step. But 
> after setting the value to "0.10.0" or "0.10.0.1" (tried both), the broker 
> refuses to start with the following error:
> {code}
> [2017-11-20 08:28:46,620] FATAL  (kafka.Kafka$)
> java.lang.IllegalArgumentException: requirement failed: 
> log.message.format.version 1.0-IV0 cannot be used when 
> inter.broker.protocol.version is set to 0.10.0.1
>         at scala.Predef$.require(Predef.scala:224)
>         at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1205)
>         at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
>         at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
>         at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
>         at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> {code}
> I checked the instructions for rolling upgrades to previous versions (namely 
> 0.11.0.0), and in here it's stated that is also needed to upgrade the 
> "log.message.format.version" parameter in two stages. I have tried that and 
> the upgrade worked. It seems it still applies to version 1.0.0, so I'm not 
> sure if this is wrong documentation, or an actual issue with kafka since it 
> should work as stated in the docs.
> Regards,
> Diego Louzán



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to