This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 00630ce1661 KAFKA-18672; CoordinatorRecordSerde must validate value
version (4.0) (#18786)
00630ce1661 is described below
commit 00630ce166116421b0c6ad85a0413fead55b45b5
Author: David Jacot <[email protected]>
AuthorDate: Mon Feb 3 17:18:58 2025 +0100
KAFKA-18672; CoordinatorRecordSerde must validate value version (4.0)
(#18786)
CoordinatorRecordSerde does not validate the version of the value to check
whether the version is supported by the current version of the software. This
is problematic if a future and unsupported version of the record is read by an
older version of the software because it would misinterpret the bytes. Hence
CoordinatorRecordSerde must throw an error if the version is unknown. This is
also consistent with the handling in the old coordinator.
Reviewers: Jeff Kim <[email protected]>
---
.../common/runtime/CoordinatorLoader.java | 17 ----------
.../common/runtime/CoordinatorRecordSerde.java | 5 +++
.../coordinator/common/runtime/Deserializer.java | 36 ++++++++++++++++++++++
.../coordinator/group/CoordinatorLoaderImpl.scala | 3 +-
.../main/scala/kafka/tools/DumpLogSegments.scala | 2 +-
.../group/CoordinatorLoaderImplTest.scala | 2 +-
.../group/GroupCoordinatorRecordSerde.java | 5 ++-
.../group/GroupCoordinatorRecordSerdeTest.java | 35 +++++++++++++++++++--
.../share/ShareCoordinatorRecordSerde.java | 5 ++-
.../share/ShareCoordinatorRecordSerdeTest.java | 7 +++--
10 files changed, 85 insertions(+), 32 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java
index e48a4253fa4..4f739082d67 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java
@@ -28,23 +28,6 @@ import java.util.concurrent.CompletableFuture;
*/
public interface CoordinatorLoader<U> extends AutoCloseable {
- /**
- * UnknownRecordTypeException is thrown when the Deserializer encounters
- * an unknown record type.
- */
- class UnknownRecordTypeException extends RuntimeException {
- private final short unknownType;
-
- public UnknownRecordTypeException(short unknownType) {
- super(String.format("Found an unknown record type %d",
unknownType));
- this.unknownType = unknownType;
- }
-
- public short unknownType() {
- return unknownType;
- }
- }
-
/**
* Object that is returned as part of the future from load(). Holds the
partition load time and the
* end time.
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java
index 56f9a6cae13..28ed8962baa 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java
@@ -77,6 +77,11 @@ public abstract class CoordinatorRecordSerde implements
Serializer<CoordinatorRe
final ApiMessage valueMessage = apiMessageValueFor(recordType);
final short valueVersion = readVersion(valueBuffer, "value");
+
+ if (valueVersion < valueMessage.lowestSupportedVersion() ||
valueVersion > valueMessage.highestSupportedVersion()) {
+ throw new UnknownRecordVersionException(recordType, valueVersion);
+ }
+
readMessage(valueMessage, valueBuffer, valueVersion, "value");
return new CoordinatorRecord(
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java
index 1873a254609..09a964dd754 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java
@@ -24,6 +24,42 @@ import java.nio.ByteBuffer;
* @param <T> The record type.
*/
public interface Deserializer<T> {
+ /**
+ * UnknownRecordTypeException is thrown when the Deserializer encounters
+ * an unknown record type.
+ */
+ class UnknownRecordTypeException extends RuntimeException {
+ private final short unknownType;
+
+ public UnknownRecordTypeException(short unknownType) {
+ super(String.format("Found an unknown record type %d",
unknownType));
+ this.unknownType = unknownType;
+ }
+
+ public short unknownType() {
+ return unknownType;
+ }
+ }
+
+ class UnknownRecordVersionException extends RuntimeException {
+ private final short type;
+ private final short unknownVersion;
+
+ public UnknownRecordVersionException(short type, short unknownVersion)
{
+ super(String.format("Found an unknown record version %d for %d
type", unknownVersion, type));
+ this.type = type;
+ this.unknownVersion = unknownVersion;
+ }
+
+ public short type() {
+ return type;
+ }
+
+ public short unknownVersion() {
+ return unknownVersion;
+ }
+ }
+
/**
* Deserializes the key and the value.
*
diff --git
a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index f5e42f099bd..70536abecc0 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -23,7 +23,8 @@ import
org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType, FileRecords,
MemoryRecords}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.Time
-import
org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.{LoadSummary,
UnknownRecordTypeException}
+import
org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.LoadSummary
+import
org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoader,
CoordinatorPlayback, Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.KafkaScheduler
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index da5565b87d4..4053c35f2a0 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -37,8 +37,8 @@ import
org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT
import org.apache.kafka.common.protocol.{ByteBufferAccessor, Message}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
+import
org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import
org.apache.kafka.coordinator.group.generated.{ConsumerGroupCurrentMemberAssignmentKey,
ConsumerGroupCurrentMemberAssignmentKeyJsonConverter,
ConsumerGroupCurrentMemberAssignmentValue,
ConsumerGroupCurrentMemberAssignmentValueJsonConverter,
ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataKeyJsonConverter,
ConsumerGroupMemberMetadataValue,
ConsumerGroupMemberMetadataValueJsonConverter, ConsumerGroupMetadataKey,
ConsumerGroupMetadataKeyJsonConverter, ConsumerGroupMetadataV [...]
-import
org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey,
ShareSnapshotKeyJsonConverter, ShareSnapshotValue,
ShareSnapshotValueJsonConverter, ShareUpdateKey, ShareUpdateKeyJsonConverter,
ShareUpdateValue, ShareUpdateValueJsonConverter}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index 68a6ba5da1d..dc4bbc830cd 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -24,7 +24,7 @@ import
org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType,
EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{MockTime, Time}
-import
org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
+import
org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorPlayback,
Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.storage.internals.log.{FetchDataInfo,
LogOffsetMetadata}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java
index 38abc233f5d..ad61b36bf1d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java
@@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
@@ -93,7 +92,7 @@ public class GroupCoordinatorRecordSerde extends
CoordinatorRecordSerde {
case 16:
return new ConsumerGroupRegularExpressionKey();
default:
- throw new
CoordinatorLoader.UnknownRecordTypeException(recordVersion);
+ throw new UnknownRecordTypeException(recordVersion);
}
}
@@ -134,7 +133,7 @@ public class GroupCoordinatorRecordSerde extends
CoordinatorRecordSerde {
case 16:
return new ConsumerGroupRegularExpressionValue();
default:
- throw new
CoordinatorLoader.UnknownRecordTypeException(recordVersion);
+ throw new UnknownRecordTypeException(recordVersion);
}
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java
index 94eb326af38..b7034365738 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.Deserializer;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
@@ -166,8 +166,8 @@ public class GroupCoordinatorRecordSerdeTest {
ByteBuffer valueBuffer = ByteBuffer.allocate(64);
- CoordinatorLoader.UnknownRecordTypeException ex =
- assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
+ Deserializer.UnknownRecordTypeException ex =
+ assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}
@@ -243,6 +243,35 @@ public class GroupCoordinatorRecordSerdeTest {
ex.getMessage());
}
+ @Test
+ public void testDeserializeWithInvalidValueVersion() {
+ GroupCoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();
+
+ ApiMessageAndVersion key = new ApiMessageAndVersion(
+ new ConsumerGroupMetadataKey().setGroupId("foo"),
+ (short) 3
+ );
+ ByteBuffer keyBuffer =
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+ ByteBuffer valueBuffer1 = ByteBuffer.allocate(2);
+ valueBuffer1.putShort((short)
(ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1));
+ valueBuffer1.rewind();
+ Deserializer.UnknownRecordVersionException ex =
+ assertThrows(Deserializer.UnknownRecordVersionException.class,
+ () -> serde.deserialize(keyBuffer, valueBuffer1));
+ assertEquals(key.version(), ex.type());
+ assertEquals(ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1,
ex.unknownVersion());
+
+ keyBuffer.rewind();
+ ByteBuffer valueBuffer2 = ByteBuffer.allocate(2);
+ valueBuffer2.putShort((short)
(ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1));
+ valueBuffer2.rewind();
+ ex = assertThrows(Deserializer.UnknownRecordVersionException.class,
+ () -> serde.deserialize(keyBuffer, valueBuffer2));
+ assertEquals(key.version(), ex.type());
+ assertEquals(ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1,
ex.unknownVersion());
+ }
+
@Test
public void testDeserializeAllRecordTypes() {
roundTrip((short) 0, new OffsetCommitKey(), new OffsetCommitValue());
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java
index 28f59e57d33..ec1b574be1c 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java
@@ -18,7 +18,6 @@
package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
@@ -34,7 +33,7 @@ public class ShareCoordinatorRecordSerde extends
CoordinatorRecordSerde {
case 1:
return new ShareUpdateKey();
default:
- throw new
CoordinatorLoader.UnknownRecordTypeException(recordVersion);
+ throw new UnknownRecordTypeException(recordVersion);
}
}
@@ -46,7 +45,7 @@ public class ShareCoordinatorRecordSerde extends
CoordinatorRecordSerde {
case 1:
return new ShareUpdateValue();
default:
- throw new
CoordinatorLoader.UnknownRecordTypeException(recordVersion);
+ throw new UnknownRecordTypeException(recordVersion);
}
}
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
index c62abdb13dc..110974bbc20 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
@@ -121,8 +121,9 @@ public class ShareCoordinatorRecordSerdeTest {
ByteBuffer valueBuffer = ByteBuffer.allocate(64);
- CoordinatorLoader.UnknownRecordTypeException ex =
- assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
+ Deserializer.UnknownRecordTypeException ex =
+ assertThrows(
+ Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}