This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new b5c96402e feat(java): Implement user headers serialization / deserialization in java sdk (#2516)
b5c96402e is described below

commit b5c96402ea9205d53f92136011510a9151667a86
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Sat Jan 3 00:09:26 2026 +0900

    feat(java): Implement user headers serialization / deserialization in java sdk (#2516)
    
    closes #2455
---
 examples/java/README.md                            |   9 +-
 examples/java/build.gradle.kts                     |  10 ++
 .../producer/MessageEnvelopeProducer.java          |   3 +-
 .../consumer/MessageHeadersConsumer.java           | 169 +++++++++++++++++++++
 .../producer/MessageHeadersProducer.java}          |  52 +++----
 foreign/java/BUILD_AND_TEST.md                     |  32 ++--
 .../iggy/connector/flink/sink/IggySinkWriter.java  |   3 +-
 .../client/blocking/http/ObjectMapperFactory.java  |   5 +
 .../main/java/org/apache/iggy/message/Message.java |  60 +++++++-
 .../org/apache/iggy/serde/BytesDeserializer.java   |  26 +++-
 .../org/apache/iggy/serde/BytesSerializer.java     |   3 +-
 11 files changed, 310 insertions(+), 62 deletions(-)

diff --git a/examples/java/README.md b/examples/java/README.md
index 1d89b691b..1589d5c15 100644
--- a/examples/java/README.md
+++ b/examples/java/README.md
@@ -77,7 +77,14 @@ gradle runGettingStartedConsumer
 
 ### Message Headers
 
-This example will be created as and when userHeaders serialization/deserialization is implemented in the Java client SDK.
+Shows metadata management using custom headers:
+
+```bash
+gradle runMessageHeadersProducer
+gradle runMessageHeadersConsumer
+```
+
+Demonstrates using header keys and values for message metadata instead of payload-based typing, with header-based message routing.
 
 ### Message Envelopes
 
diff --git a/examples/java/build.gradle.kts b/examples/java/build.gradle.kts
index 46a319207..86c563a46 100644
--- a/examples/java/build.gradle.kts
+++ b/examples/java/build.gradle.kts
@@ -73,6 +73,16 @@ tasks.register<JavaExec>("runMessageEnvelopeConsumer") {
     
mainClass.set("org.apache.iggy.examples.messageenvelope.consumer.MessageEnvelopeConsumer")
 }
 
+tasks.register<JavaExec>("runMessageHeadersProducer") {
+    classpath = sourceSets["main"].runtimeClasspath
+    
mainClass.set("org.apache.iggy.examples.messageheaders.producer.MessageHeadersProducer")
+}
+
+tasks.register<JavaExec>("runMessageHeadersConsumer") {
+    classpath = sourceSets["main"].runtimeClasspath
+    
mainClass.set("org.apache.iggy.examples.messageheaders.consumer.MessageHeadersConsumer")
+}
+
 tasks.register<JavaExec>("runSinkDataProducer") {
     classpath = sourceSets["main"].runtimeClasspath
     mainClass.set("org.apache.iggy.examples.sinkdataproducer.SinkDataProducer")
diff --git 
a/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
 
b/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
index 8feab9bd1..7dbeeff74 100644
--- 
a/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
+++ 
b/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
 
@@ -126,7 +127,7 @@ public final class MessageEnvelopeProducer {
                         BigInteger.ZERO,
                         0L,
                         (long) payload.length);
-                Message message = new Message(header, payload, 
Optional.empty());
+                Message message = new Message(header, payload, new 
HashMap<>());
                 messages.add(message);
                 serializableMessages.add(serializableMessage);
             }
diff --git 
a/examples/java/src/main/java/org/apache/iggy/examples/messageheaders/consumer/MessageHeadersConsumer.java
 
b/examples/java/src/main/java/org/apache/iggy/examples/messageheaders/consumer/MessageHeadersConsumer.java
new file mode 100644
index 000000000..0a657a64f
--- /dev/null
+++ 
b/examples/java/src/main/java/org/apache/iggy/examples/messageheaders/consumer/MessageHeadersConsumer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.messageheaders.consumer;
+
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.examples.shared.Messages;
+import org.apache.iggy.examples.shared.Messages.OrderConfirmed;
+import org.apache.iggy.examples.shared.Messages.OrderCreated;
+import org.apache.iggy.examples.shared.Messages.OrderRejected;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.HeaderValue;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tools.jackson.databind.ObjectMapper;
+import tools.jackson.databind.json.JsonMapper;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Optional;
+
+public final class MessageHeadersConsumer {
+    private static final String STREAM_NAME = "headers-stream";
+    private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME);
+
+    private static final String TOPIC_NAME = "orders";
+    private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME);
+
+    private static final long PARTITION_ID = 0L;
+    private static final int BATCHES_LIMIT = 10;
+    private static final long MESSAGES_PER_BATCH = 1L;
+
+    private static final String MESSAGE_TYPE_HEADER = "message_type";
+
+    private static final Logger log = 
LoggerFactory.getLogger(MessageHeadersConsumer.class);
+    private static final ObjectMapper MAPPER = JsonMapper.builder().build();
+
+    private MessageHeadersConsumer() {}
+
+    public static void main(final String[] args) {
+        var client = IggyTcpClient.builder()
+                .host("localhost")
+                .port(8090)
+                .credentials("iggy", "iggy")
+                .build();
+
+        consumeMessages(client);
+    }
+
+    private static void consumeMessages(IggyTcpClient client) {
+        log.info(
+                "Messages will be consumed from stream: {}, topic: {}, 
partition: {}.",
+                STREAM_ID,
+                TOPIC_ID,
+                PARTITION_ID);
+
+        BigInteger offset = BigInteger.ZERO;
+        int consumedBatches = 0;
+
+        Consumer consumer = Consumer.of(0L);
+
+        while (true) {
+            if (consumedBatches == BATCHES_LIMIT) {
+                log.info("Consumed {} batches of messages, exiting.", 
consumedBatches);
+                return;
+            }
+
+            try {
+                PolledMessages polledMessages = client.messages()
+                        .pollMessages(
+                                STREAM_ID,
+                                TOPIC_ID,
+                                Optional.of(PARTITION_ID),
+                                consumer,
+                                PollingStrategy.offset(offset),
+                                MESSAGES_PER_BATCH,
+                                false);
+
+                if (polledMessages.messages().isEmpty()) {
+                    log.info("No messages found.");
+                    continue;
+                }
+
+                for (Message message : polledMessages.messages()) {
+                    handleMessage(message);
+                }
+
+                consumedBatches++;
+
+                offset = 
offset.add(BigInteger.valueOf(polledMessages.messages().size()));
+
+            } catch (Exception e) {
+                log.error("Error polling messages", e);
+                break;
+            }
+        }
+    }
+
+    private static void handleMessage(Message message) {
+        String payload = new String(message.payload(), StandardCharsets.UTF_8);
+        String messageType = "unknown";
+
+        try {
+            Map<String, HeaderValue> userHeaders = message.userHeaders();
+            if (userHeaders.isEmpty()) {
+                log.warn("Missing headers at offset {}.", 
message.header().offset());
+                return;
+            }
+
+            HeaderValue headerValue = userHeaders.get(MESSAGE_TYPE_HEADER);
+            if (headerValue == null) {
+                log.warn(
+                        "Missing message type header at offset {}.",
+                        message.header().offset());
+                return;
+            }
+
+            messageType = headerValue.value();
+            log.info(
+                    "Handling message type: {} at offset: {}...",
+                    messageType,
+                    message.header().offset());
+
+            switch (messageType) {
+                case Messages.ORDER_CREATED_TYPE -> {
+                    OrderCreated order = MAPPER.readValue(payload, 
OrderCreated.class);
+                    log.info("{}", order);
+                }
+                case Messages.ORDER_CONFIRMED_TYPE -> {
+                    OrderConfirmed order = MAPPER.readValue(payload, 
OrderConfirmed.class);
+                    log.info("{}", order);
+                }
+                case Messages.ORDER_REJECTED_TYPE -> {
+                    OrderRejected order = MAPPER.readValue(payload, 
OrderRejected.class);
+                    log.info("{}", order);
+                }
+                default -> log.warn("Received unknown message type: {}", 
messageType);
+            }
+        } catch (Exception e) {
+            log.error(
+                    "Failed to handle message type {} at offset {}",
+                    messageType,
+                    message.header().offset(),
+                    e);
+        }
+    }
+}
diff --git 
a/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
 
b/examples/java/src/main/java/org/apache/iggy/examples/messageheaders/producer/MessageHeadersProducer.java
similarity index 74%
copy from 
examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
copy to 
examples/java/src/main/java/org/apache/iggy/examples/messageheaders/producer/MessageHeadersProducer.java
index 8feab9bd1..3e5dd4938 100644
--- 
a/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
+++ 
b/examples/java/src/main/java/org/apache/iggy/examples/messageheaders/producer/MessageHeadersProducer.java
@@ -17,16 +17,16 @@
  * under the License.
  */
 
-package org.apache.iggy.examples.messageenvelope.producer;
+package org.apache.iggy.examples.messageheaders.producer;
 
 import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
 import org.apache.iggy.examples.shared.Messages.SerializableMessage;
 import org.apache.iggy.examples.shared.MessagesGenerator;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.HeaderKind;
+import org.apache.iggy.message.HeaderValue;
 import org.apache.iggy.message.Message;
-import org.apache.iggy.message.MessageHeader;
-import org.apache.iggy.message.MessageId;
 import org.apache.iggy.message.Partitioning;
 import org.apache.iggy.stream.StreamDetails;
 import org.apache.iggy.topic.CompressionAlgorithm;
@@ -35,26 +35,28 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
-public final class MessageEnvelopeProducer {
-    private static final String STREAM_NAME = "envelope-stream";
+public final class MessageHeadersProducer {
+    private static final String STREAM_NAME = "headers-stream";
     private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME);
 
-    private static final String TOPIC_NAME = "envelope-topic";
+    private static final String TOPIC_NAME = "orders";
     private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME);
 
     private static final long PARTITION_ID = 0L;
     private static final int BATCHES_LIMIT = 10;
     private static final int MESSAGES_PER_BATCH = 1;
-    private static final long INTERVAL_MS = 1;
 
-    private static final Logger log = 
LoggerFactory.getLogger(MessageEnvelopeProducer.class);
+    private static final String MESSAGE_TYPE_HEADER = "message_type";
 
-    private MessageEnvelopeProducer() {}
+    private static final Logger log = 
LoggerFactory.getLogger(MessageHeadersProducer.class);
+
+    private MessageHeadersProducer() {}
 
     public static void main(String[] args) {
         var client = IggyTcpClient.builder()
@@ -92,42 +94,26 @@ public final class MessageEnvelopeProducer {
 
     public static void produceMessages(IggyTcpClient client) {
         log.info(
-                "Messages will be sent to stream: {}, topic: {}, partition: {} 
with interval {}ms.",
+                "Messages will be sent to stream: {}, topic: {}, partition: 
{}.",
                 STREAM_NAME,
                 TOPIC_NAME,
-                PARTITION_ID,
-                INTERVAL_MS);
+                PARTITION_ID);
 
         int sentBatches = 0;
         Partitioning partitioning = Partitioning.partitionId(PARTITION_ID);
         MessagesGenerator generator = new MessagesGenerator();
 
         while (sentBatches < BATCHES_LIMIT) {
-            try {
-                Thread.sleep(INTERVAL_MS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                break;
-            }
-
             List<Message> messages = new ArrayList<>();
             List<SerializableMessage> serializableMessages = new ArrayList<>();
 
             for (int i = 0; i < MESSAGES_PER_BATCH; i++) {
                 SerializableMessage serializableMessage = generator.generate();
-                String json = serializableMessage.toJsonEnvelope();
-                byte[] payload = json.getBytes(StandardCharsets.UTF_8);
-
-                MessageHeader header = new MessageHeader(
-                        BigInteger.ZERO,
-                        MessageId.serverGenerated(),
-                        BigInteger.ZERO,
-                        BigInteger.ZERO,
-                        BigInteger.ZERO,
-                        0L,
-                        (long) payload.length);
-                Message message = new Message(header, payload, 
Optional.empty());
-                messages.add(message);
+                String messageType = serializableMessage.getMessageType();
+                String json = serializableMessage.toJson();
+                Map<String, HeaderValue> userHeaders = new HashMap<>();
+                userHeaders.put(MESSAGE_TYPE_HEADER, new 
HeaderValue(HeaderKind.String, messageType));
+                messages.add(Message.of(json, userHeaders));
                 serializableMessages.add(serializableMessage);
             }
 
diff --git a/foreign/java/BUILD_AND_TEST.md b/foreign/java/BUILD_AND_TEST.md
index 5ccb8fca7..fff63a7cd 100644
--- a/foreign/java/BUILD_AND_TEST.md
+++ b/foreign/java/BUILD_AND_TEST.md
@@ -31,23 +31,23 @@ READ the README.md and start iggy-server as prescribed there.
 cd iggy/foreign/java/java-sdk
 
 # Clean previous builds
-../gradlew clean
+gradle clean
 
 # Build without running tests
-../gradlew build -x test
+gradle build -x test
 
 # Or build with tests (skip checkstyle)
-../gradlew build -x checkstyleMain -x checkstyleTest
+gradle build -x checkstyleMain -x checkstyleTest
 ```
 
 ### 2. Compile Only
 
 ```bash
 # Compile main source code
-../gradlew compileJava
+gradle compileJava
 
 # Compile test code
-../gradlew compileTestJava
+gradle compileTestJava
 ```
 
 ## Running Tests
@@ -56,24 +56,24 @@ cd iggy/foreign/java/java-sdk
 
 ```bash
 # Run all tests (skip checkstyle)
-../gradlew test -x checkstyleMain -x checkstyleTest
+gradle test -x checkstyleMain -x checkstyleTest
 ```
 
 ### 2. Run Specific Test Class
 
 ```bash
 # Run AsyncPollMessageTest specifically
-../gradlew test --tests AsyncPollMessageTest -x checkstyleMain -x 
checkstyleTest
+gradle test --tests AsyncPollMessageTest -x checkstyleMain -x checkstyleTest
 
 # Run with detailed output
-../gradlew test --tests AsyncPollMessageTest --info -x checkstyleMain -x 
checkstyleTest
+gradle test --tests AsyncPollMessageTest --info -x checkstyleMain -x 
checkstyleTest
 ```
 
 ### 3. Run Test Suite by Package
 
 ```bash
 # Run all async client tests
-../gradlew test --tests "org.apache.iggy.client.async.*" -x checkstyleMain -x 
checkstyleTest
+gradle test --tests "org.apache.iggy.client.async.*" -x checkstyleMain -x 
checkstyleTest
 ```
 
 ### 4. View Test Results
@@ -103,7 +103,7 @@ This test validates the async client's polling functionality and demonstrates im
 ```bash
 # Run with full output
 cd iggy/foreign/java/java-sdk
-../gradlew test --tests AsyncPollMessageTest -x checkstyleMain -x 
checkstyleTest --info
+gradle test --tests AsyncPollMessageTest -x checkstyleMain -x checkstyleTest 
--info
 ```
 
 Expected output:
@@ -220,17 +220,17 @@ vim src/main/java/org/apache/iggy/client/async/...
 ### 2. Compile
 
 ```bash
-../gradlew compileJava
+gradle compileJava
 ```
 
 ### 3. Test
 
 ```bash
 # Run specific test
-../gradlew test --tests AsyncPollMessageTest -x checkstyleMain -x 
checkstyleTest
+gradle test --tests AsyncPollMessageTest -x checkstyleMain -x checkstyleTest
 
 # Or run all async tests
-../gradlew test --tests "org.apache.iggy.client.async.*" -x checkstyleMain -x 
checkstyleTest
+gradle test --tests "org.apache.iggy.client.async.*" -x checkstyleMain -x 
checkstyleTest
 ```
 
 ### 4. View Results
@@ -261,19 +261,19 @@ When contributing changes:
 1. **Always compile after changes**
 
    ```bash
-   ../gradlew compileJava compileTestJava
+   gradle compileJava compileTestJava
    ```
 
 2. **Run relevant tests**
 
    ```bash
-   ../gradlew test --tests "*Test" -x checkstyleMain -x checkstyleTest
+   gradle test --tests "*Test" -x checkstyleMain -x checkstyleTest
    ```
 
 3. **Check for warnings**
 
    ```bash
-   ../gradlew build 2>&1 | grep -i warning
+   gradle build 2>&1 | grep -i warning
    ```
 
 4. **Clean up test files**
diff --git 
a/foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink/IggySinkWriter.java
 
b/foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink/IggySinkWriter.java
index 15cdbcff3..70458a1e3 100644
--- 
a/foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink/IggySinkWriter.java
+++ 
b/foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink/IggySinkWriter.java
@@ -37,6 +37,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
@@ -288,6 +289,6 @@ public class IggySinkWriter<T> implements SinkWriter<T> {
                 0L, // userHeadersLength
                 (long) payload.length); // payloadLength
 
-        return new Message(header, payload, Optional.empty());
+        return new Message(header, payload, Collections.emptyMap());
     }
 }
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/ObjectMapperFactory.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/ObjectMapperFactory.java
index 324378780..609a3a919 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/ObjectMapperFactory.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/ObjectMapperFactory.java
@@ -19,18 +19,23 @@
 
 package org.apache.iggy.client.blocking.http;
 
+import com.fasterxml.jackson.annotation.JsonSetter;
+import com.fasterxml.jackson.annotation.Nulls;
 import tools.jackson.databind.DeserializationFeature;
 import tools.jackson.databind.MapperFeature;
 import tools.jackson.databind.ObjectMapper;
 import tools.jackson.databind.PropertyNamingStrategies;
 import tools.jackson.databind.json.JsonMapper;
 
+import java.util.Map;
+
 final class ObjectMapperFactory {
 
     private static final ObjectMapper INSTANCE = JsonMapper.builder()
             .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
             .enable(DeserializationFeature.FAIL_ON_NULL_CREATOR_PROPERTIES)
             .propertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
+            .withConfigOverride(Map.class, map -> 
map.setNullHandling(JsonSetter.Value.forValueNulls(Nulls.AS_EMPTY)))
             .build();
 
     private ObjectMapperFactory() {}
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
index 3f69ebcf8..4e78293a1 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
@@ -20,26 +20,74 @@
 package org.apache.iggy.message;
 
 import java.math.BigInteger;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-public record Message(MessageHeader header, byte[] payload, 
Optional<Map<String, HeaderValue>> userHeaders) {
+public record Message(MessageHeader header, byte[] payload, Map<String, 
HeaderValue> userHeaders) {
 
     public static Message of(String payload) {
+        return of(payload, Collections.emptyMap());
+    }
+
+    public static Message of(String payload, Map<String, HeaderValue> 
userHeaders) {
         final byte[] payloadBytes = payload.getBytes();
+        final long userHeadersLength = getUserHeadersSize(userHeaders);
         final MessageHeader msgHeader = new MessageHeader(
                 BigInteger.ZERO,
                 MessageId.serverGenerated(),
                 BigInteger.ZERO,
                 BigInteger.ZERO,
                 BigInteger.ZERO,
-                0L,
+                userHeadersLength,
                 (long) payloadBytes.length);
-        return new Message(msgHeader, payloadBytes, Optional.empty());
+        return new Message(msgHeader, payloadBytes, userHeaders);
+    }
+
+    public Message withUserHeaders(Map<String, HeaderValue> userHeaders) {
+        Map<String, HeaderValue> mergedHeaders = mergeUserHeaders(userHeaders);
+        long userHeadersLength = getUserHeadersSize(mergedHeaders);
+        MessageHeader updatedHeader = new MessageHeader(
+                header.checksum(),
+                header.id(),
+                header.offset(),
+                header.timestamp(),
+                header.originTimestamp(),
+                userHeadersLength,
+                (long) payload.length);
+        return new Message(updatedHeader, payload, mergedHeaders);
     }
 
     public int getSize() {
-        // userHeaders is empty for now.
-        return MessageHeader.SIZE + payload.length;
+        long userHeadersLength = getUserHeadersSize(userHeaders);
+        return Math.toIntExact(MessageHeader.SIZE + payload.length + 
userHeadersLength);
+    }
+
+    private Map<String, HeaderValue> mergeUserHeaders(Map<String, HeaderValue> 
userHeaders) {
+        if (userHeaders.isEmpty()) {
+            return this.userHeaders;
+        }
+
+        if (this.userHeaders.isEmpty()) {
+            return userHeaders;
+        }
+
+        Map<String, HeaderValue> mergedHeaders = new 
HashMap<>(this.userHeaders);
+        mergedHeaders.putAll(userHeaders);
+        return mergedHeaders;
+    }
+
+    private static long getUserHeadersSize(Map<String, HeaderValue> 
userHeaders) {
+        if (userHeaders.isEmpty()) {
+            return 0L;
+        }
+
+        long size = 0L;
+        for (Map.Entry<String, HeaderValue> entry : userHeaders.entrySet()) {
+            byte[] keyBytes = entry.getKey().getBytes();
+            byte[] valueBytes = entry.getValue().value().getBytes();
+            size += 4L + keyBytes.length + 1L + 4L + valueBytes.length;
+        }
+        return size;
     }
 }
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
index 58e384b1c..378b8930c 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
@@ -26,6 +26,8 @@ import org.apache.iggy.consumergroup.ConsumerGroupDetails;
 import org.apache.iggy.consumergroup.ConsumerGroupMember;
 import org.apache.iggy.consumeroffset.ConsumerOffsetInfo;
 import org.apache.iggy.message.BytesMessageId;
+import org.apache.iggy.message.HeaderKind;
+import org.apache.iggy.message.HeaderValue;
 import org.apache.iggy.message.Message;
 import org.apache.iggy.message.MessageHeader;
 import org.apache.iggy.message.PolledMessages;
@@ -194,8 +196,26 @@ public final class BytesDeserializer {
                 new MessageHeader(checksum, id, offset, timestamp, 
originTimestamp, userHeadersLength, payloadLength);
         var payload = newByteArray(payloadLength);
         response.readBytes(payload);
-        // TODO: Add support for user headers.
-        return new Message(header, payload, Optional.empty());
+        Map<String, HeaderValue> userHeaders = new HashMap<>();
+        if (userHeadersLength > 0) {
+            ByteBuf userHeadersBuffer = 
response.readSlice(toInt(userHeadersLength));
+            Map<String, HeaderValue> headers = new HashMap<>();
+            while (userHeadersBuffer.isReadable()) {
+                var userHeaderKeyLength = 
userHeadersBuffer.readUnsignedIntLE();
+                var userHeaderKey = userHeadersBuffer
+                        .readCharSequence(toInt(userHeaderKeyLength), 
StandardCharsets.UTF_8)
+                        .toString();
+                var userHeaderKindCode = userHeadersBuffer.readUnsignedByte();
+                var userHeaderValueLength = 
userHeadersBuffer.readUnsignedIntLE();
+                String userHeaderValue = userHeadersBuffer
+                        .readCharSequence(toInt(userHeaderValueLength), 
StandardCharsets.UTF_8)
+                        .toString();
+                headers.put(userHeaderKey, new 
HeaderValue(HeaderKind.fromCode(userHeaderKindCode), userHeaderValue));
+            }
+            userHeaders = headers;
+        }
+
+        return new Message(header, payload, userHeaders);
     }
 
     public static Stats readStats(ByteBuf response) {
@@ -406,7 +426,7 @@ public final class BytesDeserializer {
     }
 
     private static int toInt(Long size) {
-        return size.intValue();
+        return Math.toIntExact(size);
     }
 
     private static byte[] newByteArray(Long size) {
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
index 267de8491..936dd55d8 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
@@ -81,9 +81,10 @@ public final class BytesSerializer {
     }
 
     public static ByteBuf toBytes(Message message) {
-        var buffer = Unpooled.buffer(MessageHeader.SIZE + 
message.payload().length);
+        var buffer = Unpooled.buffer(message.getSize());
         buffer.writeBytes(toBytes(message.header()));
         buffer.writeBytes(message.payload());
+        buffer.writeBytes(toBytes(message.userHeaders()));
         return buffer;
     }
 

Reply via email to