This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 00630ce1661 KAFKA-18672; CoordinatorRecordSerde must validate value version (4.0) (#18786)
00630ce1661 is described below

commit 00630ce166116421b0c6ad85a0413fead55b45b5
Author: David Jacot <[email protected]>
AuthorDate: Mon Feb 3 17:18:58 2025 +0100

    KAFKA-18672; CoordinatorRecordSerde must validate value version (4.0) (#18786)
    
    CoordinatorRecordSerde does not check whether the version of a record's
    value is supported by the running version of the software. This is
    problematic when a record written with a future, unsupported version is
    read by an older version of the software, because the bytes would be
    misinterpreted. Hence CoordinatorRecordSerde must throw an error if the
    version is unknown. This is also consistent with the handling in the old
    coordinator.
    
    Reviewers: Jeff Kim <[email protected]>
---
 .../common/runtime/CoordinatorLoader.java          | 17 ----------
 .../common/runtime/CoordinatorRecordSerde.java     |  5 +++
 .../coordinator/common/runtime/Deserializer.java   | 36 ++++++++++++++++++++++
 .../coordinator/group/CoordinatorLoaderImpl.scala  |  3 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  2 +-
 .../group/CoordinatorLoaderImplTest.scala          |  2 +-
 .../group/GroupCoordinatorRecordSerde.java         |  5 ++-
 .../group/GroupCoordinatorRecordSerdeTest.java     | 35 +++++++++++++++++++--
 .../share/ShareCoordinatorRecordSerde.java         |  5 ++-
 .../share/ShareCoordinatorRecordSerdeTest.java     |  7 +++--
 10 files changed, 85 insertions(+), 32 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java
index e48a4253fa4..4f739082d67 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java
@@ -28,23 +28,6 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface CoordinatorLoader<U> extends AutoCloseable {
 
-    /**
-     * UnknownRecordTypeException is thrown when the Deserializer encounters
-     * an unknown record type.
-     */
-    class UnknownRecordTypeException extends RuntimeException {
-        private final short unknownType;
-
-        public UnknownRecordTypeException(short unknownType) {
-            super(String.format("Found an unknown record type %d", 
unknownType));
-            this.unknownType = unknownType;
-        }
-
-        public short unknownType() {
-            return unknownType;
-        }
-    }
-
     /**
      * Object that is returned as part of the future from load(). Holds the 
partition load time and the
      * end time.
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java
index 56f9a6cae13..28ed8962baa 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java
@@ -77,6 +77,11 @@ public abstract class CoordinatorRecordSerde implements 
Serializer<CoordinatorRe
 
         final ApiMessage valueMessage = apiMessageValueFor(recordType);
         final short valueVersion = readVersion(valueBuffer, "value");
+
+        if (valueVersion < valueMessage.lowestSupportedVersion() || 
valueVersion > valueMessage.highestSupportedVersion()) {
+            throw new UnknownRecordVersionException(recordType, valueVersion);
+        }
+
         readMessage(valueMessage, valueBuffer, valueVersion, "value");
 
         return new CoordinatorRecord(
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java
index 1873a254609..09a964dd754 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java
@@ -24,6 +24,42 @@ import java.nio.ByteBuffer;
  * @param <T> The record type.
  */
 public interface Deserializer<T> {
+    /**
+     * UnknownRecordTypeException is thrown when the Deserializer encounters
+     * an unknown record type.
+     */
+    class UnknownRecordTypeException extends RuntimeException {
+        private final short unknownType;
+
+        public UnknownRecordTypeException(short unknownType) {
+            super(String.format("Found an unknown record type %d", 
unknownType));
+            this.unknownType = unknownType;
+        }
+
+        public short unknownType() {
+            return unknownType;
+        }
+    }
+
+    class UnknownRecordVersionException extends RuntimeException {
+        private final short type;
+        private final short unknownVersion;
+
+        public UnknownRecordVersionException(short type, short unknownVersion) 
{
+            super(String.format("Found an unknown record version %d for %d 
type", unknownVersion, type));
+            this.type = type;
+            this.unknownVersion = unknownVersion;
+        }
+
+        public short type() {
+            return type;
+        }
+
+        public short unknownVersion() {
+            return unknownVersion;
+        }
+    }
+
     /**
      * Deserializes the key and the value.
      *
diff --git 
a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala 
b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index f5e42f099bd..70536abecc0 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -23,7 +23,8 @@ import 
org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.record.{ControlRecordType, FileRecords, 
MemoryRecords}
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.Time
-import 
org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.{LoadSummary, 
UnknownRecordTypeException}
+import 
org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.LoadSummary
+import 
org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
 import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoader, 
CoordinatorPlayback, Deserializer}
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.KafkaScheduler
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index da5565b87d4..4053c35f2a0 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -37,8 +37,8 @@ import 
org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, Message}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
+import 
org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
 import 
org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey,
 ConsumerGroupCurrentMemberAssignmentKeyJsonConverter, 
ConsumerGroupCurrentMemberAssignmentValue, 
ConsumerGroupCurrentMemberAssignmentValueJsonConverter, 
ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter, 
ConsumerGroupMemberMetadataValue, 
ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey, 
ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataV [...]
-import 
org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
 import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
 import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, 
ShareSnapshotKeyJsonConverter, ShareSnapshotValue, 
ShareSnapshotValueJsonConverter, ShareUpdateKey, ShareUpdateKeyJsonConverter, 
ShareUpdateValue, ShareUpdateValueJsonConverter}
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index 68a6ba5da1d..dc4bbc830cd 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -24,7 +24,7 @@ import 
org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.record.{ControlRecordType, 
EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{MockTime, Time}
-import 
org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
+import 
org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
 import org.apache.kafka.coordinator.common.runtime.{CoordinatorPlayback, 
Deserializer}
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.storage.internals.log.{FetchDataInfo, 
LogOffsetMetadata}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java
index 38abc233f5d..ad61b36bf1d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
@@ -93,7 +92,7 @@ public class GroupCoordinatorRecordSerde extends 
CoordinatorRecordSerde {
             case 16:
                 return new ConsumerGroupRegularExpressionKey();
             default:
-                throw new 
CoordinatorLoader.UnknownRecordTypeException(recordVersion);
+                throw new UnknownRecordTypeException(recordVersion);
         }
     }
 
@@ -134,7 +133,7 @@ public class GroupCoordinatorRecordSerde extends 
CoordinatorRecordSerde {
             case 16:
                 return new ConsumerGroupRegularExpressionValue();
             default:
-                throw new 
CoordinatorLoader.UnknownRecordTypeException(recordVersion);
+                throw new UnknownRecordTypeException(recordVersion);
         }
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java
index 94eb326af38..b7034365738 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.Deserializer;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
@@ -166,8 +166,8 @@ public class GroupCoordinatorRecordSerdeTest {
 
         ByteBuffer valueBuffer = ByteBuffer.allocate(64);
 
-        CoordinatorLoader.UnknownRecordTypeException ex =
-            assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
+        Deserializer.UnknownRecordTypeException ex =
+            assertThrows(Deserializer.UnknownRecordTypeException.class,
                 () -> serde.deserialize(keyBuffer, valueBuffer));
         assertEquals((short) 255, ex.unknownType());
     }
@@ -243,6 +243,35 @@ public class GroupCoordinatorRecordSerdeTest {
             ex.getMessage());
     }
 
+    @Test
+    public void testDeserializeWithInvalidValueVersion() {
+        GroupCoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();
+
+        ApiMessageAndVersion key = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataKey().setGroupId("foo"),
+            (short) 3
+        );
+        ByteBuffer keyBuffer = 
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+        ByteBuffer valueBuffer1 = ByteBuffer.allocate(2);
+        valueBuffer1.putShort((short) 
(ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1));
+        valueBuffer1.rewind();
+        Deserializer.UnknownRecordVersionException ex =
+            assertThrows(Deserializer.UnknownRecordVersionException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer1));
+        assertEquals(key.version(), ex.type());
+        assertEquals(ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1, 
ex.unknownVersion());
+
+        keyBuffer.rewind();
+        ByteBuffer valueBuffer2 = ByteBuffer.allocate(2);
+        valueBuffer2.putShort((short) 
(ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1));
+        valueBuffer2.rewind();
+        ex = assertThrows(Deserializer.UnknownRecordVersionException.class,
+            () -> serde.deserialize(keyBuffer, valueBuffer2));
+        assertEquals(key.version(), ex.type());
+        assertEquals(ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1, 
ex.unknownVersion());
+    }
+
     @Test
     public void testDeserializeAllRecordTypes() {
         roundTrip((short) 0, new OffsetCommitKey(), new OffsetCommitValue());
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java
index 28f59e57d33..ec1b574be1c 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.coordinator.share;
 
 import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
 import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
 import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
@@ -34,7 +33,7 @@ public class ShareCoordinatorRecordSerde extends 
CoordinatorRecordSerde {
             case 1:
                 return new ShareUpdateKey();
             default:
-                throw new 
CoordinatorLoader.UnknownRecordTypeException(recordVersion);
+                throw new UnknownRecordTypeException(recordVersion);
         }
     }
 
@@ -46,7 +45,7 @@ public class ShareCoordinatorRecordSerde extends 
CoordinatorRecordSerde {
             case 1:
                 return new ShareUpdateValue();
             default:
-                throw new 
CoordinatorLoader.UnknownRecordTypeException(recordVersion);
+                throw new UnknownRecordTypeException(recordVersion);
         }
     }
 }
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
index c62abdb13dc..110974bbc20 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.coordinator.share;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.Deserializer;
 import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
 import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
 import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
@@ -121,8 +121,9 @@ public class ShareCoordinatorRecordSerdeTest {
 
         ByteBuffer valueBuffer = ByteBuffer.allocate(64);
 
-        CoordinatorLoader.UnknownRecordTypeException ex =
-            assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
+        Deserializer.UnknownRecordTypeException ex =
+            assertThrows(
+                Deserializer.UnknownRecordTypeException.class,
                 () -> serde.deserialize(keyBuffer, valueBuffer));
         assertEquals((short) 255, ex.unknownType());
     }

Reply via email to