This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b31aa651156 MINOR: Fix MessageFormatters (#18266)
b31aa651156 is described below

commit b31aa651156fbc961fbb8460604393fef1c09185
Author: David Jacot <[email protected]>
AuthorDate: Thu Dec 19 16:12:50 2024 +0100

    MINOR: Fix MessageFormatters (#18266)
    
    While looking at the message formatters in 
https://github.com/apache/kafka/pull/18261, I have noticed at few incorrect 
test cases.
    * We should not log anything when the record type is unknown because the 
formatters have clear goals.
    * We should not parse the value when the key is null or when the key cannot 
be parsed. While it works in the tests, in practice, this is wrong because we 
cannot assume that type of the value if the type of the key is not defined. The 
key drives the type of the entire record.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/tools/consumer/ApiMessageFormatter.java  | 13 +++---
 .../consumer/GroupMetadataMessageFormatter.java    | 50 ++++------------------
 .../tools/consumer/OffsetsMessageFormatter.java    | 50 ++++------------------
 .../consumer/TransactionLogMessageFormatter.java   | 37 +++++-----------
 .../GroupMetadataMessageFormatterTest.java         | 22 ++++------
 .../tools/consumer/OffsetMessageFormatterTest.java | 25 +++++------
 .../TransactionLogMessageFormatterTest.java        | 21 +++------
 7 files changed, 62 insertions(+), 156 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java
index 9bedbe265c8..c3c529265e1 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java
@@ -33,6 +33,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 public abstract class ApiMessageFormatter implements MessageFormatter {
 
+    private static final String TYPE = "type";
     private static final String VERSION = "version";
     private static final String DATA = "data";
     private static final String KEY = "key";
@@ -46,22 +47,22 @@ public abstract class ApiMessageFormatter implements 
MessageFormatter {
         byte[] key = consumerRecord.key();
         if (Objects.nonNull(key)) {
             short keyVersion = ByteBuffer.wrap(key).getShort();
-            JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key), 
keyVersion);
+            JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key));
 
             if (dataNode instanceof NullNode) {
                 return;
             }
             json.putObject(KEY)
-                    .put(VERSION, keyVersion)
+                    .put(TYPE, keyVersion)
                     .set(DATA, dataNode);
         } else {
-            json.set(KEY, NullNode.getInstance());
+            return;
         }
 
         byte[] value = consumerRecord.value();
         if (Objects.nonNull(value)) {
             short valueVersion = ByteBuffer.wrap(value).getShort();
-            JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value), 
valueVersion);
+            JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value));
 
             json.putObject(VALUE)
                     .put(VERSION, valueVersion)
@@ -77,6 +78,6 @@ public abstract class ApiMessageFormatter implements 
MessageFormatter {
         }
     }
 
-    protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer, short 
version);
-    protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer, short 
version);
+    protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer);
+    protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer);
 }  
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java
index 754c43193c8..cb608343c2e 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java
@@ -16,66 +16,34 @@
  */
 package org.apache.kafka.tools.consumer;
 
-import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter;
-import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 
 import java.nio.ByteBuffer;
-import java.util.Optional;
 
 public class GroupMetadataMessageFormatter extends ApiMessageFormatter {
-
     @Override
-    protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
-        return readToGroupMetadataKey(byteBuffer)
-                .map(logKey -> transferKeyMessageToJsonNode(logKey, version))
-                .orElseGet(() -> new TextNode(UNKNOWN));
-    }
-
-    @Override
-    protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) {
-        return readToGroupMetadataValue(byteBuffer)
-                .map(logValue -> 
GroupMetadataValueJsonConverter.write(logValue, version))
-                .orElseGet(() -> new TextNode(UNKNOWN));
-    }
-
-    private Optional<ApiMessage> readToGroupMetadataKey(ByteBuffer byteBuffer) 
{
+    protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
         short version = byteBuffer.getShort();
-        if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION
-                && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
-            return Optional.of(new OffsetCommitKey(new 
ByteBufferAccessor(byteBuffer), version));
-        } else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && 
version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
-            return Optional.of(new GroupMetadataKey(new 
ByteBufferAccessor(byteBuffer), version));
-        } else {
-            return Optional.empty();
+        if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= 
GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
+            return GroupMetadataKeyJsonConverter.write(new 
GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version), version);
         }
+        return NullNode.getInstance();
     }
 
-    private JsonNode transferKeyMessageToJsonNode(ApiMessage message, short 
version) {
-        if (message instanceof OffsetCommitKey) {
-            return NullNode.getInstance();
-        } else if (message instanceof GroupMetadataKey) {
-            return GroupMetadataKeyJsonConverter.write((GroupMetadataKey) 
message, version);
-        } else {
-            return new TextNode(UNKNOWN);
-        }
-    }
-
-    private Optional<GroupMetadataValue> readToGroupMetadataValue(ByteBuffer 
byteBuffer) {
+    @Override
+    protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
         short version = byteBuffer.getShort();
-        if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION
-                && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
-            return Optional.of(new GroupMetadataValue(new 
ByteBufferAccessor(byteBuffer), version));
-        } else {
-            return Optional.empty();
+        if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version 
<= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
+            return GroupMetadataValueJsonConverter.write(new 
GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version), version);
         }
+        return new TextNode(UNKNOWN);
     }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
index 2927ed46c9f..f374a292650 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
@@ -16,9 +16,7 @@
  */
 package org.apache.kafka.tools.consumer;
 
-import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
 import 
org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
@@ -29,56 +27,26 @@ import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 
 import java.nio.ByteBuffer;
-import java.util.Optional;
 
 /**
  * Formatter for use with tools such as console consumer: Consumer should also 
set exclude.internal.topics to false.
  */
 public class OffsetsMessageFormatter extends ApiMessageFormatter {
-
     @Override
-    protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
-        return readToGroupMetadataKey(byteBuffer)
-                .map(logKey -> transferKeyMessageToJsonNode(logKey, version))
-                .orElseGet(() -> new TextNode(UNKNOWN));
-    }
-
-    @Override
-    protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) {
-        return readToOffsetMessageValue(byteBuffer)
-                .map(logValue -> 
OffsetCommitValueJsonConverter.write(logValue, version))
-                .orElseGet(() -> new TextNode(UNKNOWN));
-    }
-
-    private Optional<ApiMessage> readToGroupMetadataKey(ByteBuffer byteBuffer) 
{
+    protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
         short version = byteBuffer.getShort();
-        if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION
-                && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
-            return Optional.of(new OffsetCommitKey(new 
ByteBufferAccessor(byteBuffer), version));
-        } else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && 
version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
-            return Optional.of(new GroupMetadataKey(new 
ByteBufferAccessor(byteBuffer), version));
-        } else {
-            return Optional.empty();
+        if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION && version <= 
OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
+            return OffsetCommitKeyJsonConverter.write(new OffsetCommitKey(new 
ByteBufferAccessor(byteBuffer), version), version);
         }
+        return NullNode.getInstance();
     }
 
-    private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short 
keyVersion) {
-        if (logKey instanceof OffsetCommitKey) {
-            return OffsetCommitKeyJsonConverter.write((OffsetCommitKey) 
logKey, keyVersion);
-        } else if (logKey instanceof GroupMetadataKey) {
-            return NullNode.getInstance();
-        } else {
-            return new TextNode(UNKNOWN);
-        }
-    }
-
-    private Optional<OffsetCommitValue> readToOffsetMessageValue(ByteBuffer 
byteBuffer) {
+    @Override
+    protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
         short version = byteBuffer.getShort();
-        if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION
-                && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
-            return Optional.of(new OffsetCommitValue(new 
ByteBufferAccessor(byteBuffer), version));
-        } else {
-            return Optional.empty();
+        if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version 
<= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
+            return OffsetCommitValueJsonConverter.write(new 
OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version), version);
         }
+        return new TextNode(UNKNOWN);
     }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
index d60231d320a..b255ee5a063 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
@@ -23,44 +23,27 @@ import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
 import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 
 import java.nio.ByteBuffer;
-import java.util.Optional;
 
 public class TransactionLogMessageFormatter extends ApiMessageFormatter {
-
-    @Override
-    protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
-        return readToTransactionLogKey(byteBuffer)
-                .map(logKey -> TransactionLogKeyJsonConverter.write(logKey, 
version))
-                .orElseGet(() -> new TextNode(UNKNOWN));
-    }
-
     @Override
-    protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) {
-        return readToTransactionLogValue(byteBuffer)
-                .map(logValue -> 
TransactionLogValueJsonConverter.write(logValue, version))
-                .orElseGet(() -> new TextNode(UNKNOWN));
-    }
-
-    private Optional<TransactionLogKey> readToTransactionLogKey(ByteBuffer 
byteBuffer) {
+    protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
         short version = byteBuffer.getShort();
-        if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
-                && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
-            return Optional.of(new TransactionLogKey(new 
ByteBufferAccessor(byteBuffer), version));
-        } else {
-            return Optional.empty();
+        if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION && version 
<= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+            return TransactionLogKeyJsonConverter.write(new 
TransactionLogKey(new ByteBufferAccessor(byteBuffer), version), version);
         }
+        return NullNode.getInstance();
     }
 
-    private Optional<TransactionLogValue> readToTransactionLogValue(ByteBuffer 
byteBuffer) {
+    @Override
+    protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
         short version = byteBuffer.getShort();
-        if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
-                && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
-            return Optional.of(new TransactionLogValue(new 
ByteBufferAccessor(byteBuffer), version));
-        } else {
-            return Optional.empty();
+        if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version 
<= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+            return TransactionLogValueJsonConverter.write(new 
TransactionLogValue(new ByteBufferAccessor(byteBuffer), version), version);
         }
+        return new TextNode(UNKNOWN);
     }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java
index bc06d2c6452..62ae43bd09c 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java
@@ -76,12 +76,12 @@ public class GroupMetadataMessageFormatterTest {
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 10, 
GROUP_METADATA_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 10, 
GROUP_METADATA_VALUE).array(),
-                        
"{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":10,\"data\":\"unknown\"}}"
+                        ""
                 ),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
GROUP_METADATA_VALUE).array(),
-                        
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0,"
 +
+                        
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0,"
 +
                             
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\","
 +
                             
"\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\","
 +
                             
"\"clientHost\":\"host-1\",\"sessionTimeout\":1500,\"subscription\":\"AAE=\"," +
@@ -90,7 +90,7 @@ public class GroupMetadataMessageFormatterTest {
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
GROUP_METADATA_VALUE).array(),
-                        
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1,"
 +
+                        
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1,"
 +
                             
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\","
 +
                             
"\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\","
 +
                             
"\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000,\"sessionTimeout\":1500," +
@@ -99,7 +99,7 @@ public class GroupMetadataMessageFormatterTest {
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_VALUE).array(),
-                        
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2,"
 +
+                        
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2,"
 +
                             
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\","
 +
                             
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\","
 +
                             
"\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000,"
 +
@@ -108,7 +108,7 @@ public class GroupMetadataMessageFormatterTest {
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 3, 
GROUP_METADATA_VALUE).array(),
-                        
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3,"
 +
+                        
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3,"
 +
                             
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\","
 +
                             
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\","
 +
                             
"\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\","
 +
@@ -117,7 +117,7 @@ public class GroupMetadataMessageFormatterTest {
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 4, 
GROUP_METADATA_VALUE).array(),
-                        
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4,"
 +
+                        
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4,"
 +
                             
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\","
 +
                             
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\","
 +
                             
"\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\","
 +
@@ -126,16 +126,12 @@ public class GroupMetadataMessageFormatterTest {
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_KEY).array(),
                         null,
-                        
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
+                        
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
                 Arguments.of(
                         null,
                         MessageUtil.toVersionPrefixedByteBuffer((short) 4, 
GROUP_METADATA_VALUE).array(),
-                        
"{\"key\":null,\"value\":{\"version\":4,\"data\":{\"protocolType\":\"consumer\",\"generation\":1,"
 +
-                            
"\"protocol\":\"range\",\"leader\":\"leader\",\"currentStateTimestamp\":1234," +
-                            
"\"members\":[{\"memberId\":\"member-1\",\"groupInstanceId\":\"group-instance-1\","
 +
-                            
"\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000,"
 +
-                            
"\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}"),
-                Arguments.of(null, null, "{\"key\":null,\"value\":null}"),
+                        ""),
+                Arguments.of(null, null, ""),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 4, 
OFFSET_COMMIT_VALUE).array(),
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
index f2c4a8e3e34..684c681a175 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
@@ -65,67 +65,64 @@ public class OffsetMessageFormatterTest {
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 10, 
OFFSET_COMMIT_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 10, 
OFFSET_COMMIT_VALUE).array(),
-                        
"{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":10,\"data\":\"unknown\"}}"
+                        ""
                 ),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_VALUE).array(),
-                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                        
"{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
                             
"\"value\":{\"version\":0,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
                             "\"commitTimestamp\":1234}}}"
                 ),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
OFFSET_COMMIT_VALUE).array(),
-                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                        
"{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
                             
"\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
                             
"\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"
                 ),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
OFFSET_COMMIT_VALUE).array(),
-                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                        
"{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
                             
"\"value\":{\"version\":2,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
                             "\"commitTimestamp\":1234}}}"
                 ),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 3, 
OFFSET_COMMIT_VALUE).array(),
-                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                        
"{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
                             
"\"value\":{\"version\":3,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
                             
"\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
                 ),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 4, 
OFFSET_COMMIT_VALUE).array(),
-                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                        
"{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
                             
"\"value\":{\"version\":4,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
                             
"\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
                 ),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 5, 
OFFSET_COMMIT_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 4, 
OFFSET_COMMIT_VALUE).array(),
-                        
"{\"key\":{\"version\":5,\"data\":\"unknown\"},\"value\":{\"version\":4," +
-                            
"\"data\":{\"offset\":100,\"leaderEpoch\":10,\"metadata\":\"metadata\"," +
-                            "\"commitTimestamp\":1234}}}"
+                        ""
                 ),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 5, 
OFFSET_COMMIT_VALUE).array(),
-                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                        
"{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
                             "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
                 ),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
                         null,
-                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                        
"{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
                             "\"value\":null}"),
                 Arguments.of(
                         null,
                         MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
OFFSET_COMMIT_VALUE).array(),
-                        
"{\"key\":null,\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\","
 +
-                            
"\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"),
-                Arguments.of(null, null, "{\"key\":null,\"value\":null}"),
+                        ""),
+                Arguments.of(null, null, ""),
                 Arguments.of(
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_KEY).array(),
                         MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_VALUE).array(),
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java
index e2189fa490c..dc7946d5fa0 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java
@@ -56,13 +56,12 @@ public class TransactionLogMessageFormatterTest {
                 Arguments.of(
                     MessageUtil.toVersionPrefixedByteBuffer((short) 10, 
TXN_LOG_KEY).array(),
                     MessageUtil.toVersionPrefixedByteBuffer((short) 10, 
TXN_LOG_VALUE).array(),
-                    "{\"key\":{\"version\":10,\"data\":\"unknown\"}," +
-                        "\"value\":{\"version\":10,\"data\":\"unknown\"}}"
+                    ""
                 ),
                 Arguments.of(
                     MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
TXN_LOG_KEY).array(),
                     MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
TXN_LOG_VALUE).array(),
-                    
"{\"key\":{\"version\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
+                    
"{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
                         
"\"value\":{\"version\":1,\"data\":{\"producerId\":100,\"producerEpoch\":50," +
                         
"\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[],"
 +
                         
"\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}"
@@ -70,29 +69,23 @@ public class TransactionLogMessageFormatterTest {
                 Arguments.of(
                     MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
TXN_LOG_KEY).array(),
                     MessageUtil.toVersionPrefixedByteBuffer((short) 5, 
TXN_LOG_VALUE).array(),
-                    
"{\"key\":{\"version\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
+                    
"{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
                         "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
                 ),
                 Arguments.of(
                     MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
TXN_LOG_KEY).array(),
                     MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
TXN_LOG_VALUE).array(),
-                    "{\"key\":{\"version\":1,\"data\":\"unknown\"}," +
-                        
"\"value\":{\"version\":1,\"data\":{\"producerId\":100,\"producerEpoch\":50," +
-                        
"\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[],"
 +
-                        
"\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}"),
+                    ""),
                 Arguments.of(
                     MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
TXN_LOG_KEY).array(),
                     null,
-                    
"{\"key\":{\"version\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
+                    
"{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
                         "\"value\":null}"),
                 Arguments.of(
                     null,
                     MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
TXN_LOG_VALUE).array(),
-                    "{\"key\":null," +
-                        
"\"value\":{\"version\":1,\"data\":{\"producerId\":100,\"producerEpoch\":50," +
-                        
"\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[],"
 +
-                        
"\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}"),
-                Arguments.of(null, null, "{\"key\":null,\"value\":null}")
+                    ""),
+                Arguments.of(null, null, "")
         );
     }
 

Reply via email to