This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 8664e0fec docs(js): add JSDoc comments (#2534)
8664e0fec is described below
commit 8664e0fec5176ec0d4baced7371fe9bae06a3a2c
Author: BoHyeon Kim <[email protected]>
AuthorDate: Wed Jan 7 22:03:57 2026 +0900
docs(js): add JSDoc comments (#2534)
---
foreign/node/src/client/client.connection.ts | 80 ++++++++++++++
foreign/node/src/client/client.debug.ts | 4 +
foreign/node/src/client/client.socket.ts | 101 +++++++++++++++++
foreign/node/src/client/client.ts | 66 ++++++++++-
foreign/node/src/client/client.type.ts | 74 +++++++++++++
foreign/node/src/client/client.utils.ts | 29 +++++
foreign/node/src/debug-send.ts | 2 +-
foreign/node/src/type.utils.ts | 14 +++
foreign/node/src/wire/command.utils.ts | 20 +++-
.../wire/consumer-group/create-group.command.ts | 13 +++
.../wire/consumer-group/delete-group.command.ts | 13 +++
.../consumer-group/ensure-group.virtual.command.ts | 16 ++-
.../src/wire/consumer-group/get-group.command.ts | 13 +++
.../src/wire/consumer-group/get-groups.command.ts | 12 ++
.../node/src/wire/consumer-group/group.utils.ts | 34 ++++++
.../src/wire/consumer-group/join-group.command.ts | 13 +++
.../src/wire/consumer-group/leave-group.command.ts | 13 +++
foreign/node/src/wire/identifier.utils.ts | 28 ++++-
.../wire/message/flush-unsaved-buffers.command.ts | 13 +++
foreign/node/src/wire/message/header.type.ts | 23 ++++
foreign/node/src/wire/message/header.utils.ts | 108 +++++++++++++++++-
foreign/node/src/wire/message/iggy-header.utils.ts | 39 ++++++-
foreign/node/src/wire/message/message.utils.ts | 68 +++++++++++-
.../node/src/wire/message/partitioning.utils.ts | 57 +++++++++-
.../node/src/wire/message/poll-messages.command.ts | 17 +++
foreign/node/src/wire/message/poll.utils.ts | 102 ++++++++++++++++-
.../node/src/wire/message/send-messages.command.ts | 14 +++
foreign/node/src/wire/number.utils.ts | 81 +++++++++++++-
.../node/src/wire/offset/delete-offset.command.ts | 12 +-
foreign/node/src/wire/offset/get-offset.command.ts | 14 +++
foreign/node/src/wire/offset/offset.utils.ts | 48 ++++++++
.../node/src/wire/offset/store-offset.command.ts | 15 +++
.../src/wire/partition/create-partition.command.ts | 13 +++
.../src/wire/partition/delete-partition.command.ts | 17 ++-
foreign/node/src/wire/partition/partition.utils.ts | 9 ++
foreign/node/src/wire/serialize.utils.ts | 20 ++++
.../src/wire/session/login-with-token.command.ts | 11 ++
foreign/node/src/wire/session/login.command.ts | 7 ++
foreign/node/src/wire/session/login.utils.ts | 25 +++++
foreign/node/src/wire/session/logout.command.ts | 7 ++
.../node/src/wire/stream/create-stream.command.ts | 14 ++-
.../node/src/wire/stream/delete-stream.command.ts | 11 ++
.../wire/stream/ensure-stream.virtual.command.ts | 9 +-
foreign/node/src/wire/stream/get-stream.command.ts | 11 ++
.../node/src/wire/stream/get-streams.command.ts | 11 +-
.../node/src/wire/stream/purge-stream.command.ts | 11 ++
foreign/node/src/wire/stream/stream.utils.ts | 22 ++++
.../node/src/wire/stream/update-stream.command.ts | 12 ++
.../node/src/wire/token/create-token.command.ts | 12 ++
.../node/src/wire/token/delete-token.command.ts | 11 ++
foreign/node/src/wire/token/get-tokens.command.ts | 7 ++
foreign/node/src/wire/token/token.utils.ts | 40 +++++++
.../node/src/wire/topic/create-topic.command.ts | 17 +++
.../node/src/wire/topic/delete-topic.command.ts | 15 ++-
.../src/wire/topic/ensure-topic.virtual.command.ts | 12 +-
foreign/node/src/wire/topic/get-topic.command.ts | 14 ++-
foreign/node/src/wire/topic/get-topics.command.ts | 11 ++
foreign/node/src/wire/topic/purge-topic.command.ts | 12 ++
foreign/node/src/wire/topic/topic.utils.ts | 73 +++++++++++++
.../node/src/wire/topic/update-topic.command.ts | 17 +++
.../node/src/wire/user/change-password.command.ts | 13 +++
foreign/node/src/wire/user/create-user.command.ts | 14 +++
foreign/node/src/wire/user/delete-user.command.ts | 11 ++
foreign/node/src/wire/user/get-user.command.ts | 11 ++
foreign/node/src/wire/user/get-users.command.ts | 9 +-
foreign/node/src/wire/user/permissions.utils.ts | 121 +++++++++++++++++++++
.../src/wire/user/update-permissions.command.ts | 12 ++
foreign/node/src/wire/user/update-user.command.ts | 13 +++
foreign/node/src/wire/user/user.utils.ts | 46 ++++++++
foreign/node/src/wire/uuid.utils.ts | 14 ++-
70 files changed, 1835 insertions(+), 36 deletions(-)
diff --git a/foreign/node/src/client/client.connection.ts
b/foreign/node/src/client/client.connection.ts
index 8dbb83a63..71e0fc35b 100644
--- a/foreign/node/src/client/client.connection.ts
+++ b/foreign/node/src/client/client.connection.ts
@@ -27,15 +27,33 @@ import { serializeCommand } from './client.utils.js';
import { debug } from './client.debug.js';
+/**
+ * Creates a TCP socket connection.
+ *
+ * @param options - TCP connection options
+ * @returns TCP socket
+ */
const createTcpSocket = (options: TcpOption): Socket => {
return createConnection(options);
};
+/**
+ * Creates a TLS socket connection.
+ *
+ * @param options - TLS connection options including port
+ * @returns TLS socket
+ */
const createTlsSocket = ({ port, ...options }: TlsOption): Socket => {
const socket = TLSConnect(port, options);
return socket;
};
+/**
+ * Creates a socket based on the transport type in the configuration.
+ *
+ * @param config - Client configuration with transport type
+ * @returns Socket for the specified transport
+ */
const getTransport = (config: ClientConfig): Socket => {
const { transport, options } = config;
switch (transport) {
@@ -46,12 +64,24 @@ const getTransport = (config: ClientConfig): Socket => {
}
};
+/**
+ * Default reconnection settings.
+ * Attempts reconnection every 5 seconds, up to 12 times.
+ */
const DefaultReconnectOption: ReconnectOption = {
enabled: true,
interval: 5 * 1000,
maxRetries: 12
}
+/**
+ * Recreates a socket after a delay.
+ * Used for reconnection attempts.
+ *
+ * @param option - Client configuration
+ * @param timer - Delay in milliseconds before recreating
+ * @returns Promise resolving to a new socket
+ */
function recreate(option: ClientConfig, timer = 1000): Promise<Socket> {
return new Promise((resolve) => {
setTimeout(() => {
@@ -60,21 +90,40 @@ function recreate(option: ClientConfig, timer = 1000):
Promise<Socket> {
});
}
+/** Socket error with optional error code */
type SocketError = Error & { code?: string };
+/**
+ * Manages the low-level TCP/TLS connection to the Iggy server.
+ * Handles connection lifecycle, reconnection, and data buffering.
+ */
export class IggyConnection extends EventEmitter {
+ /** Client configuration */
public config: ClientConfig
+ /** Underlying socket connection */
public socket: Socket;
+ /** Whether the connection is established */
public connected: boolean;
+ /** Whether a connection attempt is in progress */
public connecting: boolean;
+ /** Whether the connection is being intentionally closed */
public ending: boolean;
+ /** Whether waiting for more data to complete a response */
private waitingResponseEnd: boolean;
+ /** Reconnection configuration */
private reconnectOption: ReconnectOption;
+ /** Number of reconnection attempts made */
private reconnectCount: number;
+ /** Buffer for incomplete response data */
private readBuffers: Buffer;
+ /**
+ * Creates a new IggyConnection.
+ *
+ * @param config - Client configuration
+ */
constructor(config: ClientConfig) {
super();
this.config = config;
@@ -88,6 +137,12 @@ export class IggyConnection extends EventEmitter {
this.readBuffers = Buffer.allocUnsafe(0);
}
+ /**
+ * Establishes the connection to the server.
+ * Sets up event handlers for data, errors, and disconnection.
+ *
+ * @returns Promise that resolves when connected
+ */
connect() {
this.connecting = true;
@@ -122,6 +177,12 @@ export class IggyConnection extends EventEmitter {
});
}
+ /**
+ * Attempts to reconnect to the server.
+ * Respects maxRetries limit and emits error when exceeded.
+ *
+ * @param err - Optional error that triggered the reconnection
+ */
async reconnect(err?: Error) {
const { enabled, interval, maxRetries } = this.reconnectOption
debug(
@@ -149,16 +210,28 @@ export class IggyConnection extends EventEmitter {
this.connect();
}
+ /**
+ * Destroys the connection and marks it as ending.
+ */
_destroy() {
this.ending = true;
this.socket.destroy();
}
+ /**
+ * Clears the response buffer and resets the waiting state.
+ */
_endResponseWait() {
this.readBuffers = Buffer.allocUnsafe(0);
this.waitingResponseEnd = false;
}
+ /**
+ * Handles incoming data from the socket.
+ * Buffers incomplete responses and emits complete ones.
+ *
+ * @param data - Incoming data buffer
+ */
_onData(data: Buffer) {
debug(
'ONDATA',
@@ -211,6 +284,13 @@ export class IggyConnection extends EventEmitter {
this._endResponseWait();
}
+ /**
+ * Writes a command to the socket.
+ *
+ * @param command - Command code
+ * @param payload - Command payload
+ * @returns True if the write was successful
+ */
writeCommand(command: number, payload: Buffer): boolean {
const cmd = serializeCommand(command, payload);
return this.socket.write(cmd);
diff --git a/foreign/node/src/client/client.debug.ts
b/foreign/node/src/client/client.debug.ts
index 1ead3505e..6e7a3afdf 100644
--- a/foreign/node/src/client/client.debug.ts
+++ b/foreign/node/src/client/client.debug.ts
@@ -20,4 +20,8 @@
import Debug from 'debug';
+/**
+ * Debug logger for the Iggy client.
+ * Enable with DEBUG=iggy:client environment variable.
+ */
export const debug = Debug('iggy:client');
diff --git a/foreign/node/src/client/client.socket.ts
b/foreign/node/src/client/client.socket.ts
index 92df8b91b..83f8efb29 100644
--- a/foreign/node/src/client/client.socket.ts
+++ b/foreign/node/src/client/client.socket.ts
@@ -31,29 +31,55 @@ import { IggyConnection } from './client.connection.js';
import { LOGIN, LOGIN_WITH_TOKEN, PING } from '../wire/index.js';
+/**
+ * Command codes that can be executed without authentication.
+ */
const UNLOGGED_COMMAND_CODE = [
PING.code,
LOGIN.code,
LOGIN_WITH_TOKEN.code
];
+/**
+ * Represents a queued command job waiting to be executed.
+ */
type Job = {
+ /** Command code */
command: number,
+ /** Command payload */
payload: Buffer,
+ /** Promise resolve function */
resolve: (v: CommandResponse | PromiseLike<CommandResponse>) => void,
+ /** Promise reject function */
reject: (e: unknown) => void
};
+/**
+ * Manages command execution and response handling for the Iggy server.
+ * Implements command queuing, authentication, and heartbeat functionality.
+ */
export class CommandResponseStream extends EventEmitter {
+ /** Client configuration */
private options: ClientConfig;
+ /** Underlying connection to the server */
private connection: IggyConnection;
+ /** Queue of pending command jobs */
private _execQueue: Job[];
+ /** Whether the stream is currently processing a command */
public busy: boolean;
+ /** Whether the client has been authenticated */
isAuthenticated: boolean;
+ /** Authenticated user ID */
userId?: number;
+ /** Heartbeat interval timer handle */
heartbeatIntervalHandler?: NodeJS.Timeout;
+ /**
+ * Creates a new CommandResponseStream.
+ *
+ * @param options - Client configuration
+ */
constructor(options: ClientConfig) {
super();
this.options = options;
@@ -64,6 +90,9 @@ export class CommandResponseStream extends EventEmitter {
this._init();
};
+ /**
+ * Initializes the stream by setting up heartbeat and connection event
handlers.
+ */
_init() {
this.heartbeat(this.options.heartbeatInterval);
this.connection.on('disconnected', async () => {
@@ -71,6 +100,16 @@ export class CommandResponseStream extends EventEmitter {
});
}
+ /**
+ * Sends a command to the server.
+ * Automatically handles connection and authentication if needed.
+ *
+ * @param command - Command code to send
+ * @param payload - Command payload buffer
+ * @param handleResponse - Whether to parse the response (default: true)
+ * @param last - Whether to add to end of queue (default: true)
+ * @returns Promise resolving to the command response
+ */
async sendCommand(
command: number,
payload: Buffer,
@@ -93,6 +132,12 @@ export class CommandResponseStream extends EventEmitter {
});
}
+ /**
+ * Processes queued commands sequentially.
+ * Emits 'finishQueue' when all commands are processed.
+ *
+ * @param handleResponse - Whether to parse responses
+ */
async _processQueue(handleResponse = true): Promise<void> {
if (this.busy)
return;
@@ -111,6 +156,14 @@ export class CommandResponseStream extends EventEmitter {
this.emit('finishQueue');
}
+ /**
+ * Processes a single command by writing it to the connection and waiting
for response.
+ *
+ * @param command - Command code
+ * @param payload - Command payload
+ * @param handleResp - Whether to parse the response
+ * @returns Promise resolving to the command response
+ */
_processNext(
command: number,
payload: Buffer,
@@ -135,11 +188,22 @@ export class CommandResponseStream extends EventEmitter {
});
}
+ /**
+ * Fails all queued commands with the given error.
+ *
+ * @param err - Error to reject all queued commands with
+ */
_failQueue(err: Error) {
this._execQueue.forEach(({ reject }) => reject(err));
this._execQueue = [];
}
+ /**
+ * Authenticates the client with the server.
+ *
+ * @param creds - Authentication credentials (token or password)
+ * @returns True if authentication succeeded
+ */
async authenticate(creds: ClientCredentials) {
const r = ('token' in creds) ?
await this._authWithToken(creds) :
@@ -149,24 +213,46 @@ export class CommandResponseStream extends EventEmitter {
return this.isAuthenticated;
}
+ /**
+ * Authenticates using username and password.
+ *
+ * @param creds - Password credentials
+ * @returns Login response with user ID
+ */
async _authWithPassword(creds: PasswordCredentials) {
const pl = LOGIN.serialize(creds);
const logr = await this.sendCommand(LOGIN.code, pl, true, false);
return LOGIN.deserialize(logr);
}
+ /**
+ * Authenticates using a token.
+ *
+ * @param creds - Token credentials
+ * @returns Login response with user ID
+ */
async _authWithToken(creds: TokenCredentials) {
const pl = LOGIN_WITH_TOKEN.serialize(creds);
const logr = await this.sendCommand(LOGIN_WITH_TOKEN.code, pl, true,
false);
return LOGIN_WITH_TOKEN.deserialize(logr);
}
+ /**
+ * Sends a ping command to the server.
+ *
+ * @returns Ping response
+ */
async ping() {
const pl = PING.serialize();
const pingR = await this.sendCommand(PING.code, pl, true);
return PING.deserialize(pingR);
}
+ /**
+ * Starts sending periodic heartbeat pings to keep the connection alive.
+ *
+ * @param interval - Heartbeat interval in milliseconds
+ */
heartbeat(interval?: number) {
if (!interval)
return
@@ -179,10 +265,19 @@ export class CommandResponseStream extends EventEmitter {
}, interval);
}
+ /**
+ * Returns the underlying socket as a readable stream.
+ *
+ * @returns The connection socket
+ */
getReadStream() {
return this.connection.socket;
}
+ /**
+ * Destroys the stream and cleans up resources.
+ * Stops heartbeat and destroys the connection.
+ */
destroy() {
if (this.heartbeatIntervalHandler)
clearInterval(this.heartbeatIntervalHandler);
@@ -191,6 +286,12 @@ export class CommandResponseStream extends EventEmitter {
};
+/**
+ * Creates a new RawClient instance.
+ *
+ * @param options - Client configuration
+ * @returns RawClient instance
+ */
export function getRawClient(options: ClientConfig): RawClient {
return new CommandResponseStream(options);
}
diff --git a/foreign/node/src/client/client.ts
b/foreign/node/src/client/client.ts
index f373eee8e..3e81663ac 100644
--- a/foreign/node/src/client/client.ts
+++ b/foreign/node/src/client/client.ts
@@ -25,7 +25,12 @@ import { CommandAPI } from '../wire/command-set.js';
import { debug } from './client.debug.js';
-// create & destroy must be async
+/**
+ * Creates a pool factory for managing RawClient instances.
+ *
+ * @param config - Client configuration
+ * @returns Pool factory with create and destroy methods
+ */
const createPoolFactory = (config: ClientConfig) => ({
create: async function () {
return getRawClient(config);
@@ -35,6 +40,13 @@ const createPoolFactory = (config: ClientConfig) => ({
}
});
+/**
+ * Creates a client provider that uses connection pooling.
+ * Automatically acquires and releases clients from the pool.
+ *
+ * @param config - Client configuration including pool size options
+ * @returns Client provider function with attached pool reference
+ */
const poolClientProvider = (config: ClientConfig) => {
const min = config.poolSize?.min || 1;
const max = config.poolSize?.max || 4;
@@ -53,10 +65,21 @@ const poolClientProvider = (config: ClientConfig) => {
};
+/**
+ * Iggy client with connection pooling support.
+ * Manages a pool of connections for efficient resource utilization.
+ */
export class Client extends CommandAPI {
+ /** Client configuration */
_config: ClientConfig
+ /** Connection pool instance */
_pool: Pool<RawClient>
+ /**
+ * Creates a new pooled client.
+ *
+ * @param config - Client configuration
+ */
constructor(config: ClientConfig) {
const pcp = poolClientProvider(config);
super(pcp);
@@ -64,6 +87,9 @@ export class Client extends CommandAPI {
this._pool = pcp._pool;
};
+ /**
+ * Destroys the client and drains all connections from the pool.
+ */
async destroy() {
debug('destroying client pool. pool size is', this._pool.size);
await this._pool.drain();
@@ -72,6 +98,12 @@ export class Client extends CommandAPI {
}
}
+/**
+ * Creates a client provider that reuses a single connection.
+ *
+ * @param config - Client configuration
+ * @returns Client provider function that always returns the same client
+ */
const singleClientProvider = (config: ClientConfig) => {
const c = getRawClient(config);
return async function singleClientProvider() {
@@ -79,14 +111,27 @@ const singleClientProvider = (config: ClientConfig) => {
}
}
+/**
+ * Iggy client that uses a single persistent connection.
+ * Suitable for applications that don't need connection pooling.
+ */
export class SingleClient extends CommandAPI {
+ /** Client configuration */
_config: ClientConfig
+ /**
+ * Creates a new single-connection client.
+ *
+ * @param config - Client configuration
+ */
constructor(config: ClientConfig) {
super(singleClientProvider(config));
this._config = config;
}
+ /**
+ * Destroys the client connection.
+ */
async destroy() {
const s = await this.clientProvider();
s.destroy();
@@ -94,11 +139,23 @@ export class SingleClient extends CommandAPI {
};
+/**
+ * Simple Iggy client wrapper around an existing RawClient.
+ * Useful when you already have a RawClient instance.
+ */
export class SimpleClient extends CommandAPI {
+ /**
+ * Creates a new simple client from an existing RawClient.
+ *
+ * @param client - Existing RawClient instance
+ */
constructor(client: RawClient) {
super(() => Promise.resolve(client));
}
+ /**
+ * Destroys the underlying client connection.
+ */
async destroy() {
const s = await this.clientProvider();
s.destroy();
@@ -106,6 +163,13 @@ export class SimpleClient extends CommandAPI {
};
+/**
+ * Creates a SimpleClient with the given configuration.
+ * Convenience function for quickly creating a client.
+ *
+ * @param config - Client configuration
+ * @returns SimpleClient instance
+ */
export const getClient = async (config: ClientConfig) => {
const cli = getRawClient(config);
return new SimpleClient(cli);
diff --git a/foreign/node/src/client/client.type.ts
b/foreign/node/src/client/client.type.ts
index 4ddf62726..cea847b7a 100644
--- a/foreign/node/src/client/client.type.ts
+++ b/foreign/node/src/client/client.type.ts
@@ -22,61 +22,135 @@ import type { Readable } from 'stream';
import { type TcpSocketConnectOpts } from 'node:net';
import { type ConnectionOptions } from 'node:tls';
+/**
+ * TCP socket connection options.
+ * Alias for Node.js TcpSocketConnectOpts.
+ */
export type TcpOption = TcpSocketConnectOpts;
+
+/**
+ * TLS socket connection options.
+ * Combines port number with Node.js TLS ConnectionOptions.
+ */
export type TlsOption = { port: number } & ConnectionOptions;
+/**
+ * Response from a command sent to the Iggy server.
+ */
export type CommandResponse = {
+ /** Response status code (0 indicates success) */
status: number,
+ /** Length of the response data in bytes */
length: number,
+ /** Response payload data */
data: Buffer
};
+/**
+ * Low-level client interface for communicating with the Iggy server.
+ * Provides direct access to command sending and event handling.
+ */
export type RawClient = {
+ /** Sends a command to the server and returns the response */
sendCommand: (
code: number, payload: Buffer, handleResponse?: boolean
) => Promise<CommandResponse>,
+ /** Whether the client has been authenticated */
isAuthenticated: boolean
+ /** Authenticates the client with the server */
authenticate: (c: ClientCredentials) => Promise<boolean>
+ /** Destroys the client connection */
destroy: () => void,
+ /** Registers an event listener */
on: (ev: string, cb: (e?: unknown) => void) => void
+ /** Registers a one-time event listener */
once: (ev: string, cb: (e?: unknown) => void) => void
+ /** Returns the underlying readable stream */
getReadStream: () => Readable
}
+/**
+ * Function type that provides a RawClient instance.
+ * Used for dependency injection and connection pooling.
+ */
export type ClientProvider = () => Promise<RawClient>;
+/**
+ * Available transport protocols for connecting to the Iggy server.
+ */
export const Transports = ['TCP', 'TLS' /**, 'QUIC' */] as const;
+
+/**
+ * Transport protocol type.
+ * Currently supports 'TCP' and 'TLS'.
+ */
export type TransportType = typeof Transports[number];
+/**
+ * Configuration options for automatic reconnection.
+ */
export type ReconnectOption = {
+ /** Whether automatic reconnection is enabled */
enabled: boolean,
+ /** Interval between reconnection attempts in milliseconds */
interval: number,
+ /** Maximum number of reconnection attempts */
maxRetries: number
}
+/**
+ * Union type for transport-specific connection options.
+ */
export type TransportOption = TcpOption | TlsOption;
+/**
+ * Token-based authentication credentials.
+ */
export type TokenCredentials = {
+ /** Authentication token */
token: string
}
+/**
+ * Username/password authentication credentials.
+ */
export type PasswordCredentials = {
+ /** Username for authentication */
username: string,
+ /** Password for authentication */
password: string
}
+/**
+ * Union type for client authentication credentials.
+ * Supports either token-based or password-based authentication.
+ */
export type ClientCredentials = TokenCredentials | PasswordCredentials;
+/**
+ * Connection pool size configuration.
+ */
export type PoolSizeOption = {
+ /** Minimum number of connections in the pool */
min?: number,
+ /** Maximum number of connections in the pool */
max?: number
}
+/**
+ * Complete client configuration for connecting to the Iggy server.
+ */
export type ClientConfig = {
+ /** Transport protocol to use (TCP or TLS) */
transport: TransportType,
+ /** Transport-specific connection options */
options: TransportOption,
+ /** Authentication credentials */
credentials: ClientCredentials,
+ /** Connection pool size configuration */
poolSize?: PoolSizeOption,
+ /** Automatic reconnection configuration */
reconnect?: ReconnectOption,
+ /** Interval for sending heartbeat pings in milliseconds */
heartbeatInterval?: number
}
diff --git a/foreign/node/src/client/client.utils.ts
b/foreign/node/src/client/client.utils.ts
index e1593634a..c4de16f36 100644
--- a/foreign/node/src/client/client.utils.ts
+++ b/foreign/node/src/client/client.utils.ts
@@ -24,6 +24,13 @@ import { translateCommandCode } from
'../wire/command.code.js';
import { debug } from './client.debug.js';
+/**
+ * Parses a raw response buffer into a structured CommandResponse.
+ * Extracts status code, length, and payload data from the buffer.
+ *
+ * @param r - Raw response buffer from the server
+ * @returns Parsed command response with status, length, and data
+ */
export const handleResponse = (r: Buffer) => {
const status = r.readUint32LE(0);
const length = r.readUint32LE(4);
@@ -33,6 +40,12 @@ export const handleResponse = (r: Buffer) => {
}
};
+/**
+ * Creates a Transform stream that parses response buffers.
+ * Transforms raw server responses into just the data payload.
+ *
+ * @returns Transform stream for processing server responses
+ */
export const handleResponseTransform = () => new Transform({
transform(chunk: Buffer, encoding: BufferEncoding, cb: TransformCallback) {
try {
@@ -45,11 +58,27 @@ export const handleResponseTransform = () => new Transform({
}
});
+/**
+ * Deserializes a void response from the server.
+ * Returns true if the command succeeded with no data.
+ *
+ * @param r - Command response to check
+ * @returns True if the response indicates success with no data
+ */
export const deserializeVoidResponse =
(r: CommandResponse) => r.status === 0 && r.data.length === 0;
+/** Length of the command code in bytes */
const COMMAND_LENGTH = 4;
+/**
+ * Serializes a command and its payload into a buffer for sending to the
server.
+ * Creates the wire format: [payload_size (4 bytes)][command (4
bytes)][payload]
+ *
+ * @param command - Command code to send
+ * @param payload - Command payload buffer
+ * @returns Buffer ready to be sent to the server
+ */
export const serializeCommand = (command: number, payload: Buffer) => {
const payloadSize = payload.length + COMMAND_LENGTH;
const data = Buffer.allocUnsafe(8 + payload.length);
diff --git a/foreign/node/src/debug-send.ts b/foreign/node/src/debug-send.ts
index 5ba70553b..bb28c5297 100644
--- a/foreign/node/src/debug-send.ts
+++ b/foreign/node/src/debug-send.ts
@@ -19,7 +19,7 @@
import assert from 'node:assert/strict';
-import { ClientConfig, SingleClient } from './client/index.js';
+import { type ClientConfig, SingleClient } from './client/index.js';
import { groupConsumerStream } from './stream/consumer-stream.js';
import { PollingStrategy, type PollMessagesResponse } from './wire/index.js';
diff --git a/foreign/node/src/type.utils.ts b/foreign/node/src/type.utils.ts
index 10fea2e3e..f71b5d11b 100644
--- a/foreign/node/src/type.utils.ts
+++ b/foreign/node/src/type.utils.ts
@@ -18,6 +18,15 @@
*/
+/**
+ * Reverses the keys and values of a record object.
+ * Creates a new record where the original values become keys and vice versa.
+ *
+ * @typeParam T - Type of the original keys
+ * @typeParam U - Type of the original values (becomes keys in result)
+ * @param input - Record object to reverse
+ * @returns New record with swapped keys and values
+ */
export function reverseRecord<
T extends PropertyKey,
U extends PropertyKey,
@@ -30,4 +39,9 @@ export function reverseRecord<
) as Record<U, T>
}
+/**
+ * Extracts the value types from an object type.
+ *
+ * @typeParam T - Object type to extract values from
+ */
export type ValueOf<T> = T[keyof T];
diff --git a/foreign/node/src/wire/command.utils.ts
b/foreign/node/src/wire/command.utils.ts
index 5e5ff9196..50718c1ce 100644
--- a/foreign/node/src/wire/command.utils.ts
+++ b/foreign/node/src/wire/command.utils.ts
@@ -20,15 +20,31 @@
import type { CommandResponse, ClientProvider } from
'../client/client.type.js';
-// export type ArgTypes<F extends Function> = F extends (...args: infer A) =>
any ? A : never;
-
+/**
+ * Represents a command that can be sent to the Iggy server.
+ *
+ * @typeParam I - Input type for the command arguments
+ * @typeParam O - Output type for the command response
+ */
export type Command<I, O> = {
+ /** Command code identifying the operation */
code: number,
+ /** Function to serialize command arguments to a Buffer */
serialize: (args: I) => Buffer,
+ /** Function to deserialize the server response */
deserialize: (r: CommandResponse) => O
}
+/**
+ * Wraps a command definition into an executable function.
+ * Creates a function that handles serialization, sending, and deserialization.
+ *
+ * @typeParam I - Input type for the command arguments
+ * @typeParam O - Output type for the command response
+ * @param cmd - Command definition with code, serialize, and deserialize
functions
+ * @returns A function that takes a ClientProvider and returns an async
command executor
+ */
export function wrapCommand<I, O>(cmd: Command<I, O>) {
return (getClient: ClientProvider) =>
async (arg: I) => cmd.deserialize(
diff --git a/foreign/node/src/wire/consumer-group/create-group.command.ts
b/foreign/node/src/wire/consumer-group/create-group.command.ts
index 5434d8441..314d3117d 100644
--- a/foreign/node/src/wire/consumer-group/create-group.command.ts
+++ b/foreign/node/src/wire/consumer-group/create-group.command.ts
@@ -24,12 +24,22 @@ import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
import { deserializeConsumerGroup, type ConsumerGroup } from
'./group.utils.js';
+/**
+ * Parameters for the create consumer group command.
+ */
export type CreateGroup = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Consumer group name (1-255 bytes) */
name: string,
};
+/**
+ * Create consumer group command definition.
+ * Creates a new consumer group within a topic.
+ */
export const CREATE_GROUP = {
code: COMMAND_CODE.CreateGroup,
@@ -57,4 +67,7 @@ export const CREATE_GROUP = {
};
+/**
+ * Executable create consumer group command function.
+ */
export const createGroup = wrapCommand<CreateGroup,
ConsumerGroup>(CREATE_GROUP);
diff --git a/foreign/node/src/wire/consumer-group/delete-group.command.ts
b/foreign/node/src/wire/consumer-group/delete-group.command.ts
index b7c77c00b..c2d6a00eb 100644
--- a/foreign/node/src/wire/consumer-group/delete-group.command.ts
+++ b/foreign/node/src/wire/consumer-group/delete-group.command.ts
@@ -24,12 +24,22 @@ import { deserializeVoidResponse } from
'../../client/client.utils.js';
import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the delete consumer group command.
+ */
export type DeleteGroup = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Consumer group identifier (ID or name) */
groupId: Id
};
+/**
+ * Delete consumer group command definition.
+ * Removes a consumer group from a topic.
+ */
export const DELETE_GROUP = {
code: COMMAND_CODE.DeleteGroup,
@@ -40,4 +50,7 @@ export const DELETE_GROUP = {
deserialize: deserializeVoidResponse
};
+/**
+ * Executable delete consumer group command function.
+ */
export const deleteGroup = wrapCommand<DeleteGroup, boolean>(DELETE_GROUP);
diff --git
a/foreign/node/src/wire/consumer-group/ensure-group.virtual.command.ts
b/foreign/node/src/wire/consumer-group/ensure-group.virtual.command.ts
index 6c4d0c4b7..9316a8c51 100644
--- a/foreign/node/src/wire/consumer-group/ensure-group.virtual.command.ts
+++ b/foreign/node/src/wire/consumer-group/ensure-group.virtual.command.ts
@@ -19,12 +19,19 @@
import type { Id } from '../identifier.utils.js';
-import { ClientProvider } from '../../client/index.js';
+import type { ClientProvider } from '../../client/index.js';
import { createGroup } from './create-group.command.js';
import { getGroup } from './get-group.command.js';
import { joinGroup } from './join-group.command.js';
+/**
+ * Creates a virtual command that ensures a consumer group exists.
+ * If the group does not exist, it will be created.
+ *
+ * @param c - Client provider function
+ * @returns Function that ensures a consumer group exists by name
+ */
export const ensureConsumerGroup = (c: ClientProvider) =>
async function ensureConsumerGroup(
streamId: Id,
@@ -37,6 +44,13 @@ export const ensureConsumerGroup = (c: ClientProvider) =>
return group;
}
+/**
+ * Creates a virtual command that ensures a consumer group exists and joins it.
+ * If the group does not exist, it will be created, then the client joins the
group.
+ *
+ * @param c - Client provider function
+ * @returns Function that ensures a consumer group exists and joins it
+ */
export const ensureConsumerGroupAndJoin = (c: ClientProvider) =>
async function ensureConsumerGroupAndJoin(
streamId: Id,
diff --git a/foreign/node/src/wire/consumer-group/get-group.command.ts
b/foreign/node/src/wire/consumer-group/get-group.command.ts
index 497a48358..89d4b9204 100644
--- a/foreign/node/src/wire/consumer-group/get-group.command.ts
+++ b/foreign/node/src/wire/consumer-group/get-group.command.ts
@@ -27,12 +27,22 @@ import {
type ConsumerGroup
} from './group.utils.js';
+/**
+ * Parameters for the get consumer group command.
+ */
export type GetGroup = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Consumer group identifier (ID or name) */
groupId: Id
};
+/**
+ * Get consumer group command definition.
+ * Retrieves information about a specific consumer group.
+ */
export const GET_GROUP = {
code: COMMAND_CODE.GetGroup,
@@ -48,4 +58,7 @@ export const GET_GROUP = {
};
+/**
+ * Executable get consumer group command function.
+ */
export const getGroup = wrapCommand<GetGroup, ConsumerGroup | null>(GET_GROUP);
diff --git a/foreign/node/src/wire/consumer-group/get-groups.command.ts
b/foreign/node/src/wire/consumer-group/get-groups.command.ts
index 1279cc1a0..2554125a2 100644
--- a/foreign/node/src/wire/consumer-group/get-groups.command.ts
+++ b/foreign/node/src/wire/consumer-group/get-groups.command.ts
@@ -25,11 +25,20 @@ import { deserializeConsumerGroups } from
'./group.utils.js';
import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the get consumer groups command.
+ */
export type GetGroups = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id
};
+/**
+ * Get consumer groups command definition.
+ * Retrieves all consumer groups within a topic.
+ */
export const GET_GROUPS = {
code: COMMAND_CODE.GetGroups,
@@ -46,4 +55,7 @@ export const GET_GROUPS = {
};
+/**
+ * Executable get consumer groups command function.
+ */
export const getGroups = wrapCommand<GetGroups, ConsumerGroup[]>(GET_GROUPS);
diff --git a/foreign/node/src/wire/consumer-group/group.utils.ts
b/foreign/node/src/wire/consumer-group/group.utils.ts
index 4fb85b88e..b63191d99 100644
--- a/foreign/node/src/wire/consumer-group/group.utils.ts
+++ b/foreign/node/src/wire/consumer-group/group.utils.ts
@@ -20,18 +20,38 @@
import { serializeIdentifier, type Id } from '../identifier.utils.js';
+/**
+ * Consumer group information.
+ */
export type ConsumerGroup = {
+ /** Consumer group ID */
id: number,
+ /** Consumer group name */
name: string,
+ /** Number of members in the group */
membersCount: number,
+ /** Number of partitions assigned to the group */
partitionsCount: number,
};
+/**
+ * Result of deserializing a consumer group.
+ */
type ConsumerGroupDeserialized = {
+ /** Number of bytes consumed */
bytesRead: number,
+ /** Deserialized consumer group data */
data: ConsumerGroup
};
+/**
+ * Serializes stream, topic, and group identifiers for targeting a consumer
group.
+ *
+ * @param streamId - Stream identifier (ID or name)
+ * @param topicId - Topic identifier (ID or name)
+ * @param groupId - Consumer group identifier (ID or name)
+ * @returns Buffer containing serialized identifiers
+ */
export const serializeTargetGroup = (streamId: Id, topicId: Id, groupId: Id)
=> {
return Buffer.concat([
serializeIdentifier(streamId),
@@ -41,6 +61,13 @@ export const serializeTargetGroup = (streamId: Id, topicId:
Id, groupId: Id) =>
};
+/**
+ * Deserializes a consumer group from a buffer.
+ *
+ * @param r - Buffer containing serialized consumer group data
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and deserialized consumer group data
+ */
export const deserializeConsumerGroup = (r: Buffer, pos = 0):
ConsumerGroupDeserialized => {
const id = r.readUInt32LE(pos);
const partitionsCount = r.readUInt32LE(pos + 4);
@@ -59,6 +86,13 @@ export const deserializeConsumerGroup = (r: Buffer, pos =
0): ConsumerGroupDeser
}
};
+/**
+ * Deserializes multiple consumer groups from a buffer.
+ *
+ * @param r - Buffer containing serialized consumer groups data
+ * @param pos - Starting position in the buffer
+ * @returns Array of deserialized consumer groups
+ */
export const deserializeConsumerGroups = (r: Buffer, pos = 0) => {
const end = r.length;
const cgroups = [];
diff --git a/foreign/node/src/wire/consumer-group/join-group.command.ts
b/foreign/node/src/wire/consumer-group/join-group.command.ts
index ebf75fffa..47dd6e290 100644
--- a/foreign/node/src/wire/consumer-group/join-group.command.ts
+++ b/foreign/node/src/wire/consumer-group/join-group.command.ts
@@ -24,12 +24,22 @@ import { deserializeVoidResponse } from
'../../client/client.utils.js';
import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the join consumer group command.
+ */
export type JoinGroup = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Consumer group identifier (ID or name) */
groupId: Id
};
+/**
+ * Join consumer group command definition.
+ * Adds the current client as a member of the consumer group.
+ */
export const JOIN_GROUP = {
code: COMMAND_CODE.JoinGroup,
@@ -41,4 +51,7 @@ export const JOIN_GROUP = {
};
+/**
+ * Executable join consumer group command function.
+ */
export const joinGroup = wrapCommand<JoinGroup, boolean>(JOIN_GROUP);
diff --git a/foreign/node/src/wire/consumer-group/leave-group.command.ts
b/foreign/node/src/wire/consumer-group/leave-group.command.ts
index 798b6f3e8..4059e7fb7 100644
--- a/foreign/node/src/wire/consumer-group/leave-group.command.ts
+++ b/foreign/node/src/wire/consumer-group/leave-group.command.ts
@@ -24,12 +24,22 @@ import { deserializeVoidResponse } from
'../../client/client.utils.js';
import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the leave consumer group command.
+ */
export type LeaveGroup = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Consumer group identifier (ID or name) */
groupId: Id
};
+/**
+ * Leave consumer group command definition.
+ * Removes the current client from the consumer group.
+ */
export const LEAVE_GROUP = {
code: COMMAND_CODE.LeaveGroup,
@@ -40,4 +50,7 @@ export const LEAVE_GROUP = {
deserialize: deserializeVoidResponse
};
+/**
+ * Executable leave consumer group command function.
+ */
export const leaveGroup = wrapCommand<LeaveGroup, boolean>(LEAVE_GROUP);
diff --git a/foreign/node/src/wire/identifier.utils.ts
b/foreign/node/src/wire/identifier.utils.ts
index 0934b4dce..165789a85 100644
--- a/foreign/node/src/wire/identifier.utils.ts
+++ b/foreign/node/src/wire/identifier.utils.ts
@@ -18,14 +18,27 @@
*/
+/** Identifier kind for numeric IDs */
const NUMERIC = 1;
+/** Identifier kind for string IDs */
const STRING = 2;
type NUMERIC = typeof NUMERIC;
type STRING = typeof STRING;
+/**
+ * Identifier type that can be either a numeric ID or a string name.
+ * Used to identify streams, topics, partitions, and other resources.
+ */
export type Id = number | string;
+/**
+ * Serializes an identifier (numeric or string) to a Buffer for wire protocol.
+ *
+ * @param id - Numeric ID or string name to serialize
+ * @returns Buffer containing the serialized identifier
+ * @throws Error if the identifier type is not supported
+ */
export const serializeIdentifier = (id: Id): Buffer => {
if ('string' === typeof id) {
return serializeStringId(id);
@@ -33,9 +46,16 @@ export const serializeIdentifier = (id: Id): Buffer => {
if ('number' === typeof id) {
return serializeNumericId(id);
}
- throw new Error(`Unsuported id type (${id} - ${typeof id})`);
+ throw new Error(`Unsupported id type (${id} - ${typeof id})`);
};
+/**
+ * Serializes a string identifier to a Buffer.
+ *
+ * @param id - String name to serialize (1-255 bytes)
+ * @returns Buffer containing kind, length, and string bytes
+ * @throws Error if the string length is not between 1 and 255 bytes
+ */
const serializeStringId = (id: string): Buffer => {
const b = Buffer.alloc(1 + 1);
const bId = Buffer.from(id);
@@ -49,6 +69,12 @@ const serializeStringId = (id: string): Buffer => {
]);
};
+/**
+ * Serializes a numeric identifier to a Buffer.
+ *
+ * @param id - Numeric ID to serialize (32-bit unsigned integer)
+ * @returns Buffer containing kind, length, and ID bytes in little-endian
format
+ */
const serializeNumericId = (id: number): Buffer => {
const b = Buffer.alloc(1 + 1 + 4);
b.writeUInt8(NUMERIC);
diff --git a/foreign/node/src/wire/message/flush-unsaved-buffers.command.ts
b/foreign/node/src/wire/message/flush-unsaved-buffers.command.ts
index 30a9afb81..458bd3af1 100644
--- a/foreign/node/src/wire/message/flush-unsaved-buffers.command.ts
+++ b/foreign/node/src/wire/message/flush-unsaved-buffers.command.ts
@@ -24,12 +24,22 @@ import { deserializeVoidResponse } from
'../../client/client.utils.js';
import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the flush unsaved buffers command.
+ */
export type FlushUnsavedBuffer = {
+ /** Stream identifier */
streamId: Id,
+ /** Topic identifier */
topicId: Id,
+ /** Partition ID to flush */
partitionId: number,
};
+/**
+ * Flush unsaved buffers command definition.
+ * Forces buffered messages to be persisted to disk.
+ */
export const FLUSH_UNSAVED_BUFFERS = {
code: COMMAND_CODE.FlushUnsavedBuffers,
@@ -40,4 +50,7 @@ export const FLUSH_UNSAVED_BUFFERS = {
deserialize: deserializeVoidResponse
};
+/**
+ * Executable flush unsaved buffers command function.
+ */
export const flushUnsavedBuffers = wrapCommand<FlushUnsavedBuffer,
boolean>(FLUSH_UNSAVED_BUFFERS);
diff --git a/foreign/node/src/wire/message/header.type.ts
b/foreign/node/src/wire/message/header.type.ts
index df04821a1..602e51450 100644
--- a/foreign/node/src/wire/message/header.type.ts
+++ b/foreign/node/src/wire/message/header.type.ts
@@ -19,6 +19,10 @@
import { type ValueOf, reverseRecord } from "../../type.utils.js";
+/**
+ * Enumeration of header value types supported in message headers.
+ * Each type maps to a numeric identifier for wire protocol encoding.
+ */
export const HeaderKind = {
Raw: 1,
String: 2,
@@ -37,81 +41,100 @@ export const HeaderKind = {
Double: 15
} as const;
+/** Type alias for the HeaderKind object */
export type HeaderKind = typeof HeaderKind;
+/** String literal type of header kind names */
export type HeaderKindId = keyof HeaderKind;
+/** Numeric values of header kinds */
export type HeaderKindValue = ValueOf<HeaderKind>;
+/** Reverse mapping from numeric value to header kind name */
export const ReverseHeaderKind = reverseRecord(HeaderKind);
+/** Raw binary header value */
export type HeaderValueRaw = {
kind: HeaderKind['Raw'],
value: Buffer
}
+/** String header value */
export type HeaderValueString = {
kind: HeaderKind['String']
value: string
}
+/** Boolean header value */
export type HeaderValueBool = {
kind: HeaderKind['Bool'],
value: boolean
}
+/** Signed 8-bit integer header value */
export type HeaderValueInt8 = {
kind: HeaderKind['Int8'],
value: number
}
+/** Signed 16-bit integer header value */
export type HeaderValueInt16 = {
kind: HeaderKind['Int16'],
value: number
};
+/** Signed 32-bit integer header value */
export type HeaderValueInt32 = {
kind: HeaderKind['Int32'],
value: number
}
+/** Signed 64-bit integer header value */
export type HeaderValueInt64 = {
kind: HeaderKind['Int64'],
value: bigint
}
+/** Signed 128-bit integer header value */
export type HeaderValueInt128 = {
kind: HeaderKind['Int128'],
value: Buffer // | ArrayBuffer // ?
}
+/** Unsigned 8-bit integer header value */
export type HeaderValueUint8 = {
kind: HeaderKind['Uint8'],
value: number
}
+/** Unsigned 16-bit integer header value */
export type HeaderValueUint16 = {
kind: HeaderKind['Uint16'],
value: number
}
+/** Unsigned 32-bit integer header value */
export type HeaderValueUint32 = {
kind: HeaderKind['Uint32'],
value: number
}
+/** Unsigned 64-bit integer header value */
export type HeaderValueUint64 = {
kind: HeaderKind['Uint64'],
value: bigint
}
+/** Unsigned 128-bit integer header value */
export type HeaderValueUint128 = {
kind: HeaderKind['Uint128'],
value: Buffer // | ArrayBuffer // ?
}
+/** 32-bit floating point header value */
export type HeaderValueFloat = {
kind: HeaderKind['Float'],
value: number
}
+/** 64-bit floating point (double) header value */
export type HeaderValueDouble = {
kind: HeaderKind['Double'],
value: number
diff --git a/foreign/node/src/wire/message/header.utils.ts
b/foreign/node/src/wire/message/header.utils.ts
index 15db4fa62..fa6e9f57d 100644
--- a/foreign/node/src/wire/message/header.utils.ts
+++ b/foreign/node/src/wire/message/header.utils.ts
@@ -55,6 +55,9 @@ import {
} from './header.type.js';
+/**
+ * Union type of all possible header value types.
+ */
export type HeaderValue =
HeaderValueRaw |
HeaderValueString |
@@ -72,14 +75,28 @@ export type HeaderValue =
HeaderValueFloat |
HeaderValueDouble;
+/**
+ * Map of header names to header values.
+ */
export type Headers = Record<string, HeaderValue>;
+/**
+ * Internal representation of a header value in binary format.
+ */
type BinaryHeaderValue = {
+ /** Header kind identifier */
kind: number,// HeaderKind,
+ /** Serialized value as Buffer */
value: Buffer
}
+/**
+ * Serializes a header value to a Buffer based on its kind.
+ *
+ * @param header - Header value to serialize
+ * @returns Serialized value as Buffer
+ */
export const serializeHeaderValue = (header: HeaderValue) => {
const { kind, value } = header;
switch (kind) {
@@ -102,6 +119,14 @@ export const serializeHeaderValue = (header: HeaderValue)
=> {
};
+/**
+ * Serializes a single header key-value pair to wire format.
+ * Format: [key_length][key][kind][value_length][value]
+ *
+ * @param key - Header key name
+ * @param v - Binary header value
+ * @returns Serialized header as Buffer
+ */
export const serializeHeader = (key: string, v: BinaryHeaderValue) => {
const bKey = Buffer.from(key)
const b1 = uint32ToBuf(bKey.length);
@@ -127,13 +152,26 @@ export const serializeHeader = (key: string, v:
BinaryHeaderValue) => {
]);
};
+/** Empty headers buffer constant */
export const EMPTY_HEADERS = Buffer.alloc(0);
+/**
+ * Creates a binary header value from a typed header value.
+ *
+ * @param header - Typed header value
+ * @returns Binary header value
+ */
const createHeaderValue = (header: HeaderValue): BinaryHeaderValue => ({
kind: header.kind,
value: serializeHeaderValue(header)
});
+/**
+ * Serializes all headers to a single buffer.
+ *
+ * @param headers - Optional headers map
+ * @returns Serialized headers buffer (empty if no headers)
+ */
export const serializeHeaders = (headers?: Headers) => {
if (!headers)
return EMPTY_HEADERS;
@@ -145,28 +183,56 @@ export const serializeHeaders = (headers?: Headers) => {
// deserialize ...
+/** Possible JavaScript types for deserialized header values */
export type ParsedHeaderValue = boolean | string | number | bigint | Buffer;
+/**
+ * Deserialized header with kind and value.
+ */
export type ParsedHeader = {
+ /** Header kind identifier */
kind: ParsedHeaderValue,
+ /** Deserialized value */
value: ParsedHeaderValue
}
+/** Header with its key included */
type HeaderWithKey = ParsedHeader & { key: string };
+/** Map of header keys to parsed headers */
export type HeadersMap = Record<string, ParsedHeader>;
+/**
+ * Result of deserializing a single header.
+ */
type ParsedHeaderDeserialized = {
+ /** Number of bytes consumed */
bytesRead: number,
+ /** Deserialized header data with key */
data: HeaderWithKey
}
+/**
+ * Maps a numeric header kind to its string identifier.
+ *
+ * @param k - Numeric header kind value
+ * @returns Header kind identifier string
+ * @throws Error if the header kind is unknown
+ */
export const mapHeaderKind = (k: number): HeaderKindId => {
if (!ReverseHeaderKind[k as HeaderKindValue])
throw new Error(`unknow header kind: ${k}`);
return ReverseHeaderKind[k as HeaderKindValue];
}
+/**
+ * Deserializes a header value buffer based on its kind.
+ *
+ * @param kind - Numeric header kind
+ * @param value - Raw value buffer
+ * @returns Deserialized value
+ * @throws Error if the header kind is invalid
+ */
export const deserializeHeaderValue =
(kind: number, value: Buffer): ParsedHeaderValue => {
switch (kind) {
@@ -189,13 +255,20 @@ export const deserializeHeaderValue =
}
};
+/**
+ * Deserializes a single header from a buffer.
+ *
+ * @param p - Buffer containing serialized headers
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and deserialized header data
+ */
export const deserializeHeader = (p: Buffer, pos = 0):
ParsedHeaderDeserialized => {
const keyLength = p.readUInt32LE(pos);
const key = p.subarray(pos + 4, pos + 4 + keyLength).toString();
pos += keyLength + 4;
const rawKind = p.readUInt8(pos);
// @TODO ?
- // const kind = mapHeaderKind(rawKind);
+ // const kind = mapHeaderKind(rawKind);
const valueLength = p.readUInt32LE(pos + 1);
const value = deserializeHeaderValue(rawKind, p.subarray(pos + 5, pos + 5 +
valueLength));
@@ -209,6 +282,13 @@ export const deserializeHeader = (p: Buffer, pos = 0):
ParsedHeaderDeserialized
};
}
+/**
+ * Deserializes all headers from a buffer.
+ *
+ * @param p - Buffer containing serialized headers
+ * @param pos - Starting position in the buffer
+ * @returns Map of header keys to parsed headers
+ */
export const deserializeHeaders = (p: Buffer, pos = 0) => {
const headers: HeadersMap = {};
const len = p.length;
@@ -220,85 +300,109 @@ export const deserializeHeaders = (p: Buffer, pos = 0)
=> {
return headers;
}
-/** HeaderValue Helper */
+/**
+ * HeaderValue factory functions and utilities.
+ * Provides type-safe constructors for each header value type.
+ */
+
+/** Creates a raw binary header value */
const Raw = (value: Buffer): HeaderValueRaw => ({
kind: HeaderKind.Raw,
value
});
+/** Creates a string header value */
const String = (value: string): HeaderValueString => ({
kind: HeaderKind.String,
value
});
+/** Creates a boolean header value */
const Bool = (value: boolean): HeaderValueBool => ({
kind: HeaderKind.Bool,
value
});
+/** Creates an Int8 header value */
const Int8 = (value: number): HeaderValueInt8 => ({
kind: HeaderKind.Int8,
value
});
+/** Creates an Int16 header value */
const Int16 = (value: number): HeaderValueInt16 => ({
kind: HeaderKind.Int16,
value
});
+/** Creates an Int32 header value */
const Int32 = (value: number): HeaderValueInt32 => ({
kind: HeaderKind.Int32,
value
});
+/** Creates an Int64 header value */
const Int64 = (value: bigint): HeaderValueInt64 => ({
kind: HeaderKind.Int64,
value
});
+/** Creates an Int128 header value */
const Int128 = (value: Buffer): HeaderValueInt128 => ({
kind: HeaderKind.Int128,
value
});
+/** Creates a Uint8 header value */
const Uint8 = (value: number): HeaderValueUint8 => ({
kind: HeaderKind.Uint8,
value
});
+/** Creates a Uint16 header value */
const Uint16 = (value: number): HeaderValueUint16 => ({
kind: HeaderKind.Uint16,
value
});
+/** Creates a Uint32 header value */
const Uint32 = (value: number): HeaderValueUint32 => ({
kind: HeaderKind.Uint32,
value
});
+/** Creates a Uint64 header value */
const Uint64 = (value: bigint): HeaderValueUint64 => ({
kind: HeaderKind.Uint64,
value
});
+/** Creates a Uint128 header value */
const Uint128 = (value: Buffer): HeaderValueUint128 => ({
kind: HeaderKind.Uint128,
value
});
+/** Creates a Float header value */
const Float = (value: number): HeaderValueFloat => ({
kind: HeaderKind.Float,
value
});
+/** Creates a Double header value */
const Double = (value: number): HeaderValueDouble => ({
kind: HeaderKind.Double,
value
});
+/** Gets the kind identifier string of a header value */
const getKind = (h: HeaderValue) => mapHeaderKind(h.kind);
+/** Gets the value from a header value */
const getValue = (h: HeaderValue) => h.value;
+/**
+ * HeaderValue factory object with constructors for all header types.
+ */
export const HeaderValue = {
Raw,
String,
diff --git a/foreign/node/src/wire/message/iggy-header.utils.ts
b/foreign/node/src/wire/message/iggy-header.utils.ts
index e5f3335e6..d26ef4dec 100644
--- a/foreign/node/src/wire/message/iggy-header.utils.ts
+++ b/foreign/node/src/wire/message/iggy-header.utils.ts
@@ -21,19 +21,41 @@ import { toDate } from "../serialize.utils.js";
import { u128LEBufToBigint } from "../number.utils.js";
+/**
+ * Iggy message header containing metadata for each message.
+ */
export type IggyMessageHeader = {
+ /** Message checksum for integrity verification */
checksum: bigint,
- id: string | BigInt,
+ /** Unique message identifier (UUID or numeric) */
+ id: string | bigint,
+ /** Message offset within the partition */
offset: bigint,
+ /** Server-assigned timestamp */
timestamp: Date,
+ /** Client-provided origin timestamp */
originTimestamp: Date,
+ /** Length of user-defined headers in bytes */
userHeadersLength: number,
+ /** Length of message payload in bytes */
payloadLength: number
};
-// u64 + u128 + u64 + u64 + u64 + u32 + u32
+/**
+ * Size of the Iggy message header in bytes.
+ * Layout: u64 (checksum) + u128 (id) + u64 (offset) + u64 (timestamp) + u64
(originTimestamp) + u32 (userHeadersLength) + u32 (payloadLength)
+ */
export const IGGY_MESSAGE_HEADER_SIZE = 8 + 16 + 8 + 8 + 8 + 4 + 4;
+/**
+ * Serializes an Iggy message header to wire format.
+ * Sets checksum, offset, and timestamp to zero (filled by server).
+ *
+ * @param id - Message ID as 16-byte buffer
+ * @param payload - Message payload
+ * @param userHeaders - Serialized user headers
+ * @returns Serialized header buffer
+ */
export const serializeIggyMessageHeader = (
id: Buffer,
payload: Buffer,
@@ -50,8 +72,21 @@ export const serializeIggyMessageHeader = (
return b;
};
+/**
+ * Deserializes a message ID from a 16-byte buffer to BigInt.
+ *
+ * @param b - 16-byte buffer containing the message ID
+ * @returns Message ID as BigInt
+ */
export const deserialiseMessageId = (b: Buffer) => u128LEBufToBigint(b);
+/**
+ * Deserializes Iggy message headers from a buffer.
+ *
+ * @param b - Buffer containing the serialized header
+ * @returns Parsed IggyMessageHeader object
+ * @throws Error if buffer length doesn't match expected header size
+ */
export const deserializeIggyMessageHeaders = (b: Buffer) => {
if(b.length !== IGGY_MESSAGE_HEADER_SIZE)
throw new Error(
diff --git a/foreign/node/src/wire/message/message.utils.ts
b/foreign/node/src/wire/message/message.utils.ts
index 2ac5311cf..f0b614d11 100644
--- a/foreign/node/src/wire/message/message.utils.ts
+++ b/foreign/node/src/wire/message/message.utils.ts
@@ -28,23 +28,44 @@ import { serializeIggyMessageHeader } from
'./iggy-header.utils.js';
const debug = Debug('iggy:client');
-/** index size per messages in bit */
+/** Size of the message index entry in bytes (16 bytes per message) */
const INDEX_SIZE = 16;
+/** Valid types for message ID: numeric, bigint, or UUID string */
export type MessageIdKind = number | bigint | string;
+/**
+ * Message creation parameters.
+ */
export type CreateMessage = {
- id?: MessageIdKind,
+ /** Optional message ID (auto-generated if not provided) */
+ id?: MessageIdKind,
+ /** Optional user-defined headers */
headers?: Headers,
+ /** Message payload as string or Buffer */
payload: string | Buffer
};
+/**
+ * Type guard to check if a value is a valid message ID.
+ *
+ * @param x - Value to check
+ * @returns True if the value is a valid MessageIdKind
+ */
export const isValidMessageId = (x?: unknown): x is MessageIdKind =>
x === undefined ||
'string' === typeof x ||
'bigint' === typeof x ||
'number' === typeof x;
+/**
+ * Serializes a message ID to a 16-byte buffer.
+ * Supports undefined (zero), numeric, bigint, and UUID string formats.
+ *
+ * @param id - Message ID to serialize
+ * @returns 16-byte buffer containing the serialized ID
+ * @throws Error if the ID format is invalid
+ */
export const serializeMessageId = (id?: unknown) => {
if(!isValidMessageId(id))
@@ -73,6 +94,13 @@ export const serializeMessageId = (id?: unknown) => {
}
+/**
+ * Serializes a single message to wire format.
+ * Format: [iggy_header][payload][user_headers]
+ *
+ * @param msg - Message to serialize
+ * @returns Serialized message buffer
+ */
export const serializeMessage = (msg: CreateMessage) => {
const { id, headers, payload } = msg;
@@ -80,7 +108,7 @@ export const serializeMessage = (msg: CreateMessage) => {
const bUserHeaders = serializeHeaders(headers);
const bPayload = 'string' === typeof payload ? Buffer.from(payload) : payload
const bIggyMessageHeader = serializeIggyMessageHeader(bId, bPayload,
bUserHeaders);
-
+
const r = Buffer.concat([
bIggyMessageHeader,
bPayload,
@@ -94,13 +122,26 @@ export const serializeMessage = (msg: CreateMessage) => {
'payload', bPayload.length, bPayload.toString('hex'),
'full len', r.length //, r.toString('hex')
);
-
+
return r;
};
+/**
+ * Serializes multiple messages to an array of buffers.
+ *
+ * @param messages - Array of messages to serialize
+ * @returns Array of serialized message buffers
+ */
export const serializeMessages = (messages: CreateMessage[]) =>
messages.map(c => serializeMessage(c));
+/**
+ * Creates an index buffer for a batch of messages.
+ * Each index entry is 16 bytes tracking message positions.
+ *
+ * @param messages - Array of serialized message buffers
+ * @returns Index buffer
+ */
export const createMessagesIndex = (messages: Buffer[]) => {
const bIndex = Buffer.allocUnsafe(messages.length * INDEX_SIZE);
let currentIndex = 0;
@@ -115,6 +156,16 @@ export const createMessagesIndex = (messages: Buffer[]) =>
{
return bIndex;
}
+/**
+ * Serializes a send messages command payload.
+ * Includes stream/topic identifiers, partitioning, and all messages with
index.
+ *
+ * @param streamId - Stream identifier
+ * @param topicId - Topic identifier
+ * @param messages - Array of messages to send
+ * @param partitioning - Optional partitioning strategy
+ * @returns Serialized command payload
+ */
export const serializeSendMessages = (
streamId: Id,
topicId: Id,
@@ -144,6 +195,15 @@ export const serializeSendMessages = (
]);
};
+/**
+ * Serializes a flush unsaved buffers command payload.
+ *
+ * @param streamId - Stream identifier
+ * @param topicId - Topic identifier
+ * @param partitionId - Partition ID to flush
+ * @param fsync - Whether to force sync to disk
+ * @returns Serialized command payload
+ */
export const serializeFlushUnsavedBuffers = (
streamId: Id,
topicId: Id,
diff --git a/foreign/node/src/wire/message/partitioning.utils.ts
b/foreign/node/src/wire/message/partitioning.utils.ts
index 20d9e82ca..8dd2e289a 100644
--- a/foreign/node/src/wire/message/partitioning.utils.ts
+++ b/foreign/node/src/wire/message/partitioning.utils.ts
@@ -22,59 +22,94 @@ import { uint32ToBuf, uint64ToBuf } from
'../number.utils.js';
import type { ValueOf } from '../../type.utils.js';
+/**
+ * Enumeration of partition selection strategies.
+ */
export const PartitionKind = {
+ /** Server selects partition using round-robin */
Balanced : 1,
+ /** Client specifies exact partition ID */
PartitionId : 2,
+ /** Client provides a key for consistent hashing */
MessageKey : 3
} as const;
-
+/** Type alias for the PartitionKind object */
export type PartitionKind = typeof PartitionKind;
+/** String literal type of partition kind names */
export type PartitionKindId = keyof PartitionKind;
+/** Numeric values of partition kinds */
export type PartitionKindValue = ValueOf<PartitionKind>
+/** Balanced partitioning (server selects partition) */
export type Balanced = {
kind: PartitionKind['Balanced'],
value: null
};
+/** Explicit partition ID selection */
export type PartitionId = {
kind: PartitionKind['PartitionId'],
- value: number // uint32
+ /** Partition ID (uint32) */
+ value: number
};
-// string | uint32/64/128
+/** Possible types for message key values */
export type MessageKeyValue = string | number | bigint | Buffer;
+/** Message key-based partitioning for consistent hashing */
export type MessageKey = {
kind: PartitionKind['MessageKey'],
value: MessageKeyValue
};
+/** Union of all partitioning strategies */
export type Partitioning = Balanced | PartitionId | MessageKey;
+/** Balanced partitioning constant */
const Balanced: Balanced = {
kind: PartitionKind.Balanced,
value: null
};
+/**
+ * Creates a partition ID partitioning strategy.
+ *
+ * @param id - Partition ID to target
+ * @returns PartitionId partitioning object
+ */
const PartitionId = (id: number): PartitionId => ({
kind: PartitionKind.PartitionId,
value: id
});
+/**
+ * Creates a message key partitioning strategy.
+ *
+ * @param key - Key for consistent hashing
+ * @returns MessageKey partitioning object
+ */
const MessageKey = (key: MessageKeyValue): MessageKey => ({
kind: PartitionKind.MessageKey,
value: key
});
-// Helper
+/**
+ * Factory object for creating partitioning strategies.
+ */
export const Partitioning = {
Balanced,
PartitionId,
MessageKey
};
+/**
+ * Serializes a message key value to a buffer.
+ *
+ * @param v - Message key value
+ * @returns Serialized buffer
+ * @throws Error if the value type is not supported
+ */
export const serializeMessageKey = (v: MessageKeyValue) => {
if (v instanceof Buffer) return v;
if ('string' === typeof v) return Buffer.from(v);
@@ -83,6 +118,12 @@ export const serializeMessageKey = (v: MessageKeyValue) => {
throw new Error(`cannot serialize messageKey ${v}, ${typeof v}`);
};
+/**
+ * Serializes the value portion of a partitioning strategy.
+ *
+ * @param part - Partitioning strategy
+ * @returns Serialized value buffer
+ */
export const serializePartitioningValue = (part: Partitioning): Buffer => {
const { kind, value } = part;
switch (kind) {
@@ -92,11 +133,19 @@ export const serializePartitioningValue = (part:
Partitioning): Buffer => {
}
};
+/** Default partitioning strategy (balanced) */
export const default_partionning: Balanced = {
kind: PartitionKind.Balanced,
value: null
};
+/**
+ * Serializes a partitioning strategy to wire format.
+ * Format: [kind (1 byte)][value_length (1 byte)][value]
+ *
+ * @param p - Optional partitioning strategy (defaults to balanced)
+ * @returns Serialized partitioning buffer
+ */
export const serializePartitioning = (p?: Partitioning) => {
const part = p || default_partionning;
const b = Buffer.alloc(2);
diff --git a/foreign/node/src/wire/message/poll-messages.command.ts
b/foreign/node/src/wire/message/poll-messages.command.ts
index 4135ac1f9..c50cd797c 100644
--- a/foreign/node/src/wire/message/poll-messages.command.ts
+++ b/foreign/node/src/wire/message/poll-messages.command.ts
@@ -29,16 +29,30 @@ import {
} from './poll.utils.js';
+/**
+ * Parameters for the poll messages command.
+ */
export type PollMessages = {
+ /** Stream identifier */
streamId: Id,
+ /** Topic identifier */
topicId: Id,
+ /** Consumer configuration */
consumer: Consumer,
+ /** Partition ID (null for all partitions) */
partitionId: number | null,
+ /** Strategy for selecting messages */
pollingStrategy: PollingStrategy,
+ /** Maximum number of messages to poll */
count: number,
+ /** Whether to auto-commit offset after polling */
autocommit: boolean
};
+/**
+ * Poll messages command definition.
+ * Retrieves messages from a topic partition.
+ */
export const POLL_MESSAGES = {
code: COMMAND_CODE.PollMessages,
@@ -55,4 +69,7 @@ export const POLL_MESSAGES = {
}
};
+/**
+ * Executable poll messages command function.
+ */
export const pollMessages = wrapCommand<PollMessages,
PollMessagesResponse>(POLL_MESSAGES);
diff --git a/foreign/node/src/wire/message/poll.utils.ts
b/foreign/node/src/wire/message/poll.utils.ts
index 6346e8194..fbdaa244d 100644
--- a/foreign/node/src/wire/message/poll.utils.ts
+++ b/foreign/node/src/wire/message/poll.utils.ts
@@ -26,46 +26,65 @@ import { Transform, type TransformCallback } from
'node:stream';
import {
deserializeIggyMessageHeaders,
IGGY_MESSAGE_HEADER_SIZE,
- IggyMessageHeader
+ type IggyMessageHeader
} from './iggy-header.utils.js';
+/**
+ * Enumeration of message polling strategies.
+ */
export const PollingStrategyKind = {
+ /** Poll from a specific offset */
Offset: 1,
+ /** Poll from a specific timestamp */
Timestamp: 2,
+ /** Poll from the first message */
First: 3,
+ /** Poll from the last message */
Last: 4,
+ /** Poll the next unconsumed message */
Next: 5
} as const;
+/** Type alias for the PollingStrategyKind object */
export type PollingStrategyKind = typeof PollingStrategyKind;
+/** String literal type of polling strategy names */
export type PollingStrategyKindId = keyof PollingStrategyKind;
+/** Numeric values of polling strategies */
export type PollingStrategyKindValue = ValueOf<PollingStrategyKind>
+/** Polling from a specific offset */
export type OffsetPollingStrategy = {
kind: PollingStrategyKind['Offset'],
+ /** Offset to start polling from */
value: bigint
}
+/** Polling from a specific timestamp */
export type TimestampPollingStrategy = {
kind: PollingStrategyKind['Timestamp'],
+ /** Timestamp in microseconds */
value: bigint
}
+/** Polling from the first message */
export type FirstPollingStrategy = {
kind: PollingStrategyKind['First'],
value: 0n
}
+/** Polling from the last message */
export type LastPollingStrategy = {
kind: PollingStrategyKind['Last'],
value: 0n
}
+/** Polling the next unconsumed message */
export type NextPollingStrategy = {
kind: PollingStrategyKind['Next'],
value: 0n
}
+/** Union of all polling strategy types */
export type PollingStrategy =
OffsetPollingStrategy |
TimestampPollingStrategy |
@@ -74,32 +93,49 @@ export type PollingStrategy =
NextPollingStrategy;
+/** Next polling strategy constant */
const Next: NextPollingStrategy = {
kind: PollingStrategyKind.Next,
value:0n
};
+/** First polling strategy constant */
const First: FirstPollingStrategy = {
kind: PollingStrategyKind.First,
value:0n
};
+/** Last polling strategy constant */
const Last: LastPollingStrategy = {
kind: PollingStrategyKind.Last,
value:0n
};
+/**
+ * Creates an offset polling strategy.
+ *
+ * @param n - Offset to start from
+ * @returns Offset polling strategy
+ */
const Offset = (n: bigint): OffsetPollingStrategy => ({
kind: PollingStrategyKind.Offset,
value: n
});
+/**
+ * Creates a timestamp polling strategy.
+ *
+ * @param n - Timestamp in microseconds
+ * @returns Timestamp polling strategy
+ */
const Timestamp = (n: bigint): TimestampPollingStrategy => ({
kind: PollingStrategyKind.Timestamp,
value: n
});
-// helper
+/**
+ * Factory object for creating polling strategies.
+ */
export const PollingStrategy = {
Next,
First,
@@ -109,6 +145,18 @@ export const PollingStrategy = {
};
+/**
+ * Serializes a poll messages command payload.
+ *
+ * @param streamId - Stream identifier
+ * @param topicId - Topic identifier
+ * @param consumer - Consumer configuration
+ * @param partitionId - Partition ID (null for all partitions)
+ * @param pollingStrategy - Strategy for selecting messages
+ * @param count - Maximum number of messages to poll
+ * @param autocommit - Whether to auto-commit offset after polling
+ * @returns Serialized command payload
+ */
export const serializePollMessages = (
streamId: Id,
topicId: Id,
@@ -130,37 +178,74 @@ export const serializePollMessages = (
]);
};
+/**
+ * Enumeration of message states.
+ */
export const MessageState = {
+ /** Message is available for consumption */
Available: 1,
+ /** Message is temporarily unavailable */
Unavailable: 10,
+ /** Message processing failed */
Poisoned: 20,
+ /** Message is scheduled for deletion */
MarkedForDeletion: 30
}
+/** Type alias for the MessageState object */
type MessageState = typeof MessageState;
+/** String literal type of message state names */
type MessageStateId = keyof MessageState;
+/** Numeric values of message states */
type MessageStateValue = ValueOf<MessageState>;
+/** Reverse mapping from numeric value to state name */
const ReverseMessageState = reverseRecord(MessageState);
+/**
+ * Maps a numeric message state to its string identifier.
+ *
+ * @param k - Numeric state value
+ * @returns State identifier string
+ * @throws Error if the state is unknown
+ */
export const mapMessageState = (k: number): MessageStateId => {
if(!ReverseMessageState[k as MessageStateValue])
throw new Error(`unknow message state: ${k}`);
return ReverseMessageState[k as MessageStateValue];
}
+/**
+ * A polled message with headers, payload, and user headers.
+ */
export type Message = {
+ /** Iggy message header metadata */
headers: IggyMessageHeader,
+ /** Message payload data */
payload: Buffer,
+ /** User-defined headers */
userHeaders: HeadersMap
};
+/**
+ * Response from a poll messages command.
+ */
export type PollMessagesResponse = {
+ /** Partition the messages came from */
partitionId: number,
+ /** Current offset in the partition */
currentOffset: bigint,
+ /** Number of messages returned */
count: number,
+ /** Array of polled messages */
messages: Message[]
};
+/**
+ * Deserializes an array of messages from a buffer.
+ *
+ * @param b - Buffer containing serialized messages
+ * @returns Array of deserialized messages
+ */
export const deserializeMessages = (b: Buffer) => {
const messages: Message[] = [];
let pos = 0;
@@ -190,6 +275,13 @@ export const deserializeMessages = (b: Buffer) => {
return messages;
}
+/**
+ * Deserializes a poll messages response from a buffer.
+ *
+ * @param r - Response buffer
+ * @param pos - Starting position
+ * @returns Parsed PollMessagesResponse
+ */
export const deserializePollMessages = (r: Buffer, pos = 0) => {
const partitionId = r.readUInt32LE(pos);
const currentOffset = r.readBigUInt64LE(pos + 4);
@@ -204,7 +296,11 @@ export const deserializePollMessages = (r: Buffer, pos =
0) => {
}
};
-
+/**
+ * Creates a Transform stream for deserializing poll messages responses.
+ *
+ * @returns Transform stream that outputs PollMessagesResponse objects
+ */
export const deserializePollMessagesTransform = () => new Transform({
objectMode: true,
transform(chunk: Buffer, encoding: BufferEncoding, cb: TransformCallback) {
diff --git a/foreign/node/src/wire/message/send-messages.command.ts
b/foreign/node/src/wire/message/send-messages.command.ts
index 334fa867e..0d3bb7a48 100644
--- a/foreign/node/src/wire/message/send-messages.command.ts
+++ b/foreign/node/src/wire/message/send-messages.command.ts
@@ -25,13 +25,24 @@ import { deserializeVoidResponse } from
'../../client/client.utils.js';
import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the send messages command.
+ */
export type SendMessages = {
+ /** Stream identifier */
streamId: Id,
+ /** Topic identifier */
topicId: Id,
+ /** Array of messages to send */
messages: CreateMessage[],
+ /** Optional partitioning strategy */
partition?: Partitioning,
};
+/**
+ * Send messages command definition.
+ * Publishes messages to a topic.
+ */
export const SEND_MESSAGES = {
code: COMMAND_CODE.SendMessages,
@@ -42,4 +53,7 @@ export const SEND_MESSAGES = {
deserialize: deserializeVoidResponse
};
+/**
+ * Executable send messages command function.
+ */
export const sendMessages = wrapCommand<SendMessages, boolean>(SEND_MESSAGES);
diff --git a/foreign/node/src/wire/number.utils.ts
b/foreign/node/src/wire/number.utils.ts
index 7ab9d7907..18b0750af 100644
--- a/foreign/node/src/wire/number.utils.ts
+++ b/foreign/node/src/wire/number.utils.ts
@@ -18,80 +18,157 @@
*/
+/**
+ * Converts a boolean value to a 1-byte Buffer.
+ *
+ * @param v - Boolean value to convert
+ * @returns Buffer containing 0x00 (false) or 0x01 (true)
+ */
export const boolToBuf = (v: boolean) => {
const b = Buffer.allocUnsafe(1);
b.writeUInt8(!v ? 0 : 1);
return b;
}
+/**
+ * Converts an 8-bit signed integer to a 1-byte Buffer.
+ *
+ * @param v - Signed integer value (-128 to 127)
+ * @returns Buffer containing the value
+ */
export const int8ToBuf = (v: number) => {
const b = Buffer.allocUnsafe(1);
b.writeInt8(v);
return b;
}
+/**
+ * Converts a 16-bit signed integer to a 2-byte Buffer in little-endian format.
+ *
+ * @param v - Signed integer value (-32,768 to 32,767)
+ * @returns Buffer containing the value in little-endian byte order
+ */
export const int16ToBuf = (v: number) => {
const b = Buffer.allocUnsafe(2);
b.writeInt16LE(v);
return b;
}
+/**
+ * Converts a 32-bit signed integer to a 4-byte Buffer in little-endian format.
+ *
+ * @param v - Signed integer value (-2,147,483,648 to 2,147,483,647)
+ * @returns Buffer containing the value in little-endian byte order
+ */
export const int32ToBuf = (v: number) => {
const b = Buffer.allocUnsafe(4);
b.writeInt32LE(v);
return b;
}
+/**
+ * Converts a 64-bit signed BigInt to an 8-byte Buffer in little-endian format.
+ *
+ * @param v - BigInt value
+ * @returns Buffer containing the value in little-endian byte order
+ */
export const int64ToBuf = (v: bigint) => {
const b = Buffer.allocUnsafe(8);
b.writeBigInt64LE(v);
return b;
}
+/**
+ * Converts an 8-bit unsigned integer to a 1-byte Buffer.
+ *
+ * @param v - Unsigned integer value (0 to 255)
+ * @returns Buffer containing the value
+ */
export const uint8ToBuf = (v: number) => {
const b = Buffer.allocUnsafe(1);
b.writeUInt8(v);
return b;
}
+/**
+ * Converts a 16-bit unsigned integer to a 2-byte Buffer in little-endian
format.
+ *
+ * @param v - Unsigned integer value (0 to 65,535)
+ * @returns Buffer containing the value in little-endian byte order
+ */
export const uint16ToBuf = (v: number) => {
const b = Buffer.allocUnsafe(2);
b.writeUInt16LE(v);
return b;
}
+/**
+ * Converts a 32-bit unsigned integer to a 4-byte Buffer in little-endian
format.
+ *
+ * @param v - Unsigned integer value (0 to 4,294,967,295)
+ * @returns Buffer containing the value in little-endian byte order
+ */
export const uint32ToBuf = (v: number) => {
const b = Buffer.allocUnsafe(4);
b.writeUInt32LE(v);
return b;
}
+/**
+ * Converts a 64-bit unsigned BigInt to an 8-byte Buffer in little-endian
format.
+ *
+ * @param v - Unsigned BigInt value
+ * @returns Buffer containing the value in little-endian byte order
+ */
export const uint64ToBuf = (v: bigint) => {
const b = Buffer.allocUnsafe(8);
b.writeBigUInt64LE(v);
return b;
}
+/**
+ * Converts a 32-bit floating-point number to a 4-byte Buffer in little-endian
format.
+ *
+ * @param v - Float value (IEEE 754 single precision)
+ * @returns Buffer containing the value in little-endian byte order
+ */
export const floatToBuf = (v: number) => {
const b = Buffer.allocUnsafe(4);
b.writeFloatLE(v);
return b;
}
+/**
+ * Converts a 64-bit floating-point number to an 8-byte Buffer in
little-endian format.
+ *
+ * @param v - Double value (IEEE 754 double precision)
+ * @returns Buffer containing the value in little-endian byte order
+ */
export const doubleToBuf = (v: number) => {
const b = Buffer.allocUnsafe(8);
b.writeDoubleLE(v);
return b;
}
-// bigint => u128 LE
+/**
+ * Converts a BigInt to a 128-bit unsigned integer Buffer in little-endian
format.
+ *
+ * @param num - BigInt value to convert
+ * @param width - Width in bytes (default: 16)
+ * @returns Buffer containing the value in little-endian byte order
+ */
export function u128ToBuf(num: bigint, width = 16): Buffer {
const hex = num.toString(16);
const b = Buffer.from(hex.padStart(width * 2, '0').slice(0, width * 2),
'hex');
return b.reverse();
}
-// u128 LE => Bigint
+/**
+ * Converts a 128-bit unsigned integer Buffer in little-endian format to a
BigInt.
+ *
+ * @param b - Buffer containing the value in little-endian byte order
+ * @returns BigInt representation of the value
+ */
export function u128LEBufToBigint(b: Buffer): bigint {
const hex = b.reverse().toString('hex');
return hex.length === 0 ? BigInt(0) : BigInt(`0x${hex}`);
diff --git a/foreign/node/src/wire/offset/delete-offset.command.ts
b/foreign/node/src/wire/offset/delete-offset.command.ts
index fae8e0f0e..c8c28552a 100644
--- a/foreign/node/src/wire/offset/delete-offset.command.ts
+++ b/foreign/node/src/wire/offset/delete-offset.command.ts
@@ -19,15 +19,22 @@
import { deserializeVoidResponse } from '../../client/client.utils.js';
-import type { CommandResponse } from '../../client/client.type.js';
import type { GetOffset } from './get-offset.command.js';
import { wrapCommand } from '../command.utils.js';
import { serializeGetOffset } from './offset.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the delete offset command.
+ * Same as GetOffset parameters.
+ */
export type DeleteOffset = GetOffset;
+/**
+ * Delete offset command definition.
+ * Removes a stored consumer offset.
+ */
export const DELETE_OFFSET = {
code: COMMAND_CODE.DeleteConsumerOffset,
@@ -39,4 +46,7 @@ export const DELETE_OFFSET = {
};
+/**
+ * Executable delete offset command function.
+ */
export const deleteOffset = wrapCommand<DeleteOffset, boolean>(DELETE_OFFSET);
diff --git a/foreign/node/src/wire/offset/get-offset.command.ts
b/foreign/node/src/wire/offset/get-offset.command.ts
index b4220280f..beb66b953 100644
--- a/foreign/node/src/wire/offset/get-offset.command.ts
+++ b/foreign/node/src/wire/offset/get-offset.command.ts
@@ -24,14 +24,25 @@ import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
import { serializeGetOffset, type Consumer, type OffsetResponse } from
'./offset.utils.js';
+/**
+ * Parameters for the get offset command.
+ */
export type GetOffset = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Consumer identifier (single or group) */
consumer: Consumer,
+ /** Partition ID (required for single consumer, null for group) */
partitionId: number | null
};
+/**
+ * Get offset command definition.
+ * Retrieves the current and stored offset for a consumer.
+ */
export const GET_OFFSET = {
code: COMMAND_CODE.GetOffset,
@@ -56,4 +67,7 @@ export const GET_OFFSET = {
};
+/**
+ * Executable get offset command function.
+ */
export const getOffset = wrapCommand<GetOffset, OffsetResponse |
null>(GET_OFFSET);
diff --git a/foreign/node/src/wire/offset/offset.utils.ts
b/foreign/node/src/wire/offset/offset.utils.ts
index 30e8fac96..436b35a48 100644
--- a/foreign/node/src/wire/offset/offset.utils.ts
+++ b/foreign/node/src/wire/offset/offset.utils.ts
@@ -22,42 +22,80 @@ import type { ValueOf } from '../../type.utils.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { uint8ToBuf } from '../number.utils.js';
+/**
+ * Consumer kind options for offset operations.
+ */
export const ConsumerKind = {
+ /** Single consumer (not part of a group) */
Single: 1,
+ /** Consumer group member */
Group: 2
} as const;
+/** Type alias for the ConsumerKind object */
export type ConsumerKind = typeof ConsumerKind;
+/** String literal type of consumer kind names */
export type ConsumerKindId = keyof ConsumerKind;
+/** Numeric values of consumer kinds */
export type ConsumerKindValue = ValueOf<ConsumerKind>
+/**
+ * Consumer identifier for offset operations.
+ */
export type Consumer = {
+ /** Consumer kind (Single or Group) */
kind: ConsumerKindValue,
+ /** Consumer or group identifier */
id: Id
}
+/**
+ * Default single consumer instance.
+ */
export const ConsumerSingle = {
kind: ConsumerKind.Single,
id: 0
};
+/** Type for single consumer */
export type ConsumerSingle = typeof ConsumerSingle;
+/**
+ * Consumer factory for creating consumer identifiers.
+ */
export const Consumer = {
+ /** Single consumer instance */
Single: ConsumerSingle,
+ /** Creates a group consumer identifier */
Group: (groupId: Id) => ({
kind: ConsumerKind.Group,
id: groupId
})
}
+/**
+ * Response from offset operations.
+ */
export type OffsetResponse = {
+ /** Partition ID */
partitionId: number,
+ /** Current offset in the partition */
currentOffset: bigint,
+ /** Stored consumer offset */
storedOffset: bigint
};
+/**
+ * Serializes parameters for get/delete offset operations.
+ *
+ * @param streamId - Stream identifier (ID or name)
+ * @param topicId - Topic identifier (ID or name)
+ * @param consumer - Consumer identifier (single or group)
+ * @param partitionId - Partition ID (required for single consumer, optional
for group)
+ * @returns Buffer containing serialized offset request
+ * @throws Error if partitionId is null for single consumer kind
+ */
export const serializeGetOffset = (
streamId: Id,
topicId: Id,
@@ -92,6 +130,16 @@ export const serializeGetOffset = (
]);
};
+/**
+ * Serializes parameters for store offset operation.
+ *
+ * @param streamId - Stream identifier (ID or name)
+ * @param topicId - Topic identifier (ID or name)
+ * @param consumer - Consumer identifier (single or group)
+ * @param partitionId - Partition ID (required for single consumer, optional
for group)
+ * @param offset - Offset value to store
+ * @returns Buffer containing serialized store offset request
+ */
export const serializeStoreOffset = (
streamId: Id,
topicId: Id,
diff --git a/foreign/node/src/wire/offset/store-offset.command.ts
b/foreign/node/src/wire/offset/store-offset.command.ts
index c64e2e955..d3f6a1ba4 100644
--- a/foreign/node/src/wire/offset/store-offset.command.ts
+++ b/foreign/node/src/wire/offset/store-offset.command.ts
@@ -24,14 +24,26 @@ import { COMMAND_CODE } from '../command.code.js';
import { type Id } from '../identifier.utils.js';
import { serializeStoreOffset, type Consumer } from './offset.utils.js';
+/**
+ * Parameters for the store offset command.
+ */
export type StoreOffset = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Consumer identifier (single or group) */
consumer: Consumer,
+ /** Partition ID (required for single consumer, null for group) */
partitionId: number | null,
+ /** Offset value to store */
offset: bigint
};
+/**
+ * Store offset command definition.
+ * Persists a consumer's offset for a partition.
+ */
export const STORE_OFFSET = {
code: COMMAND_CODE.StoreOffset,
@@ -42,4 +54,7 @@ export const STORE_OFFSET = {
};
+/**
+ * Executable store offset command function.
+ */
export const storeOffset = wrapCommand<StoreOffset, boolean>(STORE_OFFSET);
diff --git a/foreign/node/src/wire/partition/create-partition.command.ts
b/foreign/node/src/wire/partition/create-partition.command.ts
index 510bf5905..6cc258c12 100644
--- a/foreign/node/src/wire/partition/create-partition.command.ts
+++ b/foreign/node/src/wire/partition/create-partition.command.ts
@@ -24,12 +24,22 @@ import { deserializeVoidResponse } from
'../../client/client.utils.js';
import type { Id } from '../identifier.utils.js';
import { serializePartitionParams } from './partition.utils.js';
+/**
+ * Parameters for the create partition command.
+ */
export type CreatePartition = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Number of partitions to create (1-1000, default: 1) */
partitionCount?: number
};
+/**
+ * Create partition command definition.
+ * Adds new partitions to a topic.
+ */
export const CREATE_PARTITION = {
code: COMMAND_CODE.CreatePartitions,
@@ -41,4 +51,7 @@ export const CREATE_PARTITION = {
};
+/**
+ * Executable create partition command function.
+ */
export const createPartition = wrapCommand<CreatePartition,
boolean>(CREATE_PARTITION);
diff --git a/foreign/node/src/wire/partition/delete-partition.command.ts
b/foreign/node/src/wire/partition/delete-partition.command.ts
index 8f1fd844a..882bc6cb5 100644
--- a/foreign/node/src/wire/partition/delete-partition.command.ts
+++ b/foreign/node/src/wire/partition/delete-partition.command.ts
@@ -24,21 +24,34 @@ import type { Id } from '../identifier.utils.js';
import { serializePartitionParams } from './partition.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the delete partition command.
+ */
export type DeletePartition = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Number of partitions to delete (1-1000) */
partitionCount: number
};
-
+
+/**
+ * Delete partitions command definition.
+ * Removes partitions from a topic.
+ */
export const DELETE_PARTITIONS = {
code: COMMAND_CODE.DeletePartitions,
serialize: ({ streamId, topicId, partitionCount }: DeletePartition) => {
return serializePartitionParams(streamId, topicId, partitionCount);
},
-
+
deserialize: deserializeVoidResponse
};
+/**
+ * Executable delete partition command function.
+ */
export const deletePartition = wrapCommand<DeletePartition,
boolean>(DELETE_PARTITIONS);
diff --git a/foreign/node/src/wire/partition/partition.utils.ts
b/foreign/node/src/wire/partition/partition.utils.ts
index a60be9f2a..e39b502d4 100644
--- a/foreign/node/src/wire/partition/partition.utils.ts
+++ b/foreign/node/src/wire/partition/partition.utils.ts
@@ -21,6 +21,15 @@
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { uint32ToBuf } from '../number.utils.js';
+/**
+ * Serializes partition parameters for create/delete operations.
+ *
+ * @param streamId - Stream identifier (ID or name)
+ * @param topicId - Topic identifier (ID or name)
+ * @param partitionCount - Number of partitions (1-1000)
+ * @returns Buffer containing serialized partition request
+ * @throws Error if partitionCount is not between 1 and 1000
+ */
export const serializePartitionParams = (
streamId: Id, topicId: Id, partitionCount = 1,
) => {
diff --git a/foreign/node/src/wire/serialize.utils.ts
b/foreign/node/src/wire/serialize.utils.ts
index b5a82f8cb..c4ad0b03b 100644
--- a/foreign/node/src/wire/serialize.utils.ts
+++ b/foreign/node/src/wire/serialize.utils.ts
@@ -18,10 +18,30 @@
*/
+/**
+ * Converts a microsecond timestamp (BigInt) to a JavaScript Date object.
+ *
+ * @param n - Timestamp in microseconds as BigInt
+ * @returns JavaScript Date object
+ */
export const toDate = (n: bigint): Date => new Date(Number(n / BigInt(1000)));
+/**
+ * Serializes a UUID string to a 16-byte Buffer.
+ * Removes dashes from the UUID and converts to binary format.
+ *
+ * @param id - UUID string (e.g., "550e8400-e29b-41d4-a716-446655440000")
+ * @returns 16-byte Buffer containing the UUID
+ */
export const serializeUUID = (id: string) => Buffer.from(id.replaceAll('-',
''), 'hex');
+/**
+ * Deserializes a 16-byte Buffer to a UUID string.
+ * Converts binary format to standard UUID string with dashes.
+ *
+ * @param p - 16-byte Buffer containing the UUID
+ * @returns UUID string (e.g., "550e8400-e29b-41d4-a716-446655440000")
+ */
export const deserializeUUID = (p: Buffer) => {
const v = p.toString('hex');
return `${v.slice(0, 8)}-` +
diff --git a/foreign/node/src/wire/session/login-with-token.command.ts
b/foreign/node/src/wire/session/login-with-token.command.ts
index 08bc3f605..238bd602b 100644
--- a/foreign/node/src/wire/session/login-with-token.command.ts
+++ b/foreign/node/src/wire/session/login-with-token.command.ts
@@ -23,10 +23,18 @@ import { serializeLoginWithToken, type LoginResponse } from
'./login.utils.js';
import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the login with token command.
+ */
export type LoginWithTokenParam = {
+ /** Access token string (1-255 bytes) */
token: string
};
+/**
+ * Login with token command definition.
+ * Authenticates a user with an access token.
+ */
export const LOGIN_WITH_TOKEN = {
code: COMMAND_CODE.LoginWithAccessToken,
@@ -38,4 +46,7 @@ export const LOGIN_WITH_TOKEN = {
};
+/**
+ * Executable login with token command function.
+ */
export const loginWithToken = wrapCommand<LoginWithTokenParam,
LoginResponse>(LOGIN_WITH_TOKEN);
diff --git a/foreign/node/src/wire/session/login.command.ts
b/foreign/node/src/wire/session/login.command.ts
index 198ce6624..37ba0b96c 100644
--- a/foreign/node/src/wire/session/login.command.ts
+++ b/foreign/node/src/wire/session/login.command.ts
@@ -28,6 +28,10 @@ import {
} from './login.utils.js';
+/**
+ * Login command definition.
+ * Authenticates a user with username and password.
+ */
export const LOGIN = {
code: COMMAND_CODE.LoginUser,
@@ -39,4 +43,7 @@ export const LOGIN = {
};
+/**
+ * Executable login command function.
+ */
export const login = wrapCommand<LoginCredentials, LoginResponse>(LOGIN);
diff --git a/foreign/node/src/wire/session/login.utils.ts
b/foreign/node/src/wire/session/login.utils.ts
index d0b12c100..9bc51b054 100644
--- a/foreign/node/src/wire/session/login.utils.ts
+++ b/foreign/node/src/wire/session/login.utils.ts
@@ -19,17 +19,35 @@
import { uint32ToBuf, uint8ToBuf } from "../number.utils.js";
+/**
+ * Response from a successful login.
+ */
export type LoginResponse = {
+ /** The authenticated user's ID */
userId: number
}
+/**
+ * Credentials for user login.
+ */
export type LoginCredentials = {
+ /** Username (1-255 bytes) */
username: string,
+ /** Password (1-255 bytes) */
password: string,
+ /** Optional client version string */
version?: string,
+ /** Optional client context string */
context?: string
}
+/**
+ * Serializes login credentials for the login command.
+ *
+ * @param credentials - Login credentials object
+ * @returns Buffer containing serialized login request
+ * @throws Error if username or password is not between 1 and 255 bytes
+ */
export const serializeLoginUser = ({
username,
password,
@@ -64,6 +82,13 @@ export const serializeLoginUser = ({
};
+/**
+ * Serializes a token for the login with token command.
+ *
+ * @param token - Access token string (1-255 bytes)
+ * @returns Buffer containing serialized token login request
+ * @throws Error if token is not between 1 and 255 bytes
+ */
export const serializeLoginWithToken = (token: string) => {
const bToken = Buffer.from(token);
diff --git a/foreign/node/src/wire/session/logout.command.ts
b/foreign/node/src/wire/session/logout.command.ts
index 22c0da29d..c4df0ffef 100644
--- a/foreign/node/src/wire/session/logout.command.ts
+++ b/foreign/node/src/wire/session/logout.command.ts
@@ -22,6 +22,10 @@ import { deserializeVoidResponse } from
'../../client/client.utils.js';
import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Logout command definition.
+ * Ends the current user session.
+ */
export const LOGOUT = {
code: COMMAND_CODE.LogoutUser,
@@ -33,4 +37,7 @@ export const LOGOUT = {
};
+/**
+ * Executable logout command function.
+ */
export const logout = wrapCommand<void, boolean>(LOGOUT);
diff --git a/foreign/node/src/wire/stream/create-stream.command.ts
b/foreign/node/src/wire/stream/create-stream.command.ts
index 336083d9a..c8ee0720b 100644
--- a/foreign/node/src/wire/stream/create-stream.command.ts
+++ b/foreign/node/src/wire/stream/create-stream.command.ts
@@ -23,11 +23,20 @@ import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
import { deserializeToStream, type Stream } from './stream.utils.js';
+/**
+ * Parameters for the create stream command.
+ */
export type CreateStream = {
- streamId?: number, // Optional - auto-assigned by server if not provided
+ /** Optional stream ID (auto-assigned by server if not provided) */
+ streamId?: number,
+ /** Stream name (1-255 bytes) */
name: string
};
+/**
+ * Create stream command definition.
+ * Creates a new stream with the specified name.
+ */
export const CREATE_STREAM = {
code: COMMAND_CODE.CreateStream,
@@ -53,4 +62,7 @@ export const CREATE_STREAM = {
};
+/**
+ * Executable create stream command function.
+ */
export const createStream = wrapCommand<CreateStream, Stream>(CREATE_STREAM);
diff --git a/foreign/node/src/wire/stream/delete-stream.command.ts
b/foreign/node/src/wire/stream/delete-stream.command.ts
index a76a492b5..aad416451 100644
--- a/foreign/node/src/wire/stream/delete-stream.command.ts
+++ b/foreign/node/src/wire/stream/delete-stream.command.ts
@@ -23,10 +23,18 @@ import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
+/**
+ * Parameters for the delete stream command.
+ */
export type DeleteStream = {
+ /** Stream identifier (ID or name) */
streamId: Id
};
+/**
+ * Delete stream command definition.
+ * Removes a stream and all its topics from the system.
+ */
export const DELETE_STREAM = {
code: COMMAND_CODE.DeleteStream,
@@ -38,4 +46,7 @@ export const DELETE_STREAM = {
};
+/**
+ * Executable delete stream command function.
+ */
export const deleteStream = wrapCommand<DeleteStream, boolean>(DELETE_STREAM);
diff --git a/foreign/node/src/wire/stream/ensure-stream.virtual.command.ts
b/foreign/node/src/wire/stream/ensure-stream.virtual.command.ts
index 436c47f9c..ed8d6a88d 100644
--- a/foreign/node/src/wire/stream/ensure-stream.virtual.command.ts
+++ b/foreign/node/src/wire/stream/ensure-stream.virtual.command.ts
@@ -18,11 +18,18 @@
*/
-import { ClientProvider } from '../../client/index.js';
+import type { ClientProvider } from '../../client/index.js';
import { createStream } from './create-stream.command.js';
import { getStream } from './get-stream.command.js';
+/**
+ * Creates a virtual command that ensures a stream exists.
+ * If the stream does not exist, it will be created.
+ *
+ * @param c - Client provider function
+ * @returns Function that ensures a stream exists by name
+ */
export const ensureStream = (c: ClientProvider) =>
async function ensureStream(streamName: string) {
const stream = await getStream(c)({ streamId: streamName });
diff --git a/foreign/node/src/wire/stream/get-stream.command.ts
b/foreign/node/src/wire/stream/get-stream.command.ts
index a324708e2..66828e8f0 100644
--- a/foreign/node/src/wire/stream/get-stream.command.ts
+++ b/foreign/node/src/wire/stream/get-stream.command.ts
@@ -24,10 +24,18 @@ import { COMMAND_CODE } from '../command.code.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { deserializeToStream, type Stream } from './stream.utils.js';
+/**
+ * Parameters for the get stream command.
+ */
export type GetStream = {
+ /** Stream identifier (ID or name) */
streamId: Id
};
+/**
+ * Get stream command definition.
+ * Retrieves a single stream by ID or name.
+ */
export const GET_STREAM = {
code: COMMAND_CODE.GetStream,
@@ -43,4 +51,7 @@ export const GET_STREAM = {
}
+/**
+ * Executable get stream command function.
+ */
export const getStream = wrapCommand<GetStream, Stream | null>(GET_STREAM);
diff --git a/foreign/node/src/wire/stream/get-streams.command.ts
b/foreign/node/src/wire/stream/get-streams.command.ts
index e3b01c19f..247bb3e78 100644
--- a/foreign/node/src/wire/stream/get-streams.command.ts
+++ b/foreign/node/src/wire/stream/get-streams.command.ts
@@ -23,11 +23,15 @@ import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
import { deserializeToStream, type Stream } from './stream.utils.js';
+/**
+ * Get streams command definition.
+ * Retrieves all streams.
+ */
export const GET_STREAMS = {
code: COMMAND_CODE.GetStreams,
-
+
serialize: () => Buffer.alloc(0),
-
+
deserialize: (r: CommandResponse) => {
const payloadSize = r.data.length;
const streams = [];
@@ -42,4 +46,7 @@ export const GET_STREAMS = {
};
+/**
+ * Executable get streams command function.
+ */
export const getStreams = wrapCommand<void, Stream[]>(GET_STREAMS);
diff --git a/foreign/node/src/wire/stream/purge-stream.command.ts
b/foreign/node/src/wire/stream/purge-stream.command.ts
index 9a59c5348..e4ebc256f 100644
--- a/foreign/node/src/wire/stream/purge-stream.command.ts
+++ b/foreign/node/src/wire/stream/purge-stream.command.ts
@@ -23,10 +23,18 @@ import { COMMAND_CODE } from '../command.code.js';
import { deserializeVoidResponse } from '../../client/client.utils.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
+/**
+ * Parameters for the purge stream command.
+ */
export type PurgeStream = {
+ /** Stream identifier (ID or name) */
streamId: Id
};
+/**
+ * Purge stream command definition.
+ * Deletes all messages from a stream while keeping the stream itself.
+ */
export const PURGE_STREAM = {
code: COMMAND_CODE.PurgeStream,
@@ -38,4 +46,7 @@ export const PURGE_STREAM = {
};
+/**
+ * Executable purge stream command function.
+ */
export const purgeStream = wrapCommand<PurgeStream, boolean>(PURGE_STREAM);
diff --git a/foreign/node/src/wire/stream/stream.utils.ts
b/foreign/node/src/wire/stream/stream.utils.ts
index 63863af28..a905954d4 100644
--- a/foreign/node/src/wire/stream/stream.utils.ts
+++ b/foreign/node/src/wire/stream/stream.utils.ts
@@ -20,20 +20,42 @@
import { toDate } from '../serialize.utils.js';
+/**
+ * Stream information returned from the server.
+ */
export type Stream = {
+ /** Stream ID */
id: number,
+ /** Stream name */
name: string,
+ /** Number of topics in the stream */
topicsCount: number,
+ /** Total size of the stream in bytes */
sizeBytes: bigint,
+ /** Total number of messages in the stream */
messagesCount: bigint,
+ /** Stream creation timestamp */
createdAt: Date
}
+/**
+ * Result of deserializing a stream.
+ */
type StreamDeserialized = {
+ /** Number of bytes consumed */
bytesRead: number,
+ /** Deserialized stream data */
data: Stream
};
+/**
+ * Deserializes a stream from a buffer.
+ *
+ * @param r - Buffer containing serialized stream data
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and deserialized stream data
+ * @throws Error if the buffer is empty (stream does not exist)
+ */
export const deserializeToStream = (r: Buffer, pos = 0): StreamDeserialized =>
{
if (r.length === 0)
throw new Error('Steam does not exist');
diff --git a/foreign/node/src/wire/stream/update-stream.command.ts
b/foreign/node/src/wire/stream/update-stream.command.ts
index 869b15d2c..4aff370a8 100644
--- a/foreign/node/src/wire/stream/update-stream.command.ts
+++ b/foreign/node/src/wire/stream/update-stream.command.ts
@@ -23,11 +23,20 @@ import { COMMAND_CODE } from '../command.code.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { uint8ToBuf } from '../number.utils.js';
+/**
+ * Parameters for the update stream command.
+ */
export type UpdateStream = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** New stream name (1-255 bytes) */
name: string
}
+/**
+ * Update stream command definition.
+ * Updates a stream's name.
+ */
export const UPDATE_STREAM = {
code: COMMAND_CODE.UpdateStream,
@@ -49,4 +58,7 @@ export const UPDATE_STREAM = {
};
+/**
+ * Executable update stream command function.
+ */
export const updateStream = wrapCommand<UpdateStream, boolean>(UPDATE_STREAM);
diff --git a/foreign/node/src/wire/token/create-token.command.ts
b/foreign/node/src/wire/token/create-token.command.ts
index f96ec05a0..7fd32b690 100644
--- a/foreign/node/src/wire/token/create-token.command.ts
+++ b/foreign/node/src/wire/token/create-token.command.ts
@@ -23,11 +23,20 @@ import { deserializeCreateToken, type CreateTokenResponse }
from './token.utils.
import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the create access token command.
+ */
export type CreateToken = {
+ /** Token name (1-255 bytes) */
name: string,
+ /** Token expiry time in seconds (default: 600) */
expiry?: bigint
}
+/**
+ * Create access token command definition.
+ * Creates a new access token for authentication.
+ */
export const CREATE_TOKEN = {
code: COMMAND_CODE.CreateAccessToken,
@@ -49,4 +58,7 @@ export const CREATE_TOKEN = {
};
+/**
+ * Executable create access token command function.
+ */
export const createToken = wrapCommand<CreateToken,
CreateTokenResponse>(CREATE_TOKEN);
diff --git a/foreign/node/src/wire/token/delete-token.command.ts
b/foreign/node/src/wire/token/delete-token.command.ts
index d9837e471..2e596e8a4 100644
--- a/foreign/node/src/wire/token/delete-token.command.ts
+++ b/foreign/node/src/wire/token/delete-token.command.ts
@@ -23,10 +23,18 @@ import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
import { uint8ToBuf } from '../number.utils.js';
+/**
+ * Parameters for the delete access token command.
+ */
export type DeleteToken = {
+ /** Token name (1-255 bytes) */
name: string
};
+/**
+ * Delete access token command definition.
+ * Removes an existing access token.
+ */
export const DELETE_TOKEN = {
code: COMMAND_CODE.DeleteAccessToken,
@@ -46,4 +54,7 @@ export const DELETE_TOKEN = {
deserialize: deserializeVoidResponse
};
+/**
+ * Executable delete access token command function.
+ */
export const deleteToken = wrapCommand<DeleteToken, boolean>(DELETE_TOKEN);
diff --git a/foreign/node/src/wire/token/get-tokens.command.ts
b/foreign/node/src/wire/token/get-tokens.command.ts
index eb5273706..74bb7b1ff 100644
--- a/foreign/node/src/wire/token/get-tokens.command.ts
+++ b/foreign/node/src/wire/token/get-tokens.command.ts
@@ -24,6 +24,10 @@ import { COMMAND_CODE } from '../command.code.js';
import { deserializeTokens, type Token } from './token.utils.js';
+/**
+ * Get access tokens command definition.
+ * Retrieves all access tokens for the current user.
+ */
export const GET_TOKENS = {
code: COMMAND_CODE.GetAccessTokens,
@@ -33,4 +37,7 @@ export const GET_TOKENS = {
};
+/**
+ * Executable get access tokens command function.
+ */
export const getTokens = wrapCommand<void, Token[]>(GET_TOKENS);
diff --git a/foreign/node/src/wire/token/token.utils.ts
b/foreign/node/src/wire/token/token.utils.ts
index 75549ffb0..3033313fd 100644
--- a/foreign/node/src/wire/token/token.utils.ts
+++ b/foreign/node/src/wire/token/token.utils.ts
@@ -20,31 +20,64 @@
import { toDate } from '../serialize.utils.js';
+/**
+ * Response from creating an access token.
+ */
export type CreateTokenResponse = {
+ /** The generated access token string */
token: string
};
+/**
+ * Result of deserializing a token creation response.
+ */
type TokenDeserialized = {
+ /** Number of bytes consumed */
bytesRead: number,
+ /** Deserialized token response */
data: CreateTokenResponse
};
+/**
+ * Access token information.
+ */
export type Token = {
+ /** Token name */
name: string,
+ /** Token expiry timestamp (null if no expiry) */
expiry: Date | null
}
+/**
+ * Result of deserializing a token.
+ */
type TokenSerialized = {
+ /** Number of bytes consumed */
bytesRead: number,
+ /** Deserialized token data */
data: Token
}
+/**
+ * Deserializes a token creation response from a buffer.
+ *
+ * @param p - Buffer containing serialized token response
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and token string
+ */
export const deserializeCreateToken = (p: Buffer, pos = 0): TokenDeserialized
=> {
const len = p.readUInt8(pos);
const token = p.subarray(pos + 1, pos + 1 + len).toString();
return { bytesRead: 1 + len, data: { token } };
}
+/**
+ * Deserializes a token from a buffer.
+ *
+ * @param p - Buffer containing serialized token data
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and deserialized token data
+ */
export const deserializeToken = (p: Buffer, pos = 0): TokenSerialized => {
const nameLength = p.readUInt8(pos);
const name = p.subarray(pos + 1, pos + 1 + nameLength).toString();
@@ -64,6 +97,13 @@ export const deserializeToken = (p: Buffer, pos = 0):
TokenSerialized => {
};
}
+/**
+ * Deserializes multiple tokens from a buffer.
+ *
+ * @param p - Buffer containing serialized tokens data
+ * @param pos - Starting position in the buffer
+ * @returns Array of deserialized tokens
+ */
export const deserializeTokens = (p: Buffer, pos = 0): Token[] => {
const tokens = [];
const len = p.length;
diff --git a/foreign/node/src/wire/topic/create-topic.command.ts
b/foreign/node/src/wire/topic/create-topic.command.ts
index 76d9e51c0..11a70ccf1 100644
--- a/foreign/node/src/wire/topic/create-topic.command.ts
+++ b/foreign/node/src/wire/topic/create-topic.command.ts
@@ -30,16 +30,30 @@ import {
} from './topic.utils.js';
+/**
+ * Parameters for the create topic command.
+ */
export type CreateTopic = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic name (1-255 bytes) */
name: string,
+ /** Number of partitions to create */
partitionCount: number,
+ /** Compression algorithm (None or Gzip) */
compressionAlgorithm: CompressionAlgorithmT,
+ /** Message expiry time in microseconds (0 = unlimited) */
messageExpiry?: bigint,
+ /** Maximum topic size in bytes (0 = unlimited) */
maxTopicSize?: bigint,
+ /** Replication factor (1-255) */
replicationFactor?: number
};
+/**
+ * Create topic command definition.
+ * Creates a new topic within a stream.
+ */
export const CREATE_TOPIC = {
code: COMMAND_CODE.CreateTopic,
@@ -85,4 +99,7 @@ export const CREATE_TOPIC = {
};
+/**
+ * Executable create topic command function.
+ */
export const createTopic = wrapCommand<CreateTopic, Topic>(CREATE_TOPIC);
diff --git a/foreign/node/src/wire/topic/delete-topic.command.ts
b/foreign/node/src/wire/topic/delete-topic.command.ts
index 167696b84..64126bee6 100644
--- a/foreign/node/src/wire/topic/delete-topic.command.ts
+++ b/foreign/node/src/wire/topic/delete-topic.command.ts
@@ -24,12 +24,22 @@ import { COMMAND_CODE } from '../command.code.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { uint32ToBuf } from '../number.utils.js';
-type DeleteTopic = {
+/**
+ * Parameters for the delete topic command.
+ */
+export type DeleteTopic = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** Number of partitions in the topic */
partitionsCount: number
}
+/**
+ * Delete topic command definition.
+ * Permanently removes a topic and all its data from a stream.
+ */
export const DELETE_TOPIC = {
code: COMMAND_CODE.DeleteTopic,
@@ -45,4 +55,7 @@ export const DELETE_TOPIC = {
};
+/**
+ * Executable delete topic command function.
+ */
export const deleteTopic = wrapCommand<DeleteTopic, boolean>(DELETE_TOPIC);
diff --git a/foreign/node/src/wire/topic/ensure-topic.virtual.command.ts
b/foreign/node/src/wire/topic/ensure-topic.virtual.command.ts
index e9641d6ec..cb5b26a46 100644
--- a/foreign/node/src/wire/topic/ensure-topic.virtual.command.ts
+++ b/foreign/node/src/wire/topic/ensure-topic.virtual.command.ts
@@ -18,11 +18,19 @@
*/
-import { Id } from '../identifier.utils.js';
-import { ClientProvider } from '../../client/index.js';
+import type { Id } from '../identifier.utils.js';
+import type { ClientProvider } from '../../client/index.js';
import { createTopic } from './create-topic.command.js';
import { getTopic } from './get-topic.command.js';
+
+/**
+ * Creates a virtual command that ensures a topic exists.
+ * If the topic does not exist, it will be created with default settings.
+ *
+ * @param c - Client provider function
+ * @returns Function that ensures a topic exists by name
+ */
export const ensureTopic = (c: ClientProvider) =>
async function ensureTopic(
streamId: Id,
diff --git a/foreign/node/src/wire/topic/get-topic.command.ts
b/foreign/node/src/wire/topic/get-topic.command.ts
index da161be5c..d43327440 100644
--- a/foreign/node/src/wire/topic/get-topic.command.ts
+++ b/foreign/node/src/wire/topic/get-topic.command.ts
@@ -24,11 +24,20 @@ import { COMMAND_CODE } from '../command.code.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { deserializeTopic, type Topic } from './topic.utils.js';
-type GetTopic = {
+/**
+ * Parameters for the get topic command.
+ */
+export type GetTopic = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id
}
+/**
+ * Get topic command definition.
+ * Retrieves detailed information about a specific topic.
+ */
export const GET_TOPIC = {
code: COMMAND_CODE.GetTopic,
@@ -47,4 +56,7 @@ export const GET_TOPIC = {
};
+/**
+ * Executable get topic command function.
+ */
export const getTopic = wrapCommand<GetTopic, Topic | null>(GET_TOPIC);
diff --git a/foreign/node/src/wire/topic/get-topics.command.ts
b/foreign/node/src/wire/topic/get-topics.command.ts
index b08b94ac8..afcfea484 100644
--- a/foreign/node/src/wire/topic/get-topics.command.ts
+++ b/foreign/node/src/wire/topic/get-topics.command.ts
@@ -24,10 +24,18 @@ import { COMMAND_CODE } from '../command.code.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { deserializeTopics, type Topic } from './topic.utils.js';
+/**
+ * Parameters for the get topics command.
+ */
export type GetTopics = {
+ /** Stream identifier (ID or name) */
streamId: Id
};
+/**
+ * Get topics command definition.
+ * Retrieves all topics within a stream.
+ */
export const GET_TOPICS = {
code: COMMAND_CODE.GetTopics,
@@ -40,4 +48,7 @@ export const GET_TOPICS = {
}
};
+/**
+ * Executable get topics command function.
+ */
export const getTopics = wrapCommand<GetTopics, Topic[]>(GET_TOPICS);
diff --git a/foreign/node/src/wire/topic/purge-topic.command.ts
b/foreign/node/src/wire/topic/purge-topic.command.ts
index e92f990c8..0d291bab8 100644
--- a/foreign/node/src/wire/topic/purge-topic.command.ts
+++ b/foreign/node/src/wire/topic/purge-topic.command.ts
@@ -23,11 +23,20 @@ import { COMMAND_CODE } from '../command.code.js';
import { deserializeVoidResponse } from '../../client/client.utils.js';
import { wrapCommand } from '../command.utils.js';
+/**
+ * Parameters for the purge topic command.
+ */
export type PurgeTopic = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id
};
+/**
+ * Purge topic command definition.
+ * Deletes all messages from a topic while keeping the topic itself.
+ */
export const PURGE_TOPIC = {
code: COMMAND_CODE.PurgeTopic,
@@ -42,4 +51,7 @@ export const PURGE_TOPIC = {
};
+/**
+ * Executable purge topic command function.
+ */
export const purgeTopic = wrapCommand<PurgeTopic, boolean>(PURGE_TOPIC);
diff --git a/foreign/node/src/wire/topic/topic.utils.ts
b/foreign/node/src/wire/topic/topic.utils.ts
index 953ca0978..dcfe4359e 100644
--- a/foreign/node/src/wire/topic/topic.utils.ts
+++ b/foreign/node/src/wire/topic/topic.utils.ts
@@ -21,55 +21,106 @@
import { toDate } from '../serialize.utils.js';
import type { ValueOf } from '../../type.utils.js';
+/**
+ * Basic topic information without partition details.
+ */
export type BaseTopic = {
+ /** Topic ID */
id: number
+ /** Topic name */
name: string,
+ /** Topic creation timestamp */
createdAt: Date,
+ /** Number of partitions */
partitionsCount: number,
+ /** Compression algorithm used */
compressionAlgorithm: number,
+ /** Message expiry time in microseconds (0 = unlimited) */
messageExpiry: bigint,
+ /** Maximum topic size in bytes (0 = unlimited) */
maxTopicSize: bigint,
+ /** Replication factor */
replicationFactor: number
+ /** Total size of the topic in bytes */
sizeBytes: bigint,
+ /** Total number of messages in the topic */
messagesCount: bigint,
};
+/**
+ * Partition information within a topic.
+ */
export type Partition = {
+ /** Partition ID */
id: number,
+ /** Partition creation timestamp */
createdAt: Date,
+ /** Number of segments in the partition */
segmentsCount: number,
+ /** Current offset in the partition */
currentOffset: bigint,
+ /** Total size of the partition in bytes */
sizeBytes: bigint,
+ /** Total number of messages in the partition */
messagesCount: bigint
};
+/** Topic with partition details */
export type Topic = BaseTopic & { partitions: Partition[] };
+/** Base serialization result */
type Serialized = { bytesRead: number };
+/** Result of deserializing a partition */
type PartitionSerialized = { data: Partition } & Serialized;
+/** Result of deserializing a base topic */
type BaseTopicSerialized = { data: BaseTopic } & Serialized;
+/** Result of deserializing a topic */
type TopicSerialized = { data: Topic } & Serialized;
+/**
+ * Compression algorithm options.
+ */
export const CompressionAlgorithm = {
+ /** No compression */
None: 1,
+ /** Gzip compression */
Gzip: 2
};
+/** Type alias for the CompressionAlgorithm object */
export type CompressionAlgorithmKind = typeof CompressionAlgorithm;
+/** String literal type of compression algorithm names */
export type CompressionAlgorithmKindId = keyof CompressionAlgorithm;
+/** Numeric values of compression algorithms */
export type CompressionAlgorithmKindValue = ValueOf<CompressionAlgorithm>;
+/** No compression type */
export type CompressionAlgorithmNone = CompressionAlgorithmKind['None'];
+/** Gzip compression type */
export type CompressionAlgorithmGzip = CompressionAlgorithmKind['Gzip'];
+/** Union of compression algorithm types */
export type CompressionAlgorithm = CompressionAlgorithmNone |
CompressionAlgorithmGzip;
+/**
+ * Type guard for valid compression algorithms.
+ *
+ * @param ca - Compression algorithm value to check
+ * @returns True if the value is a valid compression algorithm
+ */
export const isValidCompressionAlgorithm = (ca: number): ca is
CompressionAlgorithm =>
Object.values(CompressionAlgorithm).includes(ca);
+/**
+ * Deserializes a base topic from a buffer.
+ *
+ * @param p - Buffer containing serialized topic data
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and deserialized topic data
+ */
export const deserializeBaseTopic = (p: Buffer, pos = 0): BaseTopicSerialized
=> {
const id = p.readUInt32LE(pos);
const createdAt = toDate(p.readBigUint64LE(pos + 4));
@@ -102,6 +153,13 @@ export const deserializeBaseTopic = (p: Buffer, pos = 0):
BaseTopicSerialized =>
};
+/**
+ * Deserializes a partition from a buffer.
+ *
+ * @param p - Buffer containing serialized partition data
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and deserialized partition data
+ */
export const deserializePartition = (p: Buffer, pos = 0): PartitionSerialized
=> {
return {
bytesRead: 4 + 8 + 4 + 8 + 8 + 8,
@@ -117,6 +175,14 @@ export const deserializePartition = (p: Buffer, pos = 0):
PartitionSerialized =>
};
+/**
+ * Deserializes a topic with partitions from a buffer.
+ *
+ * @param p - Buffer containing serialized topic data
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and deserialized topic with partitions
+ * @throws Error if the buffer is empty (topic does not exist)
+ */
export const deserializeTopic = (p: Buffer, pos = 0): TopicSerialized => {
if (p.length === 0)
throw new Error('Topic does not exist');
@@ -135,6 +201,13 @@ export const deserializeTopic = (p: Buffer, pos = 0):
TopicSerialized => {
};
+/**
+ * Deserializes multiple topics from a buffer.
+ *
+ * @param p - Buffer containing serialized topics data
+ * @param pos - Starting position in the buffer
+ * @returns Array of deserialized topics
+ */
export const deserializeTopics = (p: Buffer, pos = 0): Topic[] => {
const topics = [];
const len = p.length;
diff --git a/foreign/node/src/wire/topic/update-topic.command.ts
b/foreign/node/src/wire/topic/update-topic.command.ts
index 24f137264..8197f5ede 100644
--- a/foreign/node/src/wire/topic/update-topic.command.ts
+++ b/foreign/node/src/wire/topic/update-topic.command.ts
@@ -29,16 +29,30 @@ import {
} from './topic.utils.js';
+/**
+ * Parameters for the update topic command.
+ */
export type UpdateTopic = {
+ /** Stream identifier (ID or name) */
streamId: Id,
+ /** Topic identifier (ID or name) */
topicId: Id,
+ /** New topic name (1-255 bytes) */
name: string,
+ /** Compression algorithm (None or Gzip) */
compressionAlgorithm?: CompressionAlgorithmT,
+ /** Message expiry time in microseconds (0 = unlimited) */
messageExpiry?: bigint,
+ /** Maximum topic size in bytes (0 = unlimited) */
maxTopicSize?: bigint,
+ /** Replication factor (1-255) */
replicationFactor?: number,
};
+/**
+ * Update topic command definition.
+ * Updates a topic's configuration.
+ */
export const UPDATE_TOPIC = {
code: COMMAND_CODE.UpdateTopic,
@@ -79,4 +93,7 @@ export const UPDATE_TOPIC = {
};
+/**
+ * Executable update topic command function.
+ */
export const updateTopic = wrapCommand<UpdateTopic, boolean>(UPDATE_TOPIC);
diff --git a/foreign/node/src/wire/user/change-password.command.ts
b/foreign/node/src/wire/user/change-password.command.ts
index fb1f229ea..cde1294b7 100644
--- a/foreign/node/src/wire/user/change-password.command.ts
+++ b/foreign/node/src/wire/user/change-password.command.ts
@@ -25,13 +25,23 @@ import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
+/**
+ * Parameters for the change password command.
+ */
export type ChangePassword = {
+ /** User ID */
userId: number,
+ /** Current password (1-255 bytes) */
currentPassword: string,
+ /** New password (1-255 bytes) */
newPassword: string
};
+/**
+ * Change password command definition.
+ * Changes a user's password.
+ */
export const CHANGE_PASSWORD = {
code: COMMAND_CODE.ChangePassword,
@@ -60,4 +70,7 @@ export const CHANGE_PASSWORD = {
};
+/**
+ * Executable change password command function.
+ */
export const changePassword = wrapCommand<ChangePassword,
boolean>(CHANGE_PASSWORD);
diff --git a/foreign/node/src/wire/user/create-user.command.ts
b/foreign/node/src/wire/user/create-user.command.ts
index a38c06ce4..9c501225e 100644
--- a/foreign/node/src/wire/user/create-user.command.ts
+++ b/foreign/node/src/wire/user/create-user.command.ts
@@ -26,13 +26,24 @@ import { uint8ToBuf, uint32ToBuf, boolToBuf } from
'../number.utils.js';
import { serializePermissions, type UserPermissions } from
'./permissions.utils.js';
+/**
+ * Parameters for the create user command.
+ */
export type CreateUser = {
+ /** Username (1-255 bytes) */
username: string,
+ /** Password (1-255 bytes) */
password: string,
+ /** User status (Active or Inactive) */
status: UserStatus
+ /** Optional user permissions */
permissions?: UserPermissions
};
+/**
+ * Create user command definition.
+ * Creates a new user with the specified credentials and permissions.
+ */
export const CREATE_USER = {
code: COMMAND_CODE.CreateUser,
@@ -64,4 +75,7 @@ export const CREATE_USER = {
};
+/**
+ * Executable create user command function.
+ */
export const createUser = wrapCommand<CreateUser, User>(CREATE_USER);
diff --git a/foreign/node/src/wire/user/delete-user.command.ts
b/foreign/node/src/wire/user/delete-user.command.ts
index 20d8466c3..1ac2a9b43 100644
--- a/foreign/node/src/wire/user/delete-user.command.ts
+++ b/foreign/node/src/wire/user/delete-user.command.ts
@@ -23,10 +23,18 @@ import { wrapCommand } from '../command.utils.js';
import { COMMAND_CODE } from '../command.code.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
+/**
+ * Parameters for the delete user command.
+ */
export type DeleteUser = {
+ /** User identifier (ID or username) */
userId: Id
}
+/**
+ * Delete user command definition.
+ * Removes a user from the system.
+ */
export const DELETE_USER = {
code: COMMAND_CODE.DeleteUser,
@@ -38,4 +46,7 @@ export const DELETE_USER = {
};
+/**
+ * Executable delete user command function.
+ */
export const deleteUser = wrapCommand<DeleteUser, boolean>(DELETE_USER);
diff --git a/foreign/node/src/wire/user/get-user.command.ts
b/foreign/node/src/wire/user/get-user.command.ts
index d564e9bf4..d96fc0de2 100644
--- a/foreign/node/src/wire/user/get-user.command.ts
+++ b/foreign/node/src/wire/user/get-user.command.ts
@@ -25,10 +25,18 @@ import { serializeIdentifier, type Id } from
'../identifier.utils.js';
import { deserializeUser, type User } from './user.utils.js';
+/**
+ * Parameters for the get user command.
+ */
export type GetUser = {
+ /** User identifier (ID or username) */
userId: Id
};
+/**
+ * Get user command definition.
+ * Retrieves a single user with permissions.
+ */
export const GET_USER = {
code: COMMAND_CODE.GetUser,
@@ -44,4 +52,7 @@ export const GET_USER = {
};
+/**
+ * Executable get user command function.
+ */
export const getUser = wrapCommand<GetUser, User | null>(GET_USER);
diff --git a/foreign/node/src/wire/user/get-users.command.ts
b/foreign/node/src/wire/user/get-users.command.ts
index 36c3e325b..4da2ff691 100644
--- a/foreign/node/src/wire/user/get-users.command.ts
+++ b/foreign/node/src/wire/user/get-users.command.ts
@@ -24,13 +24,20 @@ import { COMMAND_CODE } from '../command.code.js';
import { deserializeUsers, type BaseUser } from './user.utils.js';
+/**
+ * Get users command definition.
+ * Retrieves all users (without permissions).
+ */
export const GET_USERS = {
code: COMMAND_CODE.GetUsers,
serialize: () => Buffer.alloc(0),
-
+
deserialize: (r: CommandResponse) => deserializeUsers(r.data)
};
+/**
+ * Executable get users command function.
+ */
export const getUsers = wrapCommand<void, BaseUser[]>(GET_USERS);
diff --git a/foreign/node/src/wire/user/permissions.utils.ts
b/foreign/node/src/wire/user/permissions.utils.ts
index 410f8c01f..598b451fc 100644
--- a/foreign/node/src/wire/user/permissions.utils.ts
+++ b/foreign/node/src/wire/user/permissions.utils.ts
@@ -20,64 +20,136 @@
import { uint8ToBuf, uint32ToBuf } from '../number.utils.js';
+/**
+ * Global permissions for server-wide operations.
+ */
export type GlobalPermissions = {
+ /** Can manage server configuration */
ManageServers: boolean,
+ /** Can read server information */
ReadServers: boolean,
+ /** Can manage users */
ManageUsers: boolean,
+ /** Can read user information */
ReadUsers: boolean,
+ /** Can manage streams */
ManageStreams: boolean,
+ /** Can read stream information */
ReadStreams: boolean,
+ /** Can manage topics */
ManageTopics: boolean,
+ /** Can read topic information */
ReadTopics: boolean,
+ /** Can poll messages */
PollMessages: boolean,
+ /** Can send messages */
SendMessages: boolean
};
+/**
+ * Result of deserializing global permissions.
+ */
type GlobalPermissionsDeserialized = {
+ /** Number of bytes consumed */
bytesRead: number,
+ /** Deserialized permissions */
data: GlobalPermissions
};
+/**
+ * Permissions for a specific topic.
+ */
export type TopicPermissions = {
+ /** Can manage the topic */
manage: boolean,
+ /** Can read the topic */
read: boolean,
+ /** Can poll messages from the topic */
pollMessages: boolean,
+ /** Can send messages to the topic */
sendMessages: boolean
};
+/**
+ * Topic permissions with topic identifier.
+ */
export type TopicPerms = {
+ /** Topic ID */
topicId: number,
+ /** Topic permissions */
permissions: TopicPermissions
};
+/** Result of deserializing topic permissions */
type TopicPermissionsDeserialized = { bytesRead: number } & TopicPerms;
+/**
+ * Permissions for a specific stream.
+ */
export type StreamPermissions = {
+ /** Can manage the stream */
manageStream: boolean,
+ /** Can read the stream */
readStream: boolean,
+ /** Can manage topics within the stream */
manageTopics: boolean,
+ /** Can read topics within the stream */
readTopics: boolean,
+ /** Can poll messages from the stream */
pollMessages: boolean,
+ /** Can send messages to the stream */
sendMessages: boolean,
};
+/**
+ * Stream permissions with stream identifier and topic permissions.
+ */
export type StreamPerms = {
+ /** Stream ID */
streamId: number,
+ /** Stream-level permissions */
permissions: StreamPermissions,
+ /** Topic-specific permissions within the stream */
topics: TopicPerms[]
};
+/** Result of deserializing stream permissions */
type StreamPermissionsDeserialized = { bytesRead: number } & StreamPerms;
+/**
+ * Complete user permissions including global and stream-level permissions.
+ */
export type UserPermissions = {
+ /** Global permissions */
global: GlobalPermissions,
+ /** Stream-specific permissions */
streams: StreamPerms[]
};
+/**
+ * Converts a number to a boolean (1 = true).
+ *
+ * @param u - Number to convert
+ * @returns True if u equals 1
+ */
const toBool = (u: number) => u === 1;
+
+/**
+ * Converts a boolean to a byte value.
+ *
+ * @param b - Boolean value
+ * @returns 1 if true, 0 if false
+ */
const boolToByte = (b: boolean) => b ? 1 : 0;
+/**
+ * Deserializes global permissions from a buffer.
+ *
+ * @param p - Buffer containing serialized permissions
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and deserialized permissions
+ */
export const deserializeGlobalPermissions =
(p: Buffer, pos = 0): GlobalPermissionsDeserialized => {
return {
@@ -97,6 +169,12 @@ export const deserializeGlobalPermissions =
};
};
+/**
+ * Serializes global permissions to a buffer.
+ *
+ * @param p - Global permissions to serialize
+ * @returns Serialized permissions buffer (10 bytes)
+ */
export const serializeGlobalPermission = (p: GlobalPermissions) => {
return Buffer.concat([
uint8ToBuf(boolToByte(p.ManageServers)),
@@ -112,6 +190,13 @@ export const serializeGlobalPermission = (p:
GlobalPermissions) => {
]);
}
+/**
+ * Deserializes topic permissions from a buffer.
+ *
+ * @param p - Buffer containing serialized permissions
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read, topic ID, and permissions
+ */
export const deserializeTopicPermissions =
(p: Buffer, pos = 0): TopicPermissionsDeserialized => {
const topicId = p.readUInt32LE(pos);
@@ -129,6 +214,12 @@ export const deserializeTopicPermissions =
}
};
+/**
+ * Serializes topic permissions to a buffer.
+ *
+ * @param p - Topic permissions to serialize
+ * @returns Serialized permissions buffer (8 bytes)
+ */
export const serializeTopicPermissions = (p: TopicPerms) => {
return Buffer.concat([
uint32ToBuf(p.topicId),
@@ -139,6 +230,14 @@ export const serializeTopicPermissions = (p: TopicPerms)
=> {
]);
};
+/**
+ * Deserializes stream permissions from a buffer.
+ * Includes nested topic permissions if present.
+ *
+ * @param p - Buffer containing serialized permissions
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read, stream ID, permissions, and topics
+ */
export const deserializeStreamPermissions =
(p: Buffer, pos = 0): StreamPermissionsDeserialized => {
const start = pos;
@@ -177,6 +276,13 @@ export const deserializeStreamPermissions =
}
};
+/**
+ * Serializes stream permissions to a buffer.
+ * Includes nested topic permissions if present.
+ *
+ * @param p - Stream permissions to serialize
+ * @returns Serialized permissions buffer
+ */
export const serializeStreamPermissions = (p: StreamPerms) => {
const bStream = Buffer.concat([
uint32ToBuf(p.streamId),
@@ -200,6 +306,14 @@ export const serializeStreamPermissions = (p: StreamPerms)
=> {
]), bHead);
}
+/**
+ * Deserializes complete user permissions from a buffer.
+ * Includes global permissions and stream-level permissions.
+ *
+ * @param p - Buffer containing serialized permissions
+ * @param pos - Starting position in the buffer
+ * @returns Deserialized user permissions
+ */
export const deserializePermissions = (p: Buffer, pos = 0): UserPermissions =>
{
const { bytesRead, data } = deserializeGlobalPermissions(p, pos);
pos += bytesRead;
@@ -226,6 +340,13 @@ export const deserializePermissions = (p: Buffer, pos =
0): UserPermissions => {
}
};
+/**
+ * Serializes user permissions to a buffer.
+ * Returns a single zero byte if no permissions provided.
+ *
+ * @param p - Optional user permissions to serialize
+ * @returns Serialized permissions buffer
+ */
export const serializePermissions = (p?: UserPermissions) => {
if (!p)
return uint8ToBuf(0);
diff --git a/foreign/node/src/wire/user/update-permissions.command.ts
b/foreign/node/src/wire/user/update-permissions.command.ts
index db5bc70e2..a1e482085 100644
--- a/foreign/node/src/wire/user/update-permissions.command.ts
+++ b/foreign/node/src/wire/user/update-permissions.command.ts
@@ -26,11 +26,20 @@ import { serializeIdentifier, type Id } from
'../identifier.utils.js';
import { serializePermissions, type UserPermissions } from
'./permissions.utils.js';
+/**
+ * Parameters for the update permissions command.
+ */
export type UpdatePermissions = {
+ /** User identifier (ID or username) */
userId: Id,
+ /** New permissions to set */
permissions: UserPermissions
};
+/**
+ * Update permissions command definition.
+ * Updates a user's permissions.
+ */
export const UPDATE_PERMISSIONS = {
code: COMMAND_CODE.UpdatePermissions,
@@ -50,4 +59,7 @@ export const UPDATE_PERMISSIONS = {
};
+/**
+ * Executable update permissions command function.
+ */
export const updatePermissions = wrapCommand<UpdatePermissions,
boolean>(UPDATE_PERMISSIONS);
diff --git a/foreign/node/src/wire/user/update-user.command.ts
b/foreign/node/src/wire/user/update-user.command.ts
index 4e76bc458..3e5ab572e 100644
--- a/foreign/node/src/wire/user/update-user.command.ts
+++ b/foreign/node/src/wire/user/update-user.command.ts
@@ -26,12 +26,22 @@ import { serializeIdentifier, type Id } from
'../identifier.utils.js';
import { uint8ToBuf } from '../number.utils.js';
+/**
+ * Parameters for the update user command.
+ */
export type UpdateUser = {
+ /** User identifier (ID or username) */
userId: Id,
+ /** Optional new username (1-255 bytes) */
username?: string,
+ /** Optional new status */
status?: UserStatus
};
+/**
+ * Update user command definition.
+ * Updates a user's username and/or status.
+ */
export const UPDATE_USER = {
code: COMMAND_CODE.UpdateUser,
@@ -70,4 +80,7 @@ export const UPDATE_USER = {
};
+/**
+ * Executable update user command function.
+ */
export const updateUser = wrapCommand<UpdateUser, boolean>(UPDATE_USER);
diff --git a/foreign/node/src/wire/user/user.utils.ts
b/foreign/node/src/wire/user/user.utils.ts
index 132eb30b5..aa411990a 100644
--- a/foreign/node/src/wire/user/user.utils.ts
+++ b/foreign/node/src/wire/user/user.utils.ts
@@ -21,25 +21,49 @@
import { toDate } from '../serialize.utils.js';
import { deserializePermissions, type UserPermissions } from
'./permissions.utils.js';
+/**
+ * Basic user information without permissions.
+ */
export type BaseUser = {
+ /** User ID */
id: number,
+ /** User creation timestamp */
createdAt: Date,
+ /** User status (Active/Inactive) */
status: string,
+ /** Username */
userName: string
};
+/**
+ * Result of deserializing a base user.
+ */
type BaseUserDeserialized = {
+ /** Number of bytes consumed */
bytesRead: number,
+ /** Deserialized user data */
data: BaseUser
};
+/** User with permissions information */
export type User = BaseUser & { permissions: UserPermissions | null };
+/**
+ * User status enumeration.
+ */
export enum UserStatus {
+ /** Active user */
Active = 1,
+ /** Inactive user */
Inactive = 2,
};
+/**
+ * Converts a numeric status code to a status string.
+ *
+ * @param t - Numeric status code
+ * @returns Status string ('Active', 'Inactive', or unknown)
+ */
const statusString = (t: number): string => {
switch (t.toString()) {
case '1': return 'Active';
@@ -49,6 +73,14 @@ const statusString = (t: number): string => {
}
}
+/**
+ * Deserializes a base user from a buffer.
+ *
+ * @param p - Buffer containing serialized user data
+ * @param pos - Starting position in the buffer
+ * @returns Object with bytes read and deserialized user data
+ * @throws Error if the buffer is empty (user does not exist)
+ */
export const deserializeBaseUser = (p: Buffer, pos = 0): BaseUserDeserialized
=> {
if (p.length === 0)
throw new Error('User does not exist');
@@ -70,6 +102,13 @@ export const deserializeBaseUser = (p: Buffer, pos = 0):
BaseUserDeserialized =>
}
};
+/**
+ * Deserializes a user with permissions from a buffer.
+ *
+ * @param p - Buffer containing serialized user data
+ * @param pos - Starting position in the buffer
+ * @returns Deserialized user with permissions
+ */
export const deserializeUser = (p: Buffer, pos = 0): User => {
const { bytesRead, data } = deserializeBaseUser(p, pos);
pos += bytesRead;
@@ -87,6 +126,13 @@ export const deserializeUser = (p: Buffer, pos = 0): User
=> {
};
+/**
+ * Deserializes multiple base users from a buffer.
+ *
+ * @param p - Buffer containing serialized users data
+ * @param pos - Starting position in the buffer
+ * @returns Array of deserialized base users
+ */
export const deserializeUsers = (p: Buffer, pos = 0): BaseUser[] => {
const users = [];
const end = p.length;
diff --git a/foreign/node/src/wire/uuid.utils.ts
b/foreign/node/src/wire/uuid.utils.ts
index 9c5f890be..530abcdee 100644
--- a/foreign/node/src/wire/uuid.utils.ts
+++ b/foreign/node/src/wire/uuid.utils.ts
@@ -20,8 +20,20 @@
import { uuidv7, UUID } from "uuidv7";
+/**
+ * Parses a UUID string into a UUID object.
+ *
+ * @param uid - UUID string to parse
+ * @returns Parsed UUID object
+ */
export const parse = (uid: string) => UUID.parse(uid);
-// https://github.com/LiosK/uuidv7
+/**
+ * Generates a new UUID v7 (time-ordered) string.
+ * UUID v7 is a time-ordered UUID that provides better database indexing
performance.
+ *
+ * @returns UUID v7 string
+ * @see https://github.com/LiosK/uuidv7
+ */
export const v7 = () => uuidv7();