This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch non_string_header_key
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/non_string_header_key by this
push:
new 1396a6519 Add remaining SDKs support
1396a6519 is described below
commit 1396a6519382d1967b60f77408be961076512702
Author: spetz <[email protected]>
AuthorDate: Thu Jan 29 16:32:03 2026 +0100
Add remaining SDKs support
---
.../FetchMessagesTests.cs | 4 +-
.../Fixtures/FetchMessagesFixture.cs | 4 +-
.../SendMessagesTests.cs | 8 +-
.../csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs | 23 +-
foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs | 68 +++--
foreign/csharp/Iggy_SDK/Iggy_SDK.csproj | 2 +-
.../Iggy_SDK/JsonConverters/HeaderKeyConverter.cs | 81 +++++-
foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs | 16 +-
.../send_messages_request_serializer_test.go | 4 +-
foreign/go/contracts/user_headers.go | 43 ++-
foreign/java/gradle.properties | 2 +-
.../java/org/apache/iggy/message/HeaderKey.java | 53 ++++
.../main/java/org/apache/iggy/message/Message.java | 70 +++--
.../org/apache/iggy/serde/BytesDeserializer.java | 282 +++++++++++++-------
.../org/apache/iggy/serde/BytesSerializer.java | 60 +++--
.../client/blocking/tcp/BytesSerializerTest.java | 218 +++++++++++-----
foreign/node/package.json | 2 +-
foreign/node/src/wire/message/header.utils.ts | 287 +++++++++++----------
18 files changed, 812 insertions(+), 415 deletions(-)
diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/FetchMessagesTests.cs
b/foreign/csharp/Iggy_SDK.Tests.Integration/FetchMessagesTests.cs
index d3e2750c1..b5d0fae1e 100644
--- a/foreign/csharp/Iggy_SDK.Tests.Integration/FetchMessagesTests.cs
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/FetchMessagesTests.cs
@@ -105,8 +105,8 @@ public class FetchMessagesTests
{
responseMessage.UserHeaders.ShouldNotBeNull();
responseMessage.UserHeaders.Count.ShouldBe(2);
-
responseMessage.UserHeaders[HeaderKey.New("header1")].ToString().ShouldBe("value1");
-
responseMessage.UserHeaders[HeaderKey.New("header2")].ToInt32().ShouldBeGreaterThan(0);
+
responseMessage.UserHeaders[HeaderKey.FromString("header1")].ToString().ShouldBe("value1");
+
responseMessage.UserHeaders[HeaderKey.FromString("header2")].ToInt32().ShouldBeGreaterThan(0);
}
}
}
diff --git
a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FetchMessagesFixture.cs
b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FetchMessagesFixture.cs
index c8ec18148..e58e2742f 100644
--- a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FetchMessagesFixture.cs
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FetchMessagesFixture.cs
@@ -95,8 +95,8 @@ public class FetchMessagesFixture : IAsyncInitializer
messages.Add(new Message(Guid.NewGuid(),
Encoding.UTF8.GetBytes(dummyJson),
new Dictionary<HeaderKey, HeaderValue>
{
- { HeaderKey.New("header1"),
HeaderValue.FromString("value1") },
- { HeaderKey.New("header2"), HeaderValue.FromInt32(14 + i) }
+ { HeaderKey.FromString("header1"),
HeaderValue.FromString("value1") },
+ { HeaderKey.FromString("header2"),
HeaderValue.FromInt32(14 + i) }
}));
}
diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/SendMessagesTests.cs
b/foreign/csharp/Iggy_SDK.Tests.Integration/SendMessagesTests.cs
index af465a28d..3973415d0 100644
--- a/foreign/csharp/Iggy_SDK.Tests.Integration/SendMessagesTests.cs
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/SendMessagesTests.cs
@@ -59,14 +59,14 @@ public class SendMessagesTests
new Message(Guid.NewGuid(), Encoding.UTF8.GetBytes(dummyJson),
new Dictionary<HeaderKey, HeaderValue>
{
- { HeaderKey.New("header1"),
HeaderValue.FromString("value1") },
- { HeaderKey.New("header2"), HeaderValue.FromInt32(444) }
+ { HeaderKey.FromString("header1"),
HeaderValue.FromString("value1") },
+ { HeaderKey.FromString("header2"),
HeaderValue.FromInt32(444) }
}),
new Message(Guid.NewGuid(), Encoding.UTF8.GetBytes(dummyJson),
new Dictionary<HeaderKey, HeaderValue>
{
- { HeaderKey.New("header1"),
HeaderValue.FromString("value1") },
- { HeaderKey.New("header2"), HeaderValue.FromInt32(444) }
+ { HeaderKey.FromString("header1"),
HeaderValue.FromString("value1") },
+ { HeaderKey.FromString("header2"),
HeaderValue.FromInt32(444) }
})
];
diff --git a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
index 7f576c891..2aa448893 100644
--- a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
+++ b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
@@ -554,7 +554,7 @@ internal static class TcpContracts
return [];
}
- var headersLength = headers.Sum(header => 4 + header.Key.Value.Length
+ 1 + 4 + header.Value.Value.Length);
+ var headersLength = headers.Sum(header => 1 + 4 +
header.Key.Value.Length + 1 + 4 + header.Value.Value.Length);
Span<byte> headersBytes = stackalloc byte[headersLength];
var position = 0;
foreach (var (headerKey, headerValue) in headers)
@@ -589,19 +589,22 @@ internal static class TcpContracts
private static byte[] GetBytesFromHeader(HeaderKey headerKey, HeaderValue
headerValue)
{
- var headerBytesLength = 4 + headerKey.Value.Length + 1 + 4 +
headerValue.Value.Length;
+ var headerBytesLength = 1 + 4 + headerKey.Value.Length + 1 + 4 +
headerValue.Value.Length;
Span<byte> headerBytes = stackalloc byte[headerBytesLength];
+ var pos = 0;
- BinaryPrimitives.WriteInt32LittleEndian(headerBytes[..4],
headerKey.Value.Length);
- var headerKeyBytes = Encoding.UTF8.GetBytes(headerKey.Value);
- headerKeyBytes.CopyTo(headerBytes[4..(4 + headerKey.Value.Length)]);
+ headerBytes[pos++] = HeaderKindToByte(headerKey.Kind);
- headerBytes[4 + headerKey.Value.Length] =
HeaderKindToByte(headerValue.Kind);
+ BinaryPrimitives.WriteInt32LittleEndian(headerBytes[pos..(pos + 4)],
headerKey.Value.Length);
+ pos += 4;
+ headerKey.Value.CopyTo(headerBytes[pos..(pos +
headerKey.Value.Length)]);
+ pos += headerKey.Value.Length;
- BinaryPrimitives.WriteInt32LittleEndian(
- headerBytes[(4 + headerKey.Value.Length + 1)..(4 +
headerKey.Value.Length + 1 + 4)],
- headerValue.Value.Length);
- headerValue.Value.CopyTo(headerBytes[(4 + headerKey.Value.Length + 1 +
4)..]);
+ headerBytes[pos++] = HeaderKindToByte(headerValue.Kind);
+
+ BinaryPrimitives.WriteInt32LittleEndian(headerBytes[pos..(pos + 4)],
headerValue.Value.Length);
+ pos += 4;
+ headerValue.Value.CopyTo(headerBytes[pos..]);
return headerBytes.ToArray();
}
diff --git a/foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs
b/foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs
index 084632929..208f97896 100644
--- a/foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs
+++ b/foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs
@@ -15,79 +15,75 @@
// specific language governing permissions and limitations
// under the License.
+using System.Text;
using System.Text.Json.Serialization;
using Apache.Iggy.JsonConverters;
namespace Apache.Iggy.Headers;
-/// <summary>
-/// A key for a header.
-/// </summary>
[JsonConverter(typeof(HeaderKeyConverter))]
public readonly struct HeaderKey : IEquatable<HeaderKey>
{
- /// <summary>
- /// Header key value.
- /// </summary>
- public required string Value { get; init; }
+ public required HeaderKind Kind { get; init; }
- /// <summary>
- /// Creates a new header key from a string.
- /// </summary>
- /// <param name="val">Key value</param>
- /// <returns></returns>
- /// <exception cref="ArgumentException"></exception>
- public static HeaderKey New(string val)
+ public required byte[] Value { get; init; }
+
+ public static HeaderKey FromString(string val)
{
+ if (val.Length is 0 or > 255)
+ {
+ throw new ArgumentException("Value has incorrect size, must be
between 1 and 255", nameof(val));
+ }
+
return new HeaderKey
{
- Value = val.Length is 0 or > 255
- ? throw new ArgumentException("Value has incorrect size, must
be between 1 and 255", nameof(val))
- : val
+ Kind = HeaderKind.String,
+ Value = Encoding.UTF8.GetBytes(val)
};
}
- /// <inheritdoc />
+ public string AsString()
+ {
+ if (Kind is not HeaderKind.String)
+ {
+ throw new InvalidOperationException("HeaderKey is not of String
kind");
+ }
+
+ return Encoding.UTF8.GetString(Value);
+ }
+
public override string ToString()
{
- return Value;
+ return Kind == HeaderKind.String ? Encoding.UTF8.GetString(Value) :
Convert.ToBase64String(Value);
}
- /// <inheritdoc />
public bool Equals(HeaderKey other)
{
- return StringComparer.Ordinal.Equals(Value, other.Value);
+ return Kind == other.Kind && Value.SequenceEqual(other.Value);
}
- /// <inheritdoc />
public override bool Equals(object? obj)
{
return obj is HeaderKey other && Equals(other);
}
- /// <inheritdoc />
public override int GetHashCode()
{
- return StringComparer.Ordinal.GetHashCode(Value);
+ var hash = new HashCode();
+ hash.Add(Kind);
+ foreach (var b in Value)
+ {
+ hash.Add(b);
+ }
+
+ return hash.ToHashCode();
}
- /// <summary>
- /// Determines whether two specified <see cref="HeaderKey" /> objects
are equal.
- /// </summary>
- /// <param name="left">The first <see cref="HeaderKey" /> to
compare.</param>
- /// <param name="right">The second <see cref="HeaderKey" /> to
compare.</param>
- /// <returns>True if the two <see cref="HeaderKey" /> objects are equal;
otherwise, false.</returns>
public static bool operator ==(HeaderKey left, HeaderKey right)
{
return left.Equals(right);
}
- /// <summary>
- /// Determines whether two specified <see cref="HeaderKey" /> objects
are not equal.
- /// </summary>
- /// <param name="left">The first <see cref="HeaderKey" /> to
compare.</param>
- /// <param name="right">The second <see cref="HeaderKey" /> to
compare.</param>
- /// <returns>True if the two <see cref="HeaderKey" /> objects are not
equal; otherwise, false.</returns>
public static bool operator !=(HeaderKey left, HeaderKey right)
{
return !left.Equals(right);
diff --git a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
index ffc1c58f7..cf7adcb5f 100644
--- a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
+++ b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
@@ -7,7 +7,7 @@
<TargetFrameworks>net8.0;net10.0</TargetFrameworks>
<AssemblyName>Apache.Iggy</AssemblyName>
<RootNamespace>Apache.Iggy</RootNamespace>
- <PackageVersion>0.6.1-edge.1</PackageVersion>
+ <PackageVersion>0.6.2-edge.1</PackageVersion>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
diff --git a/foreign/csharp/Iggy_SDK/JsonConverters/HeaderKeyConverter.cs
b/foreign/csharp/Iggy_SDK/JsonConverters/HeaderKeyConverter.cs
index 8221796d5..d6383165e 100644
--- a/foreign/csharp/Iggy_SDK/JsonConverters/HeaderKeyConverter.cs
+++ b/foreign/csharp/Iggy_SDK/JsonConverters/HeaderKeyConverter.cs
@@ -25,22 +25,95 @@ internal class HeaderKeyConverter : JsonConverter<HeaderKey>
{
public override HeaderKey Read(ref Utf8JsonReader reader, Type
typeToConvert, JsonSerializerOptions options)
{
- return HeaderKey.New(reader.GetString() ?? throw new
JsonException("Header key cannot be null or empty."));
+ if (reader.TokenType != JsonTokenType.StartObject)
+ {
+ throw new JsonException("Expected start of object for HeaderKey.");
+ }
+
+ HeaderKind? kind = null;
+ byte[]? value = null;
+
+ while (reader.Read())
+ {
+ if (reader.TokenType == JsonTokenType.EndObject)
+ {
+ break;
+ }
+
+ if (reader.TokenType != JsonTokenType.PropertyName)
+ {
+ throw new JsonException("Expected property name.");
+ }
+
+ var propertyName = reader.GetString();
+ reader.Read();
+
+ switch (propertyName)
+ {
+ case "kind":
+ var kindStr = reader.GetString();
+ kind = kindStr switch
+ {
+ "raw" => HeaderKind.Raw,
+ "string" => HeaderKind.String,
+ "bool" => HeaderKind.Bool,
+ "int32" => HeaderKind.Int32,
+ "int64" => HeaderKind.Int64,
+ "int128" => HeaderKind.Int128,
+ "uint32" => HeaderKind.Uint32,
+ "uint64" => HeaderKind.Uint64,
+ "uint128" => HeaderKind.Uint128,
+ "float32" => HeaderKind.Float,
+ "float64" => HeaderKind.Double,
+ _ => throw new JsonException($"Unknown header kind:
{kindStr}")
+ };
+ break;
+ case "value":
+ var base64 = reader.GetString();
+ value = base64 is not null ?
Convert.FromBase64String(base64) : null;
+ break;
+ }
+ }
+
+ if (kind is null || value is null)
+ {
+ throw new JsonException("HeaderKey must have both 'kind' and
'value' properties.");
+ }
+
+ return new HeaderKey { Kind = kind.Value, Value = value };
}
public override void Write(Utf8JsonWriter writer, HeaderKey value,
JsonSerializerOptions options)
{
- writer.WriteStringValue(value.Value);
+ writer.WriteStartObject();
+ writer.WriteString("kind", value.Kind switch
+ {
+ HeaderKind.Raw => "raw",
+ HeaderKind.String => "string",
+ HeaderKind.Bool => "bool",
+ HeaderKind.Int32 => "int32",
+ HeaderKind.Int64 => "int64",
+ HeaderKind.Int128 => "int128",
+ HeaderKind.Uint32 => "uint32",
+ HeaderKind.Uint64 => "uint64",
+ HeaderKind.Uint128 => "uint128",
+ HeaderKind.Float => "float32",
+ HeaderKind.Double => "float64",
+ _ => throw new JsonException($"Unknown header kind: {value.Kind}")
+ });
+ writer.WriteString("value", Convert.ToBase64String(value.Value));
+ writer.WriteEndObject();
}
public override HeaderKey ReadAsPropertyName(ref Utf8JsonReader reader,
Type typeToConvert,
JsonSerializerOptions options)
{
- return HeaderKey.New(reader.GetString() ?? throw new
JsonException("Header key cannot be null or empty."));
+ var keyStr = reader.GetString() ?? throw new JsonException("Header key
cannot be null or empty.");
+ return HeaderKey.FromString(keyStr);
}
public override void WriteAsPropertyName(Utf8JsonWriter writer, HeaderKey
value, JsonSerializerOptions options)
{
- writer.WritePropertyName(value.Value);
+ writer.WritePropertyName(value.ToString());
}
}
diff --git a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
index 288a0fb5c..779ed45a2 100644
--- a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
+++ b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
@@ -428,17 +428,22 @@ internal static class BinaryMapper
while (position < payload.Length)
{
+ var keyKind = MapHeaderKind(payload, position);
+ position++;
+
var keyLength =
BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
if (keyLength is 0 or > 255)
{
throw new ArgumentException("Key has incorrect size, must be
between 1 and 255", nameof(keyLength));
}
- var key = Encoding.UTF8.GetString(payload[(position +
4)..(position + 4 + keyLength)]);
- position += 4 + keyLength;
+ position += 4;
+ var keyValue = payload[position..(position + keyLength)].ToArray();
+ position += keyLength;
- var headerKind = MapHeaderKind(payload, position);
+ var valueKind = MapHeaderKind(payload, position);
position++;
+
var valueLength =
BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
if (valueLength is 0 or > 255)
{
@@ -448,9 +453,10 @@ internal static class BinaryMapper
position += 4;
ReadOnlySpan<byte> value = payload[position..(position +
valueLength)];
position += valueLength;
- headers.Add(HeaderKey.New(key), new HeaderValue
+
+ headers.Add(new HeaderKey { Kind = keyKind, Value = keyValue },
new HeaderValue
{
- Kind = headerKind,
+ Kind = valueKind,
Value = value.ToArray()
});
}
diff --git
a/foreign/go/binary_serialization/send_messages_request_serializer_test.go
b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
index 29807c343..5875f70c2 100644
--- a/foreign/go/binary_serialization/send_messages_request_serializer_test.go
+++ b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
@@ -68,8 +68,8 @@ func TestSerialize_SendMessagesRequest(t *testing.T) {
func createDefaultMessageHeaders() map[iggcon.HeaderKey]iggcon.HeaderValue {
return map[iggcon.HeaderKey]iggcon.HeaderValue{
- {Value: "HeaderKey1"}: {Kind: iggcon.String, Value:
[]byte("Value 1")},
- {Value: "HeaderKey2"}: {Kind: iggcon.Uint32, Value:
[]byte{0x01, 0x02, 0x03, 0x04}},
+ {Kind: iggcon.String, Value: []byte("HeaderKey1")}: {Kind:
iggcon.String, Value: []byte("Value 1")},
+ {Kind: iggcon.String, Value: []byte("HeaderKey2")}: {Kind:
iggcon.Uint32, Value: []byte{0x01, 0x02, 0x03, 0x04}},
}
}
diff --git a/foreign/go/contracts/user_headers.go
b/foreign/go/contracts/user_headers.go
index 3349c895c..40dfec023 100644
--- a/foreign/go/contracts/user_headers.go
+++ b/foreign/go/contracts/user_headers.go
@@ -28,14 +28,15 @@ type HeaderValue struct {
}
type HeaderKey struct {
- Value string
+ Kind HeaderKind
+ Value []byte
}
-func NewHeaderKey(val string) (HeaderKey, error) {
+func NewHeaderKeyString(val string) (HeaderKey, error) {
if len(val) == 0 || len(val) > 255 {
return HeaderKey{}, errors.New("value has incorrect size, must
be between 1 and 255")
}
- return HeaderKey{Value: val}, nil
+ return HeaderKey{Kind: String, Value: []byte(val)}, nil
}
type HeaderKind int
@@ -61,7 +62,7 @@ const (
func GetHeadersBytes(headers map[HeaderKey]HeaderValue) []byte {
headersLength := 0
for key, header := range headers {
- headersLength += 4 + len(key.Value) + 1 + 4 + len(header.Value)
+ headersLength += 1 + 4 + len(key.Value) + 1 + 4 +
len(header.Value)
}
headersBytes := make([]byte, headersLength)
position := 0
@@ -74,16 +75,24 @@ func GetHeadersBytes(headers map[HeaderKey]HeaderValue)
[]byte {
}
func getBytesFromHeader(key HeaderKey, value HeaderValue) []byte {
- headerBytesLength := 4 + len(key.Value) + 1 + 4 + len(value.Value)
+ headerBytesLength := 1 + 4 + len(key.Value) + 1 + 4 + len(value.Value)
headerBytes := make([]byte, headerBytesLength)
+ pos := 0
- binary.LittleEndian.PutUint32(headerBytes[:4], uint32(len(key.Value)))
- copy(headerBytes[4:4+len(key.Value)], key.Value)
+ headerBytes[pos] = byte(key.Kind)
+ pos++
- headerBytes[4+len(key.Value)] = byte(value.Kind)
+ binary.LittleEndian.PutUint32(headerBytes[pos:pos+4],
uint32(len(key.Value)))
+ pos += 4
+ copy(headerBytes[pos:pos+len(key.Value)], key.Value)
+ pos += len(key.Value)
-
binary.LittleEndian.PutUint32(headerBytes[4+len(key.Value)+1:4+len(key.Value)+1+4],
uint32(len(value.Value)))
- copy(headerBytes[4+len(key.Value)+1+4:], value.Value)
+ headerBytes[pos] = byte(value.Kind)
+ pos++
+
+ binary.LittleEndian.PutUint32(headerBytes[pos:pos+4],
uint32(len(value.Value)))
+ pos += 4
+ copy(headerBytes[pos:], value.Value)
return headerBytes
}
@@ -93,6 +102,12 @@ func DeserializeHeaders(userHeadersBytes []byte)
(map[HeaderKey]HeaderValue, err
position := 0
for position < len(userHeadersBytes) {
+ keyKind, err := deserializeHeaderKind(userHeadersBytes,
position)
+ if err != nil {
+ return nil, err
+ }
+ position++
+
if len(userHeadersBytes) <= position+4 {
return nil, errors.New("invalid header key length")
}
@@ -107,10 +122,10 @@ func DeserializeHeaders(userHeadersBytes []byte)
(map[HeaderKey]HeaderValue, err
return nil, errors.New("invalid header key")
}
- key := string(userHeadersBytes[position :
position+int(keyLength)])
+ keyValue := userHeadersBytes[position : position+int(keyLength)]
position += int(keyLength)
- headerKind, err := deserializeHeaderKind(userHeadersBytes,
position)
+ valueKind, err := deserializeHeaderKind(userHeadersBytes,
position)
if err != nil {
return nil, err
}
@@ -134,8 +149,8 @@ func DeserializeHeaders(userHeadersBytes []byte)
(map[HeaderKey]HeaderValue, err
value := userHeadersBytes[position : position+int(valueLength)]
position += int(valueLength)
- headers[HeaderKey{Value: key}] = HeaderValue{
- Kind: headerKind,
+ headers[HeaderKey{Kind: keyKind, Value: keyValue}] =
HeaderValue{
+ Kind: valueKind,
Value: value,
}
}
diff --git a/foreign/java/gradle.properties b/foreign/java/gradle.properties
index 5ff676060..2942656f9 100644
--- a/foreign/java/gradle.properties
+++ b/foreign/java/gradle.properties
@@ -17,5 +17,5 @@
# under the License.
#
-version=0.6.1-SNAPSHOT
+version=0.6.2-SNAPSHOT
group=org.apache.iggy
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/HeaderKey.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/HeaderKey.java
new file mode 100644
index 000000000..d8c6e43b2
--- /dev/null
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/HeaderKey.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.message;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public record HeaderKey(HeaderKind kind, byte[] value) {
+
+ public static HeaderKey fromString(String val) {
+ if (val.isEmpty() || val.length() > 255) {
+ throw new IllegalArgumentException("Value has incorrect size, must
be between 1 and 255");
+ }
+ return new HeaderKey(HeaderKind.String,
val.getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ HeaderKey headerKey = (HeaderKey) o;
+ return kind == headerKey.kind && Arrays.equals(value, headerKey.value);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = kind.hashCode();
+ result = 31 * result + Arrays.hashCode(value);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return new String(value, StandardCharsets.UTF_8);
+ }
+}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
index 4e78293a1..a568bc5bf 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
@@ -24,46 +24,60 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-public record Message(MessageHeader header, byte[] payload, Map<String,
HeaderValue> userHeaders) {
-
+public record Message(
+ MessageHeader header,
+ byte[] payload,
+ Map<HeaderKey, HeaderValue> userHeaders
+) {
public static Message of(String payload) {
return of(payload, Collections.emptyMap());
}
- public static Message of(String payload, Map<String, HeaderValue>
userHeaders) {
+ public static Message of(
+ String payload,
+ Map<HeaderKey, HeaderValue> userHeaders
+ ) {
final byte[] payloadBytes = payload.getBytes();
final long userHeadersLength = getUserHeadersSize(userHeaders);
final MessageHeader msgHeader = new MessageHeader(
- BigInteger.ZERO,
- MessageId.serverGenerated(),
- BigInteger.ZERO,
- BigInteger.ZERO,
- BigInteger.ZERO,
- userHeadersLength,
- (long) payloadBytes.length);
+ BigInteger.ZERO,
+ MessageId.serverGenerated(),
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ userHeadersLength,
+ (long) payloadBytes.length
+ );
return new Message(msgHeader, payloadBytes, userHeaders);
}
- public Message withUserHeaders(Map<String, HeaderValue> userHeaders) {
- Map<String, HeaderValue> mergedHeaders = mergeUserHeaders(userHeaders);
+ public Message withUserHeaders(Map<HeaderKey, HeaderValue> userHeaders) {
+ Map<HeaderKey, HeaderValue> mergedHeaders = mergeUserHeaders(
+ userHeaders
+ );
long userHeadersLength = getUserHeadersSize(mergedHeaders);
MessageHeader updatedHeader = new MessageHeader(
- header.checksum(),
- header.id(),
- header.offset(),
- header.timestamp(),
- header.originTimestamp(),
- userHeadersLength,
- (long) payload.length);
+ header.checksum(),
+ header.id(),
+ header.offset(),
+ header.timestamp(),
+ header.originTimestamp(),
+ userHeadersLength,
+ (long) payload.length
+ );
return new Message(updatedHeader, payload, mergedHeaders);
}
public int getSize() {
long userHeadersLength = getUserHeadersSize(userHeaders);
- return Math.toIntExact(MessageHeader.SIZE + payload.length +
userHeadersLength);
+ return Math.toIntExact(
+ MessageHeader.SIZE + payload.length + userHeadersLength
+ );
}
- private Map<String, HeaderValue> mergeUserHeaders(Map<String, HeaderValue>
userHeaders) {
+ private Map<HeaderKey, HeaderValue> mergeUserHeaders(
+ Map<HeaderKey, HeaderValue> userHeaders
+ ) {
if (userHeaders.isEmpty()) {
return this.userHeaders;
}
@@ -72,21 +86,25 @@ public record Message(MessageHeader header, byte[] payload,
Map<String, HeaderVa
return userHeaders;
}
- Map<String, HeaderValue> mergedHeaders = new
HashMap<>(this.userHeaders);
+ Map<HeaderKey, HeaderValue> mergedHeaders = new HashMap<>(
+ this.userHeaders
+ );
mergedHeaders.putAll(userHeaders);
return mergedHeaders;
}
- private static long getUserHeadersSize(Map<String, HeaderValue>
userHeaders) {
+ private static long getUserHeadersSize(
+ Map<HeaderKey, HeaderValue> userHeaders
+ ) {
if (userHeaders.isEmpty()) {
return 0L;
}
long size = 0L;
- for (Map.Entry<String, HeaderValue> entry : userHeaders.entrySet()) {
- byte[] keyBytes = entry.getKey().getBytes();
+ for (Map.Entry<HeaderKey, HeaderValue> entry : userHeaders.entrySet())
{
+ byte[] keyBytes = entry.getKey().value();
byte[] valueBytes = entry.getValue().value().getBytes();
- size += 4L + keyBytes.length + 1L + 4L + valueBytes.length;
+ size += 1L + 4L + keyBytes.length + 1L + 4L + valueBytes.length;
}
return size;
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
index 378b8930c..884a53d00 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
@@ -20,12 +20,20 @@
package org.apache.iggy.serde;
import io.netty.buffer.ByteBuf;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.iggy.consumergroup.ConsumerGroup;
import org.apache.iggy.consumergroup.ConsumerGroupDetails;
import org.apache.iggy.consumergroup.ConsumerGroupMember;
import org.apache.iggy.consumeroffset.ConsumerOffsetInfo;
import org.apache.iggy.message.BytesMessageId;
+import org.apache.iggy.message.HeaderKey;
import org.apache.iggy.message.HeaderKind;
import org.apache.iggy.message.HeaderValue;
import org.apache.iggy.message.Message;
@@ -51,14 +59,6 @@ import org.apache.iggy.user.UserInfo;
import org.apache.iggy.user.UserInfoDetails;
import org.apache.iggy.user.UserStatus;
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
/**
* Unified deserializer for both blocking and async clients.
* Provides deserialization of ByteBuf to domain objects according to Iggy
wire protocol.
@@ -74,9 +74,18 @@ public final class BytesDeserializer {
var size = readU64AsBigInteger(response);
var messagesCount = readU64AsBigInteger(response);
var nameLength = response.readByte();
- var name = response.readCharSequence(nameLength,
StandardCharsets.UTF_8).toString();
-
- return new StreamBase(streamId, createdAt, name, size.toString(),
messagesCount, topicsCount);
+ var name = response
+ .readCharSequence(nameLength, StandardCharsets.UTF_8)
+ .toString();
+
+ return new StreamBase(
+ streamId,
+ createdAt,
+ name,
+ size.toString(),
+ messagesCount,
+ topicsCount
+ );
}
public static StreamDetails readStreamDetails(ByteBuf response) {
@@ -108,7 +117,14 @@ public final class BytesDeserializer {
var currentOffset = readU64AsBigInteger(response);
var size = readU64AsBigInteger(response);
var messagesCount = readU64AsBigInteger(response);
- return new Partition(partitionId, createdAt, segmentsCount,
currentOffset, size.toString(), messagesCount);
+ return new Partition(
+ partitionId,
+ createdAt,
+ segmentsCount,
+ currentOffset,
+ size.toString(),
+ messagesCount
+ );
}
public static Topic readTopic(ByteBuf response) {
@@ -122,21 +138,26 @@ public final class BytesDeserializer {
var size = readU64AsBigInteger(response);
var messagesCount = readU64AsBigInteger(response);
var nameLength = response.readByte();
- var name = response.readCharSequence(nameLength,
StandardCharsets.UTF_8).toString();
+ var name = response
+ .readCharSequence(nameLength, StandardCharsets.UTF_8)
+ .toString();
return new Topic(
- topicId,
- createdAt,
- name,
- size.toString(),
- messageExpiry,
- CompressionAlgorithm.fromCode(compressionAlgorithmCode),
- maxTopicSize,
- (short) replicationFactor,
- messagesCount,
- partitionsCount);
+ topicId,
+ createdAt,
+ name,
+ size.toString(),
+ messageExpiry,
+ CompressionAlgorithm.fromCode(compressionAlgorithmCode),
+ maxTopicSize,
+ (short) replicationFactor,
+ messagesCount,
+ partitionsCount
+ );
}
- public static ConsumerGroupDetails readConsumerGroupDetails(ByteBuf
response) {
+ public static ConsumerGroupDetails readConsumerGroupDetails(
+ ByteBuf response
+ ) {
var consumerGroup = readConsumerGroup(response);
List<ConsumerGroupMember> members = new ArrayList<>();
@@ -147,7 +168,9 @@ public final class BytesDeserializer {
return new ConsumerGroupDetails(consumerGroup, members);
}
- public static ConsumerGroupMember readConsumerGroupMember(ByteBuf
response) {
+ public static ConsumerGroupMember readConsumerGroupMember(
+ ByteBuf response
+ ) {
var memberId = response.readUnsignedIntLE();
var partitionsCount = response.readUnsignedIntLE();
List<Long> partitionIds = new ArrayList<>();
@@ -162,7 +185,9 @@ public final class BytesDeserializer {
var partitionsCount = response.readUnsignedIntLE();
var membersCount = response.readUnsignedIntLE();
var nameLength = response.readByte();
- var name = response.readCharSequence(nameLength,
StandardCharsets.UTF_8).toString();
+ var name = response
+ .readCharSequence(nameLength, StandardCharsets.UTF_8)
+ .toString();
return new ConsumerGroup(groupId, name, partitionsCount, membersCount);
}
@@ -181,7 +206,12 @@ public final class BytesDeserializer {
while (response.isReadable()) {
messages.add(readPolledMessage(response));
}
- return new PolledMessages(partitionId, currentOffset, messagesCount,
messages);
+ return new PolledMessages(
+ partitionId,
+ currentOffset,
+ messagesCount,
+ messages
+ );
}
public static Message readPolledMessage(ByteBuf response) {
@@ -192,25 +222,53 @@ public final class BytesDeserializer {
var originTimestamp = readU64AsBigInteger(response);
var userHeadersLength = response.readUnsignedIntLE();
var payloadLength = response.readUnsignedIntLE();
- var header =
- new MessageHeader(checksum, id, offset, timestamp,
originTimestamp, userHeadersLength, payloadLength);
+ var header = new MessageHeader(
+ checksum,
+ id,
+ offset,
+ timestamp,
+ originTimestamp,
+ userHeadersLength,
+ payloadLength
+ );
var payload = newByteArray(payloadLength);
response.readBytes(payload);
- Map<String, HeaderValue> userHeaders = new HashMap<>();
+ Map<HeaderKey, HeaderValue> userHeaders = new HashMap<>();
if (userHeadersLength > 0) {
- ByteBuf userHeadersBuffer =
response.readSlice(toInt(userHeadersLength));
- Map<String, HeaderValue> headers = new HashMap<>();
+ ByteBuf userHeadersBuffer = response.readSlice(
+ toInt(userHeadersLength)
+ );
+ Map<HeaderKey, HeaderValue> headers = new HashMap<>();
while (userHeadersBuffer.isReadable()) {
+ var userHeaderKeyKindCode =
+ userHeadersBuffer.readUnsignedByte();
var userHeaderKeyLength =
userHeadersBuffer.readUnsignedIntLE();
- var userHeaderKey = userHeadersBuffer
- .readCharSequence(toInt(userHeaderKeyLength),
StandardCharsets.UTF_8)
- .toString();
- var userHeaderKindCode = userHeadersBuffer.readUnsignedByte();
- var userHeaderValueLength =
userHeadersBuffer.readUnsignedIntLE();
+ byte[] userHeaderKeyBytes = new byte[toInt(
+ userHeaderKeyLength
+ )];
+ userHeadersBuffer.readBytes(userHeaderKeyBytes);
+ var userHeaderKey = new HeaderKey(
+ HeaderKind.fromCode(userHeaderKeyKindCode),
+ userHeaderKeyBytes
+ );
+
+ var userHeaderValueKindCode =
+ userHeadersBuffer.readUnsignedByte();
+ var userHeaderValueLength =
+ userHeadersBuffer.readUnsignedIntLE();
String userHeaderValue = userHeadersBuffer
- .readCharSequence(toInt(userHeaderValueLength),
StandardCharsets.UTF_8)
- .toString();
- headers.put(userHeaderKey, new
HeaderValue(HeaderKind.fromCode(userHeaderKindCode), userHeaderValue));
+ .readCharSequence(
+ toInt(userHeaderValueLength),
+ StandardCharsets.UTF_8
+ )
+ .toString();
+ headers.put(
+ userHeaderKey,
+ new HeaderValue(
+ HeaderKind.fromCode(userHeaderValueKindCode),
+ userHeaderValue
+ )
+ );
}
userHeaders = headers;
}
@@ -238,41 +296,49 @@ public final class BytesDeserializer {
var clientsCount = response.readUnsignedIntLE();
var consumerGroupsCount = response.readUnsignedIntLE();
var hostnameLength = response.readUnsignedIntLE();
- var hostname = response.readCharSequence(toInt(hostnameLength),
StandardCharsets.UTF_8)
- .toString();
+ var hostname = response
+ .readCharSequence(toInt(hostnameLength), StandardCharsets.UTF_8)
+ .toString();
var osNameLength = response.readUnsignedIntLE();
- var osName = response.readCharSequence(toInt(osNameLength),
StandardCharsets.UTF_8)
- .toString();
+ var osName = response
+ .readCharSequence(toInt(osNameLength), StandardCharsets.UTF_8)
+ .toString();
var osVersionLength = response.readUnsignedIntLE();
- var osVersion = response.readCharSequence(toInt(osVersionLength),
StandardCharsets.UTF_8)
- .toString();
+ var osVersion = response
+ .readCharSequence(toInt(osVersionLength), StandardCharsets.UTF_8)
+ .toString();
var kernelVersionLength = response.readUnsignedIntLE();
- var kernelVersion =
response.readCharSequence(toInt(kernelVersionLength), StandardCharsets.UTF_8)
- .toString();
+ var kernelVersion = response
+ .readCharSequence(
+ toInt(kernelVersionLength),
+ StandardCharsets.UTF_8
+ )
+ .toString();
return new Stats(
- processId,
- cpuUsage,
- totalCpuUsage,
- memoryUsage.toString(),
- totalMemory.toString(),
- availableMemory.toString(),
- runTime,
- startTime,
- readBytes.toString(),
- writtenBytes.toString(),
- messagesSizeBytes.toString(),
- streamsCount,
- topicsCount,
- partitionsCount,
- segmentsCount,
- messagesCount,
- clientsCount,
- consumerGroupsCount,
- hostname,
- osName,
- osVersion,
- kernelVersion);
+ processId,
+ cpuUsage,
+ totalCpuUsage,
+ memoryUsage.toString(),
+ totalMemory.toString(),
+ availableMemory.toString(),
+ runTime,
+ startTime,
+ readBytes.toString(),
+ writtenBytes.toString(),
+ messagesSizeBytes.toString(),
+ streamsCount,
+ topicsCount,
+ partitionsCount,
+ segmentsCount,
+ messagesCount,
+ clientsCount,
+ consumerGroupsCount,
+ hostname,
+ osName,
+ osVersion,
+ kernelVersion
+ );
}
public static ClientInfoDetails readClientInfoDetails(ByteBuf response) {
@@ -298,10 +364,17 @@ public final class BytesDeserializer {
transportString = "Quic";
}
var addressLength = response.readUnsignedIntLE();
- var address = response.readCharSequence(toInt(addressLength),
StandardCharsets.UTF_8)
- .toString();
+ var address = response
+ .readCharSequence(toInt(addressLength), StandardCharsets.UTF_8)
+ .toString();
var consumerGroupsCount = response.readUnsignedIntLE();
- return new ClientInfo(clientId, userIdOptional, address,
transportString, consumerGroupsCount);
+ return new ClientInfo(
+ clientId,
+ userIdOptional,
+ address,
+ transportString,
+ consumerGroupsCount
+ );
}
public static ConsumerGroupInfo readConsumerGroupInfo(ByteBuf response) {
@@ -350,7 +423,14 @@ public final class BytesDeserializer {
topicPermissionsMap.put(topicId, topicPermissions);
}
return new StreamPermissions(
- manageStream, readStream, manageTopics, readTopics,
pollMessages, sendMessages, topicPermissionsMap);
+ manageStream,
+ readStream,
+ manageTopics,
+ readTopics,
+ pollMessages,
+ sendMessages,
+ topicPermissionsMap
+ );
}
public static TopicPermissions readTopicPermissions(ByteBuf response) {
@@ -358,7 +438,12 @@ public final class BytesDeserializer {
var readTopic = response.readBoolean();
var pollMessages = response.readBoolean();
var sendMessages = response.readBoolean();
- return new TopicPermissions(manageTopic, readTopic, pollMessages,
sendMessages);
+ return new TopicPermissions(
+ manageTopic,
+ readTopic,
+ pollMessages,
+ sendMessages
+ );
}
public static GlobalPermissions readGlobalPermissions(ByteBuf response) {
@@ -373,16 +458,17 @@ public final class BytesDeserializer {
var pollMessages = response.readBoolean();
var sendMessages = response.readBoolean();
return new GlobalPermissions(
- manageServers,
- readServers,
- manageUsers,
- readUsers,
- manageStreams,
- readStreams,
- manageTopics,
- readTopics,
- pollMessages,
- sendMessages);
+ manageServers,
+ readServers,
+ manageUsers,
+ readUsers,
+ manageStreams,
+ readStreams,
+ manageTopics,
+ readTopics,
+ pollMessages,
+ sendMessages
+ );
}
public static UserInfo readUserInfo(ByteBuf response) {
@@ -391,23 +477,33 @@ public final class BytesDeserializer {
var statusCode = response.readByte();
var status = UserStatus.fromCode(statusCode);
var usernameLength = response.readByte();
- var username = response.readCharSequence(usernameLength,
StandardCharsets.UTF_8)
- .toString();
+ var username = response
+ .readCharSequence(usernameLength, StandardCharsets.UTF_8)
+ .toString();
return new UserInfo(userId, createdAt, status, username);
}
- public static RawPersonalAccessToken readRawPersonalAccessToken(ByteBuf
response) {
+ public static RawPersonalAccessToken readRawPersonalAccessToken(
+ ByteBuf response
+ ) {
var tokenLength = response.readByte();
- var token =
- response.readCharSequence(tokenLength,
StandardCharsets.UTF_8).toString();
+ var token = response
+ .readCharSequence(tokenLength, StandardCharsets.UTF_8)
+ .toString();
return new RawPersonalAccessToken(token);
}
- public static PersonalAccessTokenInfo readPersonalAccessTokenInfo(ByteBuf
response) {
+ public static PersonalAccessTokenInfo readPersonalAccessTokenInfo(
+ ByteBuf response
+ ) {
var nameLength = response.readByte();
- var name = response.readCharSequence(nameLength,
StandardCharsets.UTF_8).toString();
+ var name = response
+ .readCharSequence(nameLength, StandardCharsets.UTF_8)
+ .toString();
var expiry = readU64AsBigInteger(response);
- Optional<BigInteger> expiryOptional = expiry.equals(BigInteger.ZERO) ?
Optional.empty() : Optional.of(expiry);
+ Optional<BigInteger> expiryOptional = expiry.equals(BigInteger.ZERO)
+ ? Optional.empty()
+ : Optional.of(expiry);
return new PersonalAccessTokenInfo(name, expiryOptional);
}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
index 4014f3914..37ad449de 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
@@ -21,9 +21,14 @@ package org.apache.iggy.serde;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Optional;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.iggy.consumergroup.Consumer;
import org.apache.iggy.identifier.Identifier;
+import org.apache.iggy.message.HeaderKey;
import org.apache.iggy.message.HeaderValue;
import org.apache.iggy.message.Message;
import org.apache.iggy.message.MessageHeader;
@@ -34,11 +39,6 @@ import org.apache.iggy.user.Permissions;
import org.apache.iggy.user.StreamPermissions;
import org.apache.iggy.user.TopicPermissions;
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.Optional;
-
/**
* Unified serializer for both blocking and async clients.
* Provides serialization of domain objects to ByteBuf according to Iggy wire protocol.
@@ -68,7 +68,9 @@ public final class BytesSerializer {
buffer.writeBytes(identifier.getName().getBytes());
return buffer;
} else {
- throw new IllegalArgumentException("Unknown identifier kind: " +
identifier.getKind());
+ throw new IllegalArgumentException(
+ "Unknown identifier kind: " + identifier.getKind()
+ );
}
}
@@ -119,15 +121,16 @@ public final class BytesSerializer {
return buffer;
}
- public static ByteBuf toBytes(Map<String, HeaderValue> headers) {
+ public static ByteBuf toBytes(Map<HeaderKey, HeaderValue> headers) {
if (headers.isEmpty()) {
return Unpooled.EMPTY_BUFFER;
}
var buffer = Unpooled.buffer();
- for (Map.Entry<String, HeaderValue> entry : headers.entrySet()) {
- String key = entry.getKey();
- buffer.writeIntLE(key.length());
- buffer.writeBytes(key.getBytes());
+ for (Map.Entry<HeaderKey, HeaderValue> entry : headers.entrySet()) {
+ HeaderKey key = entry.getKey();
+ buffer.writeByte(key.kind().asCode());
+ buffer.writeIntLE(key.value().length);
+ buffer.writeBytes(key.value());
HeaderValue value = entry.getValue();
buffer.writeByte(value.kind().asCode());
@@ -143,8 +146,9 @@ public final class BytesSerializer {
if (permissions.streams().isEmpty()) {
buffer.writeByte(0);
} else {
- for (Map.Entry<Long, StreamPermissions> entry :
- permissions.streams().entrySet()) {
+ for (Map.Entry<Long, StreamPermissions> entry : permissions
+ .streams()
+ .entrySet()) {
buffer.writeByte(1);
buffer.writeIntLE(entry.getKey().intValue());
buffer.writeBytes(toBytes(entry.getValue()));
@@ -182,7 +186,9 @@ public final class BytesSerializer {
if (permissions.topics().isEmpty()) {
buffer.writeByte(0);
} else {
- for (Map.Entry<Long, TopicPermissions> entry :
permissions.topics().entrySet()) {
+ for (Map.Entry<Long, TopicPermissions> entry : permissions
+ .topics()
+ .entrySet()) {
buffer.writeByte(1);
buffer.writeIntLE(entry.getKey().intValue());
buffer.writeBytes(toBytes(entry.getValue()));
@@ -212,12 +218,19 @@ public final class BytesSerializer {
public static ByteBuf toBytesAsU64(BigInteger value) {
if (value.signum() == -1) {
- throw new IllegalArgumentException("Negative value cannot be
serialized to unsigned 64: " + value);
+ throw new IllegalArgumentException(
+ "Negative value cannot be serialized to unsigned 64: " + value
+ );
}
ByteBuf buffer = Unpooled.buffer(8, 8);
byte[] valueAsBytes = value.toByteArray();
- if (valueAsBytes.length > 9 || valueAsBytes.length == 9 &&
valueAsBytes[0] != 0) {
- throw new IllegalArgumentException("Value too large for U64: " +
value);
+ if (
+ valueAsBytes.length > 9 ||
+ (valueAsBytes.length == 9 && valueAsBytes[0] != 0)
+ ) {
+ throw new IllegalArgumentException(
+ "Value too large for U64: " + value
+ );
}
ArrayUtils.reverse(valueAsBytes);
buffer.writeBytes(valueAsBytes, 0, Math.min(8, valueAsBytes.length));
@@ -229,12 +242,19 @@ public final class BytesSerializer {
public static ByteBuf toBytesAsU128(BigInteger value) {
if (value.signum() == -1) {
- throw new IllegalArgumentException("Negative value cannot be
serialized to unsigned 128: " + value);
+ throw new IllegalArgumentException(
+ "Negative value cannot be serialized to unsigned 128: " + value
+ );
}
ByteBuf buffer = Unpooled.buffer(16, 16);
byte[] valueAsBytes = value.toByteArray();
- if (valueAsBytes.length > 17 || valueAsBytes.length == 17 &&
valueAsBytes[0] != 0) {
- throw new IllegalArgumentException("Value too large for U128: " +
value);
+ if (
+ valueAsBytes.length > 17 ||
+ (valueAsBytes.length == 17 && valueAsBytes[0] != 0)
+ ) {
+ throw new IllegalArgumentException(
+ "Value too large for U128: " + value
+ );
}
ArrayUtils.reverse(valueAsBytes);
buffer.writeBytes(valueAsBytes, 0, Math.min(16, valueAsBytes.length));
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
index 224d5e7ec..c53a6c8a3 100644
---
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
@@ -19,12 +19,22 @@
package org.apache.iggy.client.blocking.tcp;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
import org.apache.iggy.consumergroup.Consumer;
import org.apache.iggy.identifier.ConsumerId;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.message.BytesMessageId;
+import org.apache.iggy.message.HeaderKey;
import org.apache.iggy.message.HeaderKind;
import org.apache.iggy.message.HeaderValue;
import org.apache.iggy.message.Message;
@@ -39,16 +49,6 @@ import org.apache.iggy.user.TopicPermissions;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
class BytesSerializerTest {
@Nested
@@ -65,7 +65,9 @@ class BytesSerializerTest {
ByteBuf bytesAsU64 = BytesSerializer.toBytesAsU64(value);
// then
-
assertThat(bytesAsU64).isEqualByComparingTo(Unpooled.copyLong(maxLong));
+ assertThat(bytesAsU64).isEqualByComparingTo(
+ Unpooled.copyLong(maxLong)
+ );
}
@Test
@@ -88,7 +90,9 @@ class BytesSerializerTest {
var value = BigInteger.valueOf(-1);
// when & then
- assertThatThrownBy(() ->
BytesSerializer.toBytesAsU64(value)).isInstanceOf(IllegalArgumentException.class);
+ assertThatThrownBy(() ->
+ BytesSerializer.toBytesAsU64(value)
+ ).isInstanceOf(IllegalArgumentException.class);
}
@Test
@@ -99,7 +103,9 @@ class BytesSerializerTest {
var value = maxU64.add(BigInteger.ONE);
// when & then
- assertThatThrownBy(() ->
BytesSerializer.toBytesAsU64(value)).isInstanceOf(IllegalArgumentException.class);
+ assertThatThrownBy(() ->
+ BytesSerializer.toBytesAsU64(value)
+ ).isInstanceOf(IllegalArgumentException.class);
}
}
@@ -118,7 +124,9 @@ class BytesSerializerTest {
ByteBuf bytesAsU128 = BytesSerializer.toBytesAsU128(value);
// then
-
assertThat(bytesAsU128).isEqualByComparingTo(Unpooled.wrappedBuffer(maxU128, 1,
16));
+ assertThat(bytesAsU128).isEqualByComparingTo(
+ Unpooled.wrappedBuffer(maxU128, 1, 16)
+ );
}
@Test
@@ -141,7 +149,9 @@ class BytesSerializerTest {
var value = BigInteger.valueOf(-1);
// when & then
- assertThatThrownBy(() ->
BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class);
+ assertThatThrownBy(() ->
+ BytesSerializer.toBytesAsU128(value)
+ ).isInstanceOf(IllegalArgumentException.class);
}
@Test
@@ -153,7 +163,9 @@ class BytesSerializerTest {
var value = maxU128Value.add(BigInteger.ONE);
// when & then
- assertThatThrownBy(() ->
BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class);
+ assertThatThrownBy(() ->
+ BytesSerializer.toBytesAsU128(value)
+ ).isInstanceOf(IllegalArgumentException.class);
}
}
@@ -172,7 +184,9 @@ class BytesSerializerTest {
assertThat(result.readByte()).isEqualTo((byte) 4); // length
byte[] stringBytes = new byte[4];
result.readBytes(stringBytes);
- assertThat(new String(stringBytes,
StandardCharsets.UTF_8)).isEqualTo("test");
+ assertThat(
+ new String(stringBytes, StandardCharsets.UTF_8)
+ ).isEqualTo("test");
}
@Test
@@ -198,7 +212,9 @@ class BytesSerializerTest {
// then
byte[] expectedBytes = input.getBytes(StandardCharsets.UTF_8);
- assertThat(result.readByte()).isEqualTo((byte)
expectedBytes.length);
+ assertThat(result.readByte()).isEqualTo(
+ (byte) expectedBytes.length
+ );
byte[] stringBytes = new byte[expectedBytes.length];
result.readBytes(stringBytes);
assertThat(stringBytes).isEqualTo(expectedBytes);
@@ -393,7 +409,9 @@ class BytesSerializerTest {
@Test
void shouldSerializeTimestampStrategy() {
// given
- var strategy =
PollingStrategy.timestamp(BigInteger.valueOf(1234567890));
+ var strategy = PollingStrategy.timestamp(
+ BigInteger.valueOf(1234567890)
+ );
// when
ByteBuf result = BytesSerializer.toBytes(strategy);
@@ -440,7 +458,7 @@ class BytesSerializerTest {
@Test
void shouldSerializeEmptyHeaders() {
// given
- Map<String, HeaderValue> headers = new HashMap<>();
+ Map<HeaderKey, HeaderValue> headers = new HashMap<>();
// when
ByteBuf result = BytesSerializer.toBytes(headers);
@@ -452,18 +470,22 @@ class BytesSerializerTest {
@Test
void shouldSerializeSingleHeader() {
// given
- Map<String, HeaderValue> headers = new HashMap<>();
- headers.put("key1", new HeaderValue(HeaderKind.Raw, "value1"));
+ Map<HeaderKey, HeaderValue> headers = new HashMap<>();
+ headers.put(
+ HeaderKey.fromString("key1"),
+ new HeaderValue(HeaderKind.Raw, "value1")
+ );
// when
ByteBuf result = BytesSerializer.toBytes(headers);
// then
+ assertThat(result.readByte()).isEqualTo((byte) 2); // String kind for key
assertThat(result.readIntLE()).isEqualTo(4); // "key1".length()
byte[] keyBytes = new byte[4];
result.readBytes(keyBytes);
assertThat(new String(keyBytes)).isEqualTo("key1");
- assertThat(result.readByte()).isEqualTo((byte) 1); // Raw kind
+ assertThat(result.readByte()).isEqualTo((byte) 1); // Raw kind for value
assertThat(result.readIntLE()).isEqualTo(6); // "value1".length()
byte[] valueBytes = new byte[6];
result.readBytes(valueBytes);
@@ -473,15 +495,21 @@ class BytesSerializerTest {
@Test
void shouldSerializeMultipleHeaders() {
// given
- Map<String, HeaderValue> headers = new HashMap<>();
- headers.put("k1", new HeaderValue(HeaderKind.Raw, "v1")); // 13 bytes
- headers.put("k2", new HeaderValue(HeaderKind.String, "v2")); // 13 bytes
+ Map<HeaderKey, HeaderValue> headers = new HashMap<>();
+ headers.put(
+ HeaderKey.fromString("k1"),
+ new HeaderValue(HeaderKind.Raw, "v1")
+ ); // 1+4+2+1+4+2 = 14 bytes
+ headers.put(
+ HeaderKey.fromString("k2"),
+ new HeaderValue(HeaderKind.String, "v2")
+ ); // 1+4+2+1+4+2 = 14 bytes
// when
ByteBuf result = BytesSerializer.toBytes(headers);
// then - verify buffer contains data for both headers
- assertThat(result.readableBytes()).isEqualTo(26);
+ assertThat(result.readableBytes()).isEqualTo(28);
}
}
@@ -493,14 +521,14 @@ class BytesSerializerTest {
// given
var messageId = new BytesMessageId(new byte[16]);
var header = new MessageHeader(
- BigInteger.valueOf(123), // checksum
- messageId,
- BigInteger.valueOf(0), // offset
- BigInteger.valueOf(1000), // timestamp
- BigInteger.valueOf(1000), // originTimestamp
- 0L, // userHeadersLength
- 5L // payloadLength
- );
+ BigInteger.valueOf(123), // checksum
+ messageId,
+ BigInteger.valueOf(0), // offset
+ BigInteger.valueOf(1000), // timestamp
+ BigInteger.valueOf(1000), // originTimestamp
+ 0L, // userHeadersLength
+ 5L // payloadLength
+ );
byte[] payload = "hello".getBytes();
var message = new Message(header, payload, new HashMap<>());
@@ -508,29 +536,34 @@ class BytesSerializerTest {
ByteBuf result = BytesSerializer.toBytes(message);
// then
- assertThat(result.readableBytes()).isEqualTo(MessageHeader.SIZE + 5); // header + payload, no user headers
+ assertThat(result.readableBytes()).isEqualTo(
+ MessageHeader.SIZE + 5
+ ); // header + payload, no user headers
}
@Test
void shouldSerializeMessageWithUserHeaders() {
// given
var messageId = new BytesMessageId(new byte[16]);
- Map<String, HeaderValue> userHeaders = new HashMap<>();
- userHeaders.put("key", new HeaderValue(HeaderKind.Raw, "val"));
+ Map<HeaderKey, HeaderValue> userHeaders = new HashMap<>();
+ userHeaders.put(
+ HeaderKey.fromString("key"),
+ new HeaderValue(HeaderKind.Raw, "val")
+ );
// Calculate user headers size
ByteBuf headersBuf = BytesSerializer.toBytes(userHeaders);
int userHeadersLength = headersBuf.readableBytes();
var header = new MessageHeader(
- BigInteger.ZERO,
- messageId,
- BigInteger.ZERO,
- BigInteger.valueOf(1000),
- BigInteger.valueOf(1000),
- (long) userHeadersLength,
- 3L // "abc".length()
- );
+ BigInteger.ZERO,
+ messageId,
+ BigInteger.ZERO,
+ BigInteger.valueOf(1000),
+ BigInteger.valueOf(1000),
+ (long) userHeadersLength,
+ 3L // "abc".length()
+ );
byte[] payload = "abc".getBytes();
var message = new Message(header, payload, userHeaders);
@@ -538,7 +571,9 @@ class BytesSerializerTest {
ByteBuf result = BytesSerializer.toBytes(message);
// then
- assertThat(result.readableBytes()).isEqualTo(MessageHeader.SIZE +
3 + userHeadersLength);
+ assertThat(result.readableBytes()).isEqualTo(
+ MessageHeader.SIZE + 3 + userHeadersLength
+ );
}
}
@@ -550,14 +585,14 @@ class BytesSerializerTest {
// given
var messageId = new BytesMessageId(new byte[16]);
var header = new MessageHeader(
- BigInteger.valueOf(999), // checksum
- messageId,
- BigInteger.valueOf(42), // offset
- BigInteger.valueOf(2000), // timestamp
- BigInteger.valueOf(1999), // originTimestamp
- 10L, // userHeadersLength
- 100L // payloadLength
- );
+ BigInteger.valueOf(999), // checksum
+ messageId,
+ BigInteger.valueOf(42), // offset
+ BigInteger.valueOf(2000), // timestamp
+ BigInteger.valueOf(1999), // originTimestamp
+ 10L, // userHeadersLength
+ 100L // payloadLength
+ );
// when
ByteBuf result = BytesSerializer.toBytes(header);
@@ -587,7 +622,18 @@ class BytesSerializerTest {
@Test
void shouldSerializeGlobalPermissions() {
// given
- var permissions = new GlobalPermissions(true, false, true, false,
true, false, true, false, true, false);
+ var permissions = new GlobalPermissions(
+ true,
+ false,
+ true,
+ false,
+ true,
+ false,
+ true,
+ false,
+ true,
+ false
+ );
// when
ByteBuf result = BytesSerializer.toBytes(permissions);
@@ -625,7 +671,15 @@ class BytesSerializerTest {
@Test
void shouldSerializeStreamPermissionsWithoutTopics() {
// given
- var permissions = new StreamPermissions(true, false, true, false,
true, false, new HashMap<>());
+ var permissions = new StreamPermissions(
+ true,
+ false,
+ true,
+ false,
+ true,
+ false,
+ new HashMap<>()
+ );
// when
ByteBuf result = BytesSerializer.toBytes(permissions);
@@ -645,7 +699,15 @@ class BytesSerializerTest {
// given
Map<Long, TopicPermissions> topicPerms = new HashMap<>();
topicPerms.put(1L, new TopicPermissions(true, true, true, true));
- var permissions = new StreamPermissions(true, true, true, true,
true, true, topicPerms);
+ var permissions = new StreamPermissions(
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ topicPerms
+ );
// when
ByteBuf result = BytesSerializer.toBytes(permissions);
@@ -661,7 +723,18 @@ class BytesSerializerTest {
@Test
void shouldSerializeFullPermissionsWithoutStreams() {
// given
- var globalPerms = new GlobalPermissions(true, true, true, true,
true, true, true, true, true, true);
+ var globalPerms = new GlobalPermissions(
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ true
+ );
var permissions = new Permissions(globalPerms, new HashMap<>());
// when
@@ -675,10 +748,31 @@ class BytesSerializerTest {
@Test
void shouldSerializeFullPermissionsWithStreams() {
// given
- var globalPerms =
- new GlobalPermissions(false, false, false, false, false,
false, false, false, false, false);
+ var globalPerms = new GlobalPermissions(
+ false,
+ false,
+ false,
+ false,
+ false,
+ false,
+ false,
+ false,
+ false,
+ false
+ );
Map<Long, StreamPermissions> streamPerms = new HashMap<>();
- streamPerms.put(1L, new StreamPermissions(true, true, true, true,
true, true, new HashMap<>()));
+ streamPerms.put(
+ 1L,
+ new StreamPermissions(
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ new HashMap<>()
+ )
+ );
var permissions = new Permissions(globalPerms, streamPerms);
// when
diff --git a/foreign/node/package.json b/foreign/node/package.json
index 8d2eb542b..e04752b5a 100644
--- a/foreign/node/package.json
+++ b/foreign/node/package.json
@@ -1,7 +1,7 @@
{
"name": "apache-iggy",
"type": "module",
- "version": "0.6.1-edge.1",
+ "version": "0.6.2-edge.1",
"description": "Official Apache Iggy NodeJS SDK",
"keywords": [
"iggy",
diff --git a/foreign/node/src/wire/message/header.utils.ts b/foreign/node/src/wire/message/header.utils.ts
index fa6e9f57d..2b2be17a5 100644
--- a/foreign/node/src/wire/message/header.utils.ts
+++ b/foreign/node/src/wire/message/header.utils.ts
@@ -17,7 +17,6 @@
* under the License.
*/
-
import {
boolToBuf,
int8ToBuf,
@@ -29,8 +28,8 @@ import {
uint32ToBuf,
uint64ToBuf,
floatToBuf,
- doubleToBuf
-} from '../number.utils.js';
+ doubleToBuf,
+} from "../number.utils.js";
import {
type HeaderValueRaw,
@@ -51,45 +50,43 @@ import {
type HeaderKindId,
type HeaderKindValue,
HeaderKind,
- ReverseHeaderKind
-} from './header.type.js';
-
+ ReverseHeaderKind,
+} from "./header.type.js";
/**
* Union type of all possible header value types.
*/
export type HeaderValue =
- HeaderValueRaw |
- HeaderValueString |
- HeaderValueBool |
- HeaderValueInt8 |
- HeaderValueInt16 |
- HeaderValueInt32 |
- HeaderValueInt64 |
- HeaderValueInt128 |
- HeaderValueUint8 |
- HeaderValueUint16 |
- HeaderValueUint32 |
- HeaderValueUint64 |
- HeaderValueUint128 |
- HeaderValueFloat |
- HeaderValueDouble;
+ | HeaderValueRaw
+ | HeaderValueString
+ | HeaderValueBool
+ | HeaderValueInt8
+ | HeaderValueInt16
+ | HeaderValueInt32
+ | HeaderValueInt64
+ | HeaderValueInt128
+ | HeaderValueUint8
+ | HeaderValueUint16
+ | HeaderValueUint32
+ | HeaderValueUint64
+ | HeaderValueUint128
+ | HeaderValueFloat
+ | HeaderValueDouble;
/**
* Map of header names to header values.
*/
export type Headers = Record<string, HeaderValue>;
-
/**
* Internal representation of a header value in binary format.
*/
type BinaryHeaderValue = {
/** Header kind identifier */
- kind: number,// HeaderKind,
+ kind: number; // HeaderKind,
/** Serialized value as Buffer */
- value: Buffer
-}
+ value: Buffer;
+};
/**
* Serializes a header value to a Buffer based on its kind.
@@ -100,56 +97,58 @@ type BinaryHeaderValue = {
export const serializeHeaderValue = (header: HeaderValue) => {
const { kind, value } = header;
switch (kind) {
- case HeaderKind.Raw: return value;
- case HeaderKind.String: return Buffer.from(value);
- case HeaderKind.Bool: return boolToBuf(value);
- case HeaderKind.Int8: return int8ToBuf(value);
- case HeaderKind.Int16: return int16ToBuf(value);
- case HeaderKind.Int32: return int32ToBuf(value);
- case HeaderKind.Int64: return int64ToBuf(value);
- case HeaderKind.Int128: return value;
- case HeaderKind.Uint8: return uint8ToBuf(value);
- case HeaderKind.Uint16: return uint16ToBuf(value);
- case HeaderKind.Uint32: return uint32ToBuf(value);
- case HeaderKind.Uint64: return uint64ToBuf(value);
- case HeaderKind.Uint128: return value;
- case HeaderKind.Float: return floatToBuf(value);
- case HeaderKind.Double: return doubleToBuf(value);
+ case HeaderKind.Raw:
+ return value;
+ case HeaderKind.String:
+ return Buffer.from(value);
+ case HeaderKind.Bool:
+ return boolToBuf(value);
+ case HeaderKind.Int8:
+ return int8ToBuf(value);
+ case HeaderKind.Int16:
+ return int16ToBuf(value);
+ case HeaderKind.Int32:
+ return int32ToBuf(value);
+ case HeaderKind.Int64:
+ return int64ToBuf(value);
+ case HeaderKind.Int128:
+ return value;
+ case HeaderKind.Uint8:
+ return uint8ToBuf(value);
+ case HeaderKind.Uint16:
+ return uint16ToBuf(value);
+ case HeaderKind.Uint32:
+ return uint32ToBuf(value);
+ case HeaderKind.Uint64:
+ return uint64ToBuf(value);
+ case HeaderKind.Uint128:
+ return value;
+ case HeaderKind.Float:
+ return floatToBuf(value);
+ case HeaderKind.Double:
+ return doubleToBuf(value);
}
};
-
/**
* Serializes a single header key-value pair to wire format.
- * Format: [key_length][key][kind][value_length][value]
+ * Format: [key_kind][key_length][key][value_kind][value_length][value]
*
* @param key - Header key name
* @param v - Binary header value
* @returns Serialized header as Buffer
*/
export const serializeHeader = (key: string, v: BinaryHeaderValue) => {
- const bKey = Buffer.from(key)
- const b1 = uint32ToBuf(bKey.length);
- const b2 = Buffer.alloc(5);
- b2.writeUInt8(v.kind);
- b2.writeUInt32LE(v.value.length, 1);
-
- // @TODO debug
- // console.log(
- // 'SERIALIZE\n',
- // 'KEY-LEN', b1.length, b1.toString('hex'), '=', b1.readUInt32LE(0), '\n',
- // 'KEY', bKey.length, bKey.toString('hex'), '\n',
- // 'KIND', b2.readUInt8(0), b2.subarray(0, 1).toString('hex'), '\n',
- // 'V-LEN', b2.readUInt32LE(1), b2.subarray(1, 5).toString('hex'), '\n',
- // 'V', v.value.length, v.value.toString('hex')
- // );
-
- return Buffer.concat([
- b1,
- bKey,
- b2,
- v.value
- ]);
+ const bKey = Buffer.from(key);
+ const keyHeader = Buffer.alloc(5);
+ keyHeader.writeUInt8(HeaderKind.String);
+ keyHeader.writeUInt32LE(bKey.length, 1);
+
+ const valueHeader = Buffer.alloc(5);
+ valueHeader.writeUInt8(v.kind);
+ valueHeader.writeUInt32LE(v.value.length, 1);
+
+ return Buffer.concat([keyHeader, bKey, valueHeader, v.value]);
};
/** Empty headers buffer constant */
@@ -163,7 +162,7 @@ export const EMPTY_HEADERS = Buffer.alloc(0);
*/
const createHeaderValue = (header: HeaderValue): BinaryHeaderValue => ({
kind: header.kind,
- value: serializeHeaderValue(header)
+ value: serializeHeaderValue(header),
});
/**
@@ -173,12 +172,13 @@ const createHeaderValue = (header: HeaderValue): BinaryHeaderValue => ({
* @returns Serialized headers buffer (empty if no headers)
*/
export const serializeHeaders = (headers?: Headers) => {
- if (!headers)
- return EMPTY_HEADERS;
+ if (!headers) return EMPTY_HEADERS;
- return Buffer.concat(Object.keys(headers).map(
- (c: string) => serializeHeader(c, createHeaderValue(headers[c]))
- ));
+ return Buffer.concat(
+ Object.keys(headers).map((c: string) =>
+ serializeHeader(c, createHeaderValue(headers[c])),
+ ),
+ );
};
// deserialize ...
@@ -191,10 +191,10 @@ export type ParsedHeaderValue = boolean | string | number | bigint | Buffer;
*/
export type ParsedHeader = {
/** Header kind identifier */
- kind: ParsedHeaderValue,
+ kind: ParsedHeaderValue;
/** Deserialized value */
- value: ParsedHeaderValue
-}
+ value: ParsedHeaderValue;
+};
/** Header with its key included */
type HeaderWithKey = ParsedHeader & { key: string };
@@ -207,10 +207,10 @@ export type HeadersMap = Record<string, ParsedHeader>;
*/
type ParsedHeaderDeserialized = {
/** Number of bytes consumed */
- bytesRead: number,
+ bytesRead: number;
/** Deserialized header data with key */
- data: HeaderWithKey
-}
+ data: HeaderWithKey;
+};
/**
* Maps a numeric header kind to its string identifier.
@@ -223,7 +223,7 @@ export const mapHeaderKind = (k: number): HeaderKindId => {
if (!ReverseHeaderKind[k as HeaderKindValue])
throw new Error(`unknow header kind: ${k}`);
return ReverseHeaderKind[k as HeaderKindValue];
-}
+};
/**
* Deserializes a header value buffer based on its kind.
@@ -233,54 +233,77 @@ export const mapHeaderKind = (k: number): HeaderKindId => {
* @returns Deserialized value
* @throws Error if the header kind is invalid
*/
-export const deserializeHeaderValue =
- (kind: number, value: Buffer): ParsedHeaderValue => {
- switch (kind) {
- case HeaderKind.Int128:
- case HeaderKind.Uint128:
- case HeaderKind.Raw: return value;
- case HeaderKind.String: return value.toString();
- case HeaderKind.Int8: return value.readInt8();
- case HeaderKind.Int16: return value.readInt16LE();
- case HeaderKind.Int32: return value.readInt32LE();
- case HeaderKind.Int64: return value.readBigInt64LE();
- case HeaderKind.Uint8: return value.readUint8();
- case HeaderKind.Uint16: return value.readUint16LE();
- case HeaderKind.Uint32: return value.readUInt32LE();
- case HeaderKind.Uint64: return value.readBigUInt64LE();
- case HeaderKind.Bool: return value.readUInt8() === 1;
- case HeaderKind.Float: return value.readFloatLE();
- case HeaderKind.Double: return value.readDoubleLE();
- default: throw new Error(`deserializeHeaderValue: invalid HeaderKind ${kind}`);
- }
- };
+export const deserializeHeaderValue = (
+ kind: number,
+ value: Buffer,
+): ParsedHeaderValue => {
+ switch (kind) {
+ case HeaderKind.Int128:
+ case HeaderKind.Uint128:
+ case HeaderKind.Raw:
+ return value;
+ case HeaderKind.String:
+ return value.toString();
+ case HeaderKind.Int8:
+ return value.readInt8();
+ case HeaderKind.Int16:
+ return value.readInt16LE();
+ case HeaderKind.Int32:
+ return value.readInt32LE();
+ case HeaderKind.Int64:
+ return value.readBigInt64LE();
+ case HeaderKind.Uint8:
+ return value.readUint8();
+ case HeaderKind.Uint16:
+ return value.readUint16LE();
+ case HeaderKind.Uint32:
+ return value.readUInt32LE();
+ case HeaderKind.Uint64:
+ return value.readBigUInt64LE();
+ case HeaderKind.Bool:
+ return value.readUInt8() === 1;
+ case HeaderKind.Float:
+ return value.readFloatLE();
+ case HeaderKind.Double:
+ return value.readDoubleLE();
+ default:
+ throw new Error(`deserializeHeaderValue: invalid HeaderKind ${kind}`);
+ }
+};
/**
* Deserializes a single header from a buffer.
+ * Format: [key_kind][key_length][key][value_kind][value_length][value]
*
* @param p - Buffer containing serialized headers
* @param pos - Starting position in the buffer
* @returns Object with bytes read and deserialized header data
*/
-export const deserializeHeader = (p: Buffer, pos = 0): ParsedHeaderDeserialized => {
- const keyLength = p.readUInt32LE(pos);
- const key = p.subarray(pos + 4, pos + 4 + keyLength).toString();
- pos += keyLength + 4;
- const rawKind = p.readUInt8(pos);
- // @TODO ?
- // const kind = mapHeaderKind(rawKind);
+export const deserializeHeader = (
+ p: Buffer,
+ pos = 0,
+): ParsedHeaderDeserialized => {
+ const _keyKind = p.readUInt8(pos);
+ const keyLength = p.readUInt32LE(pos + 1);
+ const key = p.subarray(pos + 5, pos + 5 + keyLength).toString();
+ pos += 5 + keyLength;
+
+ const valueKind = p.readUInt8(pos);
const valueLength = p.readUInt32LE(pos + 1);
- const value = deserializeHeaderValue(rawKind, p.subarray(pos + 5, pos + 5 + valueLength));
+ const value = deserializeHeaderValue(
+ valueKind,
+ p.subarray(pos + 5, pos + 5 + valueLength),
+ );
return {
- bytesRead: 4 + 4 + 1 + keyLength + valueLength,
+ bytesRead: 5 + keyLength + 5 + valueLength,
data: {
key,
- kind: rawKind,
- value
- }
+ kind: valueKind,
+ value,
+ },
};
-}
+};
/**
* Deserializes all headers from a buffer.
@@ -293,12 +316,15 @@ export const deserializeHeaders = (p: Buffer, pos = 0) => {
const headers: HeadersMap = {};
const len = p.length;
while (pos < len) {
- const { bytesRead, data: { kind, key, value } } = deserializeHeader(p, pos);
+ const {
+ bytesRead,
+ data: { kind, key, value },
+ } = deserializeHeader(p, pos);
headers[key] = { kind, value };
pos += bytesRead;
}
return headers;
-}
+};
/**
* HeaderValue factory functions and utilities.
@@ -308,91 +334,91 @@ export const deserializeHeaders = (p: Buffer, pos = 0) => {
/** Creates a raw binary header value */
const Raw = (value: Buffer): HeaderValueRaw => ({
kind: HeaderKind.Raw,
- value
+ value,
});
/** Creates a string header value */
const String = (value: string): HeaderValueString => ({
kind: HeaderKind.String,
- value
+ value,
});
/** Creates a boolean header value */
const Bool = (value: boolean): HeaderValueBool => ({
kind: HeaderKind.Bool,
- value
+ value,
});
/** Creates an Int8 header value */
const Int8 = (value: number): HeaderValueInt8 => ({
kind: HeaderKind.Int8,
- value
+ value,
});
/** Creates an Int16 header value */
const Int16 = (value: number): HeaderValueInt16 => ({
kind: HeaderKind.Int16,
- value
+ value,
});
/** Creates an Int32 header value */
const Int32 = (value: number): HeaderValueInt32 => ({
kind: HeaderKind.Int32,
- value
+ value,
});
/** Creates an Int64 header value */
const Int64 = (value: bigint): HeaderValueInt64 => ({
kind: HeaderKind.Int64,
- value
+ value,
});
/** Creates an Int128 header value */
const Int128 = (value: Buffer): HeaderValueInt128 => ({
kind: HeaderKind.Int128,
- value
+ value,
});
/** Creates a Uint8 header value */
const Uint8 = (value: number): HeaderValueUint8 => ({
kind: HeaderKind.Uint8,
- value
+ value,
});
/** Creates a Uint16 header value */
const Uint16 = (value: number): HeaderValueUint16 => ({
kind: HeaderKind.Uint16,
- value
+ value,
});
/** Creates a Uint32 header value */
const Uint32 = (value: number): HeaderValueUint32 => ({
kind: HeaderKind.Uint32,
- value
+ value,
});
/** Creates a Uint64 header value */
const Uint64 = (value: bigint): HeaderValueUint64 => ({
kind: HeaderKind.Uint64,
- value
+ value,
});
/** Creates a Uint128 header value */
const Uint128 = (value: Buffer): HeaderValueUint128 => ({
kind: HeaderKind.Uint128,
- value
+ value,
});
/** Creates a Float header value */
const Float = (value: number): HeaderValueFloat => ({
kind: HeaderKind.Float,
- value
+ value,
});
/** Creates a Double header value */
const Double = (value: number): HeaderValueDouble => ({
kind: HeaderKind.Double,
- value
+ value,
});
/** Gets the kind identifier string of a header value */
@@ -420,11 +446,9 @@ export const HeaderValue = {
Float,
Double,
getKind,
- getValue
+ getValue,
};
-
-
// export type InputHeaderValue = boolean | number | string | bigint | Buffer;
// export type InputHeaders = Record<string, InputHeaderValue>;
@@ -457,7 +481,6 @@ export const HeaderValue = {
// export const createHeaderValueString = (v: string): HeaderValue =>
// ({ kind: HeaderKind.String, value: Buffer.from(v) });
-
// // guess wire type from js type ?
// const guessHeaderValue = (v: InputHeaderValue): HeaderValue => {
// if (typeof v === 'number') {