This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new a418e823 feat: split transport from client, add heartbeat and
reconnection support (#1809)
a418e823 is described below
commit a418e823669ed0860f63ef1394ab292df524cb91
Author: T1B0 <[email protected]>
AuthorDate: Thu May 29 22:07:33 2025 +0200
feat: split transport from client, add heartbeat and reconnection support
(#1809)
- split transport code from client
- add simple reconnection support (interval + maxRetries)
- add heartbeat support
- better error handling
Co-authored-by: Piotr Gankiewicz <[email protected]>
---
foreign/node/src/client/client.socket.ts | 208 +++++++++-----
foreign/node/src/client/client.transport.ts | 154 ++++++++++
foreign/node/src/client/client.ts | 110 ++++----
foreign/node/src/client/client.type.ts | 27 +-
foreign/node/src/client/index.ts | 2 -
foreign/node/src/debug-send.ts | 102 ++++---
foreign/node/src/debug.ts | 20 +-
foreign/node/src/e2e/tcp.consumer-group.e2e.ts | 2 +-
foreign/node/src/stream/consumer-stream.ts | 37 ++-
foreign/node/src/tcp.e2e.ts | 11 +-
foreign/node/src/tls.system.e2e.ts | 11 +-
foreign/node/src/wire/error.code.ts | 310 +++++++++++++--------
.../node/src/wire/stream/create-stream.command.ts | 2 +-
13 files changed, 659 insertions(+), 337 deletions(-)
diff --git a/foreign/node/src/client/client.socket.ts b/foreign/node/src/client/client.socket.ts
index 1476e1d5..24af592c 100644
--- a/foreign/node/src/client/client.socket.ts
+++ b/foreign/node/src/client/client.socket.ts
@@ -18,33 +18,27 @@
*/
-import type { Socket } from 'node:net';
import { Duplex } from 'node:stream';
+
import type {
- ClientCredentials, CommandResponse, PasswordCredentials, TokenCredentials
+ ClientConfig,
+ ClientCredentials, CommandResponse,
+ PasswordCredentials, RawClient, TokenCredentials
} from './client.type.js';
+
import { handleResponse, serializeCommand } from './client.utils.js';
import { responseError } from '../wire/error.utils.js';
import { LOGIN } from '../wire/session/login.command.js';
import { LOGIN_WITH_TOKEN } from '../wire/session/login-with-token.command.js';
import { debug } from './client.debug.js';
+import { IggyTransport } from './client.transport.js';
+import { PING } from '../wire/index.js';
-export const wrapSocket = (socket: Socket) =>
- new Promise<CommandResponseStream>((resolve, reject) => {
- const responseStream = new CommandResponseStream(socket);
-
- socket.on('error', (err: unknown) => {
- console.error('RESPONSESTREAM ERROR', err)
- reject(err);
- });
- socket.once('connect', () => {
- debug('responseStream.connect event');
- resolve(responseStream);
- });
- socket.on('close', () => { debug('socket#close'); reject(); });
- socket.on('end', () => { console.error('socket#end'); reject(); });
- });
-
+const UNLOGGED_COMMAND_CODE = [
+ PING.code,
+ LOGIN.code,
+ LOGIN_WITH_TOKEN.code
+];
type WriteCb = ((error: Error | null | undefined) => void) | undefined
@@ -56,26 +50,29 @@ type Job = {
};
-export class CommandResponseStream extends Duplex {
- private _socket: Socket;
+export class CommandResponseStream extends Duplex {
+ private options: ClientConfig;
+ private transport: IggyTransport;
private _readPaused: boolean;
private _execQueue: Job[];
public busy: boolean;
isAuthenticated: boolean;
userId?: number;
- constructor(socket: Socket) {
+ constructor(options: ClientConfig) {
super();
- this._socket = this._wrapSocket(socket);
+ this.options = options;
+ this.transport = new IggyTransport(options);
this._readPaused = false;
this.busy = false;
- this._execQueue = [];
this.isAuthenticated = false;
+ this._execQueue = [];
+ this.wrapTransport();
+ this.heartbeat(options.heartbeatInterval);
};
-
- // Probably triggered by Duplex class
+
_destroy() {
- this._socket.destroy();
+ this.transport.socket.destroy();
}
_read(size: number): void {
@@ -85,49 +82,31 @@ export class CommandResponseStream extends Duplex {
}
_write(chunk: Buffer, encoding: BufferEncoding | undefined, cb?: WriteCb) {
- return this._socket.write(chunk, encoding, cb);
+ return this.transport.socket.write(chunk, encoding, cb);
};
writeCommand(command: number, payload: Buffer): boolean {
const cmd = serializeCommand(command, payload);
- return this._socket.write(cmd);
+ return this.transport.socket.write(cmd);
}
sendCommand(
- command: number, payload: Buffer, handleResponse = true
+ command: number, payload: Buffer, handleResponse = true, last = true
): Promise<CommandResponse> {
return new Promise((resolve, reject) => {
- this._execQueue.push({ command, payload, resolve, reject });
+ if(last)
+ this._execQueue.push({ command, payload, resolve, reject });
+ else
+ this._execQueue.unshift({ command, payload, resolve, reject });
this._processQueue(handleResponse);
});
}
- async authenticate(creds: ClientCredentials) {
- const r = ('token' in creds) ?
- await this._authWithToken(creds) :
- await this._authWithPassword(creds);
- this.isAuthenticated = true;
- this.userId = r.userId;
- return this.isAuthenticated;
- }
-
- async _authWithPassword(creds: PasswordCredentials) {
- const pl = LOGIN.serialize(creds);
- const logr = await this.sendCommand(LOGIN.code, pl);
- return LOGIN.deserialize(logr);
- }
-
- async _authWithToken(creds: TokenCredentials) {
- const pl = LOGIN_WITH_TOKEN.serialize(creds);
- const logr = await this.sendCommand(LOGIN_WITH_TOKEN.code, pl);
- return LOGIN_WITH_TOKEN.deserialize(logr);
- }
-
async _processQueue(handleResponse = true): Promise<void> {
if (this.busy)
return;
this.busy = true;
- while (this._execQueue.length > 0) {
+ while (this._execQueue.length > 0 && this.transport.socket.writable) {
const next = this._execQueue.shift();
if (!next) break;
const { command, payload, resolve, reject } = next;
@@ -146,8 +125,11 @@ export class CommandResponseStream extends Duplex {
payload: Buffer,
handleResp = true
): Promise<CommandResponse> {
- debug('==> writeCommand', this.writeCommand(command, payload));
+ const lastWrite = this.writeCommand(command, payload);
+ debug('==> writeCommand', lastWrite);
return new Promise((resolve, reject) => {
+ if(!lastWrite)
+ return reject(new Error('failed to write to socket'));
const errCb = (err: unknown) => reject(err);
this.once('error', errCb);
this.once('data', (resp) => {
@@ -162,35 +144,74 @@ export class CommandResponseStream extends Duplex {
});
}
+ _failQueue(err: Error) {
+ this._execQueue.forEach(({ reject }) => reject(err));
+ this._execQueue = [];
+ }
+
getReadStream() {
- return this;//.pipe(new PassThrough());
+ return this;
}
+
+ wrapTransport() {
+ this.transport.on('connect', () => {
+ debug('socket#connect event');
+ this.emit('connect');
+ if(this._execQueue.length > 0) {
+ if(!this.isAuthenticated) {
+ const needAuth = this._execQueue.some(
+ q => !UNLOGGED_COMMAND_CODE.includes(q.command)
+ );
+ if(needAuth)
+ this.authenticate(this.options.credentials);
+ }
+ this._processQueue();
+ }
+ });
- _wrapSocket(socket: Socket) {
- // pass through
- socket.on('close', hadError => this.emit('close', hadError));
- socket.on('connect', () => this.emit('connect'));
- socket.on('drain', () => this.emit('drain'));
- socket.on('end', () => this.emit('end'));
- socket.on('error', err => this.emit('error', err));
- socket.on(
+ this.transport.on('close', async (hadError?: boolean) => {
+ console.log(
+ 'socket#close',
+ { hadError, connected: this.transport.connected },
+ );
+ this.emit('close');
+ });
+
+ this.transport.on('end', async () => {
+ console.error('socket#end');
+ this.emit('end');
+ });
+
+ this.transport.on('error', (err) => {
+ console.error('socket#err', err);
+ this._failQueue(err);
+ this.emit('error', err);
+ });
+
+ this.transport.on('disconnected', async () => {
+ this.isAuthenticated = false;
+ });
+
+ this.transport.on('drain', () => this.emit('drain'));
+
+ this.transport.on(
'lookup',
(err, address, family, host) => this.emit('lookup', err, address, family, host)
);
- socket.on('ready', () => this.emit('ready'));
- socket.on('timeout', () => this.emit('timeout'));
+ this.transport.on('ready', () => this.emit('ready'));
+ this.transport.on('timeout', () => this.emit('timeout'));
// customize data events
- socket.on('readable', () => this._onReadable());
- return socket;
+ this.transport.on('readable', () => this._onReadable());
+ return this.transport;
}
_onReadable() {
while (!this._readPaused) {
- const head = this._socket.read(8);
+ const head = this.transport.socket.read(8);
if (!head || head.length === 0) return;
if (head.length < 8) {
- this._socket.unshift(head);
+ this.transport.socket.unshift(head);
return;
}
/** first chunk[4:8] hold response length */
@@ -201,15 +222,15 @@ export class CommandResponseStream extends Duplex {
return;
}
- const payload = this._socket.read(responseSize);
+ const payload = this.transport.socket.read(responseSize);
debug('payload', payload, responseSize, head.readUInt32LE(0));
if (!payload) {
- this._socket.unshift(head);
+ this.transport.socket.unshift(head);
return;
}
/** payload is incomplete, unshift until next read */
if (payload.length < responseSize) {
- this._socket.unshift(Buffer.concat([head, payload]));
+ this.transport.socket.unshift(Buffer.concat([head, payload]));
return;
}
@@ -219,4 +240,49 @@ export class CommandResponseStream extends Duplex {
this._readPaused = true;
}
}
+
+ async authenticate(creds: ClientCredentials) {
+ const r = ('token' in creds) ?
+ await this._authWithToken(creds) :
+ await this._authWithPassword(creds);
+ this.isAuthenticated = true;
+ this.userId = r.userId;
+ return this.isAuthenticated;
+ }
+
+ async _authWithPassword(creds: PasswordCredentials) {
+ const pl = LOGIN.serialize(creds);
+ const logr = await this.sendCommand(LOGIN.code, pl, true, false);
+ return LOGIN.deserialize(logr);
+ }
+
+ async _authWithToken(creds: TokenCredentials) {
+ const pl = LOGIN_WITH_TOKEN.serialize(creds);
+ const logr = await this.sendCommand(LOGIN_WITH_TOKEN.code, pl, true, false);
+ return LOGIN_WITH_TOKEN.deserialize(logr);
+ }
+
+ async ping() {
+ const pl = PING.serialize();
+ const pingR = await this.sendCommand(PING.code, pl, true);
+ return PING.deserialize(pingR);
+ }
+
+ heartbeat(interval?: number) {
+ if(!interval)
+ return
+
+ setInterval(async () => {
+ if(this.transport.connected) {
+ debug(`sending hearbeat ping (interval: ${interval} ms)`);
+ await this.ping()
+ }
+ }, interval);
+ }
+
+
};
+
+export function getRawClient(options: ClientConfig): RawClient {
+ return new CommandResponseStream(options);
+}
diff --git a/foreign/node/src/client/client.transport.ts b/foreign/node/src/client/client.transport.ts
new file mode 100644
index 00000000..4614a12f
--- /dev/null
+++ b/foreign/node/src/client/client.transport.ts
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import { EventEmitter } from 'node:events';
+import { createConnection, type Socket } from 'node:net';
+import { connect as TLSConnect } from 'node:tls';
+import { debug } from './client.debug.js';
+import type { ClientConfig, TlsOption, TcpOption, ReconnectOption } from './client.type.js';
+
+
+const createTcpSocket = (options: TcpOption): Socket => {
+ return createConnection(options);
+};
+
+const createTlsSocket = ({ port, ...options }: TlsOption): Socket => {
+ const socket = TLSConnect(port, options);
+ socket.setEncoding('utf8');
+ return socket;
+};
+
+const getTransport = (config: ClientConfig): Socket => {
+ const { transport, options } = config;
+ switch (transport) {
+ case 'TLS': return createTlsSocket(options);
+ case 'TCP':
+ default:
+ return createTcpSocket(options);
+ }
+};
+
+const DefaultReconnectOption: ReconnectOption = {
+ enabled: true,
+ interval: 5 * 1000,
+ maxRetries: 12
+}
+
+function recreate(option: ClientConfig, timer=1000): Promise<Socket> {
+ return new Promise((resolve) => {
+ setTimeout(() => {
+ resolve(getTransport(option));
+ }, timer);
+ });
+}
+
+export class IggyTransport extends EventEmitter {
+ public socket: Socket;
+ public connected: boolean;
+ public connecting: boolean;
+ private config: ClientConfig;
+ private reconnectOption: ReconnectOption;
+ private reconnectCount: number;
+ private lastError: unknown[];
+
+ constructor(config: ClientConfig) {
+ super()
+ this.config = config;
+ this.reconnectOption = { ...DefaultReconnectOption, ...config.reconnect };
+ this.reconnectCount = 0;
+ this.connected = false;
+ this.connecting = true;
+ this.socket = getTransport(config);
+ this.wrapSocket();
+ this.lastError = [];
+ }
+
+ wrapSocket() {
+ this.socket.on('connect', () => {
+ debug('socket#connect event');
+ this.connected = true;
+ this.connecting = false;
+ // const lastReconnectCount = this.reconnectCount;
+ this.reconnectCount = 0;
+ this.lastError = [];
+ this.emit('connect');
+ // if(lastReconnectCount > 0)
+ // this.emit('connect');
+ });
+
+ this.socket.once('close', async (hadError?: boolean) => {
+ this.connected = false;
+ this.emit('disconnected', hadError);
+ const { enabled, interval, maxRetries } = this.reconnectOption
+ debug(
+ 'socket#close event',
+ {
+ hadError,
+ reconnect: { enabled, interval, maxRetries },
+ count:this.reconnectCount
+ }
+ );
+ if(!enabled)
+ return this.emit('close', hadError);
+
+ if(this.reconnectCount > maxRetries) {
+ debug(`reconnect reached maxRetries of ${ maxRetries }`, this.lastError);
+ this.emit('close', hadError);
+ this.emit(
+ 'error',
+ new Error(
+ `reconnect maxRetries exceeded (count: ${ this.reconnectCount })`,
+ { cause: this.lastError.pop() }
+ ));
+ return;
+ }
+
+ this.connecting = true;
+ this.reconnectCount += 1;
+ this.socket = await recreate(this.config, interval);
+ this.wrapSocket();
+ // this.emit('close');
+ });
+
+ this.socket.on('end', async (err: unknown) => {
+ console.error('Transport/socket#end', err);
+ this.emit('end');
+ });
+
+ this.socket.on('drain', () => this.emit('drain'));
+
+ this.socket.on('error', (err) => {
+ console.error('Transport/socket#error !1/2-noemit', err);
+ this.lastError.push(err);
+ if(!this.connecting && !this.connected)
+ this.emit('error', err);
+ });
+
+ this.socket.on(
+ 'lookup',
+ (err, address, family, host) => this.emit('lookup', err, address, family, host)
+ );
+ this.socket.on('ready', () => this.emit('ready'));
+ this.socket.on('timeout', () => this.emit('timeout'));
+ // customize data events
+ this.socket.on('readable', () => this.emit('readable'));
+ };
+
+};
diff --git a/foreign/node/src/client/client.ts b/foreign/node/src/client/client.ts
index 08489a42..e7ac472e 100644
--- a/foreign/node/src/client/client.ts
+++ b/foreign/node/src/client/client.ts
@@ -18,84 +18,82 @@
*/
-import type { RawClient, ClientConfig } from "./client.type.js"
import { createPool, type Pool } from 'generic-pool';
+import type { RawClient, ClientConfig } from "./client.type.js"
+import { getRawClient } from './client.socket.js';
import { CommandAPI } from '../wire/command-set.js';
-import { TcpClient } from './tcp.client.js';
-import { TlsClient } from './tls.client.js';
import { debug } from './client.debug.js';
-export const rawClientGetter = (config: ClientConfig): Promise<RawClient> => {
- const { transport, options } = config;
- switch (transport) {
- case 'TLS': return TlsClient(options);
- case 'TCP':
- default:
- return TcpClient(options);
- }
-}
-
// create & destroy must be async
const createPoolFactory = (config: ClientConfig) => ({
- create: function () {
- return rawClientGetter(config);
+ create: async function () {
+ return getRawClient(config);
},
destroy: async function (client: RawClient) {
return client.destroy();
}
});
+const poolClientProvider = (config: ClientConfig) => {
+ const min = config.poolSize?.min || 1;
+ const max = config.poolSize?.max || 4;
+ const pool = createPool(createPoolFactory(config), { min, max });
+ const poolClientProvider = async () => {
+ const c = await pool.acquire();
+ if (!c.isAuthenticated)
+ await c.authenticate(config.credentials);
+ debug('client acquired from pool. pool size is', pool.size);
+ c.once('finishQueue', () => {
+ pool.release(c)
+ debug('client released to pool. pool size is', pool.size);
+ });
+ return c;
+ }
+ poolClientProvider._pool = pool;
+ return poolClientProvider;
+};
+
+
export class Client extends CommandAPI {
_config: ClientConfig
_pool: Pool<RawClient>
- destroy: () => void
constructor(config: ClientConfig) {
- const min = config.poolSize?.min || 1;
- const max = config.poolSize?.max || 4;
- const pool = createPool(createPoolFactory(config), { min, max });
- const getFromPool = async () => {
- const c = await pool.acquire();
- if (!c.isAuthenticated)
- await c.authenticate(config.credentials);
- debug('client acquired from pool. pool size is', pool.size);
- c.once('finishQueue', () => {
- pool.release(c)
- debug('client released to pool. pool size is', pool.size);
- });
- return c;
- };
- super(getFromPool);
+ const pcp = poolClientProvider(config);
+ super(pcp);
this._config = config;
- this._pool = pool;
- this.destroy = async () => {
- debug('destroying client pool. pool size is', pool.size);
- await this._pool.drain();
- await this._pool.clear();
- debug('destroyed client pool. pool size is', pool.size);
- }
+ this._pool = pcp._pool;
};
+
+ async destroy() {
+ debug('destroying client pool. pool size is', this._pool.size);
+ await this._pool.drain();
+ await this._pool.clear();
+ debug('destroyed client pool. pool size is', this._pool.size);
+ }
+}
+
+const singleClientProvider = (config: ClientConfig) => {
+ const c = getRawClient(config);
+ return async function singleClientProvider () {
+ if (!c.isAuthenticated)
+ await c.authenticate(config.credentials);
+ return c;
+ }
}
export class SingleClient extends CommandAPI {
_config: ClientConfig
- destroy: () => void
-
+
constructor(config: ClientConfig) {
- const cliP = rawClientGetter(config);
- const init = async () => {
- const c = await cliP;
- if (!c.isAuthenticated)
- await c.authenticate(config.credentials);
- return c;
- };
- super(init);
+ super(singleClientProvider(config));
this._config = config;
- this.destroy = async () => {
- const s = await this.clientProvider();
- s.destroy();
- };
+ }
+
+ async destroy() {
+ const s = await this.clientProvider();
+ s.destroy();
}
};
@@ -104,10 +102,16 @@ export class SimpleClient extends CommandAPI {
constructor(client: RawClient) {
super(() => Promise.resolve(client));
}
+
+ async destroy() {
+ const s = await this.clientProvider();
+ s.destroy();
+ }
+
};
export const getClient = async (config: ClientConfig) => {
- const cli = await rawClientGetter(config);
+ const cli = getRawClient(config);
if (!cli.isAuthenticated)
await cli.authenticate(config.credentials);
const api = new SimpleClient(cli);
diff --git a/foreign/node/src/client/client.type.ts b/foreign/node/src/client/client.type.ts
index a162c247..6a39e112 100644
--- a/foreign/node/src/client/client.type.ts
+++ b/foreign/node/src/client/client.type.ts
@@ -19,8 +19,11 @@
import type { Readable } from 'stream';
-import type { TcpOption } from './tcp.client.js';
-import type { TlsOption } from './tls.client.js';
+import { type TcpSocketConnectOpts } from 'node:net';
+import { type ConnectionOptions } from 'node:tls';
+
+export type TcpOption = TcpSocketConnectOpts;
+export type TlsOption = { port: number } & ConnectionOptions;
export type CommandResponse = {
status: number,
@@ -29,12 +32,14 @@ export type CommandResponse = {
};
export type RawClient = {
- sendCommand: (code: number, payload: Buffer, handleResponse?: boolean) => Promise<CommandResponse>,
+ sendCommand: (
+ code: number, payload: Buffer, handleResponse?: boolean
+ ) => Promise<CommandResponse>,
destroy: () => void,
isAuthenticated: boolean
authenticate: (c: ClientCredentials) => Promise<boolean>
- on: (ev: string, cb: () => void) => void
- once: (ev: string, cb: () => void) => void
+ on: (ev: string, cb: (e?: unknown) => void) => void
+ once: (ev: string, cb: (e?: unknown) => void) => void
getReadStream: () => Readable
}
@@ -43,7 +48,13 @@ export type ClientProvider = () => Promise<RawClient>;
export const Transports = ['TCP', 'TLS' /**, 'QUIC' */] as const;
export type TransportType = typeof Transports[number];
-type TransportOption = TcpOption | TlsOption;
+export type ReconnectOption = {
+ enabled: boolean,
+ interval: number,
+ maxRetries: number
+}
+
+export type TransportOption = TcpOption | TlsOption;
export type TokenCredentials = {
token: string
@@ -65,5 +76,7 @@ export type ClientConfig = {
transport: TransportType,
options: TransportOption,
credentials: ClientCredentials,
- poolSize?: PoolSizeOption
+ poolSize?: PoolSizeOption,
+ reconnect?: ReconnectOption,
+ heartbeatInterval?: number
}
diff --git a/foreign/node/src/client/index.ts b/foreign/node/src/client/index.ts
index ab8372d6..de2290c1 100644
--- a/foreign/node/src/client/index.ts
+++ b/foreign/node/src/client/index.ts
@@ -19,8 +19,6 @@
export { Client, SimpleClient, SingleClient } from './client.js'
-export { TcpClient, type TcpOption } from './tcp.client.js';
-export { TlsClient, type TlsOption } from './tls.client.js';
export * from './client.utils.js';
export * from './client.socket.js';
export * from './client.type.js';
diff --git a/foreign/node/src/debug-send.ts b/foreign/node/src/debug-send.ts
index 3ca5f664..7cd51ff5 100644
--- a/foreign/node/src/debug-send.ts
+++ b/foreign/node/src/debug-send.ts
@@ -18,12 +18,10 @@
*/
-
import assert from 'node:assert/strict';
-import { Client, SingleClient } from './client/index.js';
-import { uuidv7, uuidv4 } from 'uuidv7'
+import { ClientConfig, SingleClient } from './client/index.js';
import { groupConsumerStream } from './stream/consumer-stream.js';
-import { PollingStrategy, Partitioning, type PollMessagesResponse } from './wire/index.js';
+import { PollingStrategy, type PollMessagesResponse } from './wire/index.js';
const streamId = 321;
const topicId = 543;
@@ -42,7 +40,7 @@ const topic = {
compressionAlgorithm: 1
};
-const opt = {
+const opt: ClientConfig = {
transport: 'TCP' as const,
options: {
port: 8090,
@@ -51,7 +49,12 @@ const opt = {
// keepAlive: true
},
credentials: { username: 'iggy', password: 'iggy' },
-};2
+ reconnect: {
+ enabled: true,
+ interval: 10 * 1000,
+ maxRetries: 10
+ }
+};
const c = new SingleClient(opt);
@@ -77,63 +80,58 @@ try {
console.log('server stream CREATED::', { streamId });
await c.topic.create(topic);
console.log('server topic CREATED::', { topicId });
- // send
- // setInterval(async () => {
+ // send
+ setInterval(async () => {
assert.ok(await c.message.send(msg));
- // console.log('message SEND::', { msg })
- // }, 3000);
-
-
- // // POLL MESSAGE
- // const pollReq = {
- // groupId,
- // streamId,
- // topicId,
- // pollingStrategy: PollingStrategy.Next,
- // count: 1,
- // interval: 5000
- // };
-
- // const cs = await groupConsumerStream(opt)(pollReq);
- // const dataCb = (d: PollMessagesResponse) => {
- // console.log('cli/DATA POLLED:', d);
- // // recv += d.count;
- // // if (recv === ct)
- // // str.destroy();
- // };
-
- // cs.on('data', dataCb);
-
- // cs.on('error', (err) => {
- // console.error('cli/=>>Stream ERROR:: // DESTROY ', err)
- // // cs.destroy(err);
- // });
-
- // cs.on('end', async () => {
- // console.log('cli/=>>Stream END::')
- // cs.destroy();
- // await cleanup();
- // });
-
- // cs.on('close', async () => {
- // console.log('cli/=>>Stream CLOSE::')
- // // await cleanup();
- // });
+ console.log('message SEND::', { msg })
+ }, 3000);
+
+
+ // POLL MESSAGE
+ const pollReq = {
+ groupId,
+ streamId,
+ topicId,
+ pollingStrategy: PollingStrategy.Next,
+ count: 1,
+ interval: 5000
+ };
+
+ const cs = await groupConsumerStream(opt)(pollReq);
+ const dataCb = (d: PollMessagesResponse) => {
+ console.log('cli/DATA POLLED:', d);
+ // recv += d.count;
+ // if (recv === ct)
+ // str.destroy();
+ };
+
+ cs.on('data', dataCb);
+
+ cs.on('error', (err) => {
+ console.error('cli/=>>Stream ERROR:: // DESTROY ', err)
+ cs.destroy(err);
+ });
+
+ cs.on('end', async () => {
+ console.log('cli/=>>Stream END::')
+ cs.destroy();
+ await cleanup();
+ });
+
+ cs.on('close', async () => {
+ console.log('cli/=>>Stream CLOSE::')
+ // await cleanup();
+ });
} catch (err) {
console.error('client catchALL END', err)
await cleanup();
-} finally {
- console.error('finally::cleanup')
- await cleanup();
}
-
// process.on('SIGINT', async () => {
// await cleanup();
// console.log('cleanup OK, exiting...');
// process.exit()
// });
-
diff --git a/foreign/node/src/debug.ts b/foreign/node/src/debug.ts
index bf1cd329..afabcf27 100644
--- a/foreign/node/src/debug.ts
+++ b/foreign/node/src/debug.ts
@@ -17,13 +17,11 @@
* under the License.
*/
-
-
import assert from 'node:assert/strict';
import { Client } from './client/index.js';
import { uuidv7, uuidv4 } from 'uuidv7'
import { groupConsumerStream } from './stream/consumer-stream.js';
-import { PollingStrategy, Partitioning, type PollMessagesResponse } from './wire/index.js';
+import { PollingStrategy, type PollMessagesResponse } from './wire/index.js';
const streamId = 123;
const topicId = 345;
@@ -46,7 +44,13 @@ const opt = {
transport: 'TCP' as const,
options: { port: 8090, host: '127.0.0.1', allowHalfOpen: true, keepAlive: true },
credentials: { username: 'iggy', password: 'iggy' },
-};2
+ reconnect: {
+ enabled: true,
+ interval: 10 * 1000,
+ maxRetries: 10
+ },
+ heartbeatInterval: 5 * 1000
+};
const c = new Client(opt);
@@ -74,6 +78,7 @@ try {
console.log('server stream CREATED::', { streamId });
await c.topic.create(topic);
console.log('server topic CREATED::', { topicId });
+
// send
assert.ok(await c.message.send(msg));
console.log('message SEND::', { msg })
@@ -84,7 +89,7 @@ try {
streamId,
topicId,
pollingStrategy: PollingStrategy.Next,
- count: 1,
+ count: 5,
interval: 5000
};
@@ -93,16 +98,13 @@ try {
const cs = await groupConsumerStream(opt)(pollReq);
const dataCb = (d: PollMessagesResponse) => {
console.log('cli/DATA POLLED:', d);
- // recv += d.count;
- // if (recv === ct)
- // str.destroy();
};
cs.on('data', dataCb);
cs.on('error', (err) => {
console.error('cli/=>>Stream ERROR:: // DESTROY ', err)
- // cs.destroy(err);
+ cs.destroy(err);
});
cs.on('end', async () => {
diff --git a/foreign/node/src/e2e/tcp.consumer-group.e2e.ts b/foreign/node/src/e2e/tcp.consumer-group.e2e.ts
index c678cdca..1edf93fc 100644
--- a/foreign/node/src/e2e/tcp.consumer-group.e2e.ts
+++ b/foreign/node/src/e2e/tcp.consumer-group.e2e.ts
@@ -107,7 +107,7 @@ describe('e2e -> consumer-group', async () => {
};
let ct = 0;
while (ct < payloadLength) {
- const { messages, ...resp } = await c.message.poll(pollReq);
+ const { messages /** , ...resp*/ } = await c.message.poll(pollReq);
// console.log('POLL', messages.length, 'R/C', resp.count, messages, resp, ct);
// assert.equal(messages.length, resp.count);
ct += messages.length;
diff --git a/foreign/node/src/stream/consumer-stream.ts b/foreign/node/src/stream/consumer-stream.ts
index c197db0b..3e07afdc 100644
--- a/foreign/node/src/stream/consumer-stream.ts
+++ b/foreign/node/src/stream/consumer-stream.ts
@@ -20,10 +20,12 @@
import { Readable, pipeline, PassThrough } from "node:stream";
import type { ClientConfig, RawClient } from "../client/client.type.js";
+import { getRawClient } from '../client/client.socket.js';
import type { Id } from '../wire/identifier.utils.js';
-import { SimpleClient, rawClientGetter } from "../client/client.js";
+import { getClient } from "../client/client.js";
import { type PollMessages, POLL_MESSAGES } from "../wire/message/poll-messages.command.js";
import { type PollingStrategy, ConsumerKind, type CommandAPI } from "../wire/index.js";
+import { debug } from "../client/client.debug.js";
const wait = (interval: number, cb?: () => void): Promise<void> =>
@@ -41,12 +43,8 @@ async function* genAutoCommitedPoll(
while (true) {
const r = await c.message.poll(poll);
yield r;
-
- const k = `${r.partitionId}`;
- let part = state.get(k) || 0;
- part = r.count;
- state.set(k, part);
-
+ state.set(`${r.partitionId}`, r.count);
+
if (Array.from(state).every(([, last]) => last === 0)) {
await wait(interval);
}
@@ -62,12 +60,11 @@ async function* genPoll(
};
-
export const singleConsumerStream = (config: ClientConfig) => async (
poll: PollMessages,
// interval: 1000
): Promise<Readable> => {
- const c = await rawClientGetter(config);
+ const c = getRawClient(config);
if (!c.isAuthenticated)
await c.authenticate(config.credentials);
const ps = Readable.from(genPoll(c, poll), { objectMode: true });
@@ -100,16 +97,15 @@ export const groupConsumerStream = (config: ClientConfig) =>
interval = 1000,
autocommit = true
}: ConsumerGroupStreamRequest): Promise<Readable> {
- const c = await rawClientGetter(config);
- const s = new SimpleClient(c);
- if (!c.isAuthenticated)
- await c.authenticate(config.credentials);
-
+ const s = await getClient(config);
+
try {
await s.group.get({ streamId, topicId, groupId })
// eslint-disable-next-line @typescript-eslint/no-unused-vars
} catch (err) {
- await s.group.create({ streamId, topicId, groupId, name: `auto-${groupId}` })
+ const ng = { streamId, topicId, groupId, name: `auto-${groupId}` };
+ debug('group does not exist, creating it', ng);
+ await s.group.create(ng)
}
await s.group.join({ streamId, topicId, groupId });
@@ -124,6 +120,17 @@ export const groupConsumerStream = (config: ClientConfig) =>
autocommit
}
const ps = Readable.from(genAutoCommitedPoll(s, poll, interval), { objectMode: true });
+ // return (await s.clientProvider()).getReadStream();
+ // c.on('error', (err: unknown) => {
+ // debug('groupConsumerStream::client error', err);
+ // // ps.emit('error', err);
+ // });
+
+ // c.on('end', () => {
+ // debug('groupConsumerStream::END');
+ // // ps.emit('end');
+ // });
+
return ps;
};
diff --git a/foreign/node/src/tcp.e2e.ts b/foreign/node/src/tcp.e2e.ts
index 3d18bc47..c105015a 100644
--- a/foreign/node/src/tcp.e2e.ts
+++ b/foreign/node/src/tcp.e2e.ts
@@ -18,7 +18,7 @@
*/
-import { TcpClient } from './client/index.js';
+import { getRawClient } from './client/index.js';
import {
login, logout,
@@ -32,7 +32,14 @@ import {
try {
// create socket
- const cli = TcpClient({ host: '127.0.0.1', port: 8090 });
+ const cli = getRawClient({
+ transport: 'TCP' as const,
+ options: {
+ host: '127.0.0.1',
+ port: 8090
+ },
+ credentials: { username: 'iggy', password: 'iggy' }
+ });
const s = () => Promise.resolve(cli);
// LOGIN
diff --git a/foreign/node/src/tls.system.e2e.ts b/foreign/node/src/tls.system.e2e.ts
index 73d52a73..1e2f2187 100644
--- a/foreign/node/src/tls.system.e2e.ts
+++ b/foreign/node/src/tls.system.e2e.ts
@@ -18,12 +18,19 @@
*/
-import { TlsClient } from './client/index.js';
+import { getRawClient } from './client/index.js';
import { login, logout, getStats, ping } from './wire/index.js';
try {
// create socket
- const cli = TlsClient({ host: '127.0.0.1', port: 8090 });
+ const cli = getRawClient({
+ transport: 'TCP' as const,
+ options: {
+ host: '127.0.0.1',
+ port: 8090
+ },
+ credentials: { username: 'iggy', password: 'iggy' }
+ });
const s = () => Promise.resolve(cli);
// PING
diff --git a/foreign/node/src/wire/error.code.ts b/foreign/node/src/wire/error.code.ts
index d7464c78..a65caf87 100644
--- a/foreign/node/src/wire/error.code.ts
+++ b/foreign/node/src/wire/error.code.ts
@@ -21,127 +21,193 @@
export const translateErrorCode = (code: number): string => {
switch (code.toString()) {
- case '1': return 'error';
- case '2': return 'invalid_configuration';
- case '3': return 'invalid_command';
- case '4': return 'invalid_format';
- case '5': return 'feature_unavailable';
- case '10': return 'cannot_create_base_directory';
- case '20': return 'resource_not_found';
- case '21': return 'cannot_load_resource';
- case '22': return 'cannot_save_resource';
- case '23': return 'cannot_delete_resource';
- case '24': return 'cannot_serialize_resource';
- case '25': return 'cannot_deserialize_resource';
- case '40': return 'unauthenticated';
- case '41': return 'unauthorized';
- case '42': return 'invalid_credentials';
- case '43': return 'invalid_username';
- case '44': return 'invalid_password';
- case '51': return 'not_connected';
- case '52': return 'request_error';
- case '60': return 'invalid_encryption_key';
- case '61': return 'cannot_encrypt_data';
- case '62': return 'cannot_decrypt_data';
- case '100': return 'client_not_found';
- case '101': return 'invalid_client_id';
- case '200': return 'io_error';
- case '201': return 'write_error';
- case '202': return 'cannot_parse_utf8';
- case '203': return 'cannot_parse_int';
- case '204': return 'cannot_parse_slice';
- case '300': return 'http_response_error';
- case '301': return 'request_middleware_error';
- case '302': return 'cannot_create_endpoint';
- case '303': return 'cannot_parse_url';
- case '304': return 'invalid_response';
- case '305': return 'empty_response';
- case '306': return 'cannot_parse_address';
- case '307': return 'read_error';
- case '308': return 'connection_error';
- case '309': return 'read_to_end_error';
- case '1000': return 'cannot_create_streams_directory';
- case '1001': return 'cannot_create_stream_directory';
- case '1002': return 'cannot_create_stream_info';
- case '1003': return 'cannot_update_stream_info';
- case '1004': return 'cannot_open_stream_info';
- case '1005': return 'cannot_read_stream_info';
- case '1006': return 'cannot_create_stream';
- case '1007': return 'cannot_delete_stream';
- case '1008': return 'cannot_delete_stream_directory';
- case '1009': return 'stream_id_not_found';
- case '1010': return 'stream_name_not_found';
- case '1011': return 'stream_id_already_exists';
- case '1012': return 'stream_name_already_exists';
- case '1013': return 'invalid_stream_name';
- case '1014': return 'invalid_stream_id';
- case '1015': return 'cannot_read_streams';
- case '2000': return 'cannot_create_topics_directory';
- case '2001': return 'cannot_create_topic_directory';
- case '2002': return 'cannot_create_topic_info';
- case '2003': return 'cannot_update_topic_info';
- case '2004': return 'cannot_open_topic_info';
- case '2005': return 'cannot_read_topic_info';
- case '2006': return 'cannot_create_topic';
- case '2007': return 'cannot_delete_topic';
- case '2008': return 'cannot_delete_topic_directory';
- case '2009': return 'cannot_poll_topic';
- case '2010': return 'topic_id_not_found';
- case '2011': return 'topic_name_not_found';
- case '2012': return 'topic_id_already_exists';
- case '2013': return 'topic_name_already_exists';
- case '2014': return 'invalid_topic_name';
- case '2015': return 'too_many_partitions';
- case '2016': return 'invalid_topic_id';
- case '2017': return 'cannot_read_topics';
- case '3000': return 'cannot_create_partition';
- case '3001': return 'cannot_create_partitions_directory';
- case '3002': return 'cannot_create_partition_directory';
- case '3003': return 'cannot_open_partition_log_file';
- case '3004': return 'cannot_read_partitions';
- case '3005': return 'cannot_delete_partition';
- case '3006': return 'cannot_delete_partition_directory';
- case '3007': return 'partition_not_found';
- case '3008': return 'no_partitions';
- case '4000': return 'segment_not_found';
- case '4001': return 'segment_closed';
- case '4002': return 'invalid_segment_size';
- case '4003': return 'cannot_create_segment_log_file';
- case '4004': return 'cannot_create_segment_index_file';
- case '4005': return 'cannot_create_segment_time_index_file';
- case '4006': return 'cannot_save_messages_to_segment';
- case '4007': return 'cannot_save_index_to_segment';
- case '4008': return 'cannot_save_time_index_to_segment';
- case '4009': return 'invalid_messages_count';
- case '4010': return 'cannot_append_message';
- case '4011': return 'cannot_read_message';
- case '4012': return 'cannot_read_message_id';
- case '4013': return 'cannot_read_message_state';
- case '4014': return 'cannot_read_message_timestamp';
- case '4015': return 'cannot_read_headers_length';
- case '4016': return 'cannot_read_headers_payload';
- case '4017': return 'too_big_headers_payload';
- case '4018': return 'invalid_header_key';
- case '4019': return 'invalid_header_value';
- case '4020': return 'cannot_read_message_length';
- case '4021': return 'cannot_read_message_payload';
- case '4022': return 'too_big_message_payload';
- case '4023': return 'too_many_messages';
- case '4024': return 'empty_message_payload';
- case '4025': return 'invalid_message_payload_length';
- case '4026': return 'cannot_read_message_checksum';
- case '4027': return 'invalid_message_checksum';
- case '4028': return 'invalid_key_value_length';
- case '4100': return 'invalid_offset';
- case '4101': return 'cannot_read_consumer_offsets';
- case '5000': return 'consumer_group_not_found';
- case '5001': return 'consumer_group_already_exists';
- case '5002': return 'consumer_group_member_not_found';
- case '5003': return 'invalid_consumer_group_id';
- case '5004': return 'cannot_create_consumer_groups_directory';
- case '5005': return 'cannot_read_consumer_groups';
- case '5006': return 'cannot_create_consumer_group_info';
- case '5007': return 'cannot_delete_consumer_group_info';
+ case '1': return "Error";
+ case '2': return "Invalid configuration";
+ case '3': return "Invalid command";
+ case '4': return "Invalid format";
+ case '5': return "Feature is unavailable";
+ case '6': return "Invalid identifier";
+ case '7': return "Invalid version: {0}";
+ case '8': return "Disconnected";
+ case '9': return "Cannot establish connection";
+ case '10': return "Cannot create base directory, Path: {0}";
+ case '11': return "Cannot create runtime directory, Path: {0}";
+ case '12': return "Cannot remove runtime directory, Path: {0}";
+ case '13': return "Cannot create state directory, Path: {0}";
+ case '14': return "State file not found";
+ case '15': return "State file corrupted";
+ case '16': return "Invalid state entry checksum: {0}, expected: {1}, for index: {2}";
+ case '19': return "Cannot open database, Path: {0}";
+ case '20': return "Resource with key: {0} was not found.";
+ case '30': return "Stale client";
+ case '31': return "TCP error";
+ case '32': return "QUIC error";
+ case '33': return "Invalid server address";
+ case '34': return "Invalid client address";
+ case '40': return "Unauthenticated";
+ case '41': return "Unauthorized";
+ case '42': return "Invalid credentials";
+ case '43': return "Invalid username";
+ case '44': return "Invalid password";
+ case '45': return "Invalid user status";
+ case '46': return "User already exists";
+ case '47': return "User inactive";
+ case '48': return "Cannot delete user with ID: {0}";
+ case '49': return "Cannot change permissions for user with ID: {0}";
+ case '50': return "Invalid personal access token name";
+ case '51': return "Personal access token: {0} for user with ID: {1} already exists";
+ case '52': return "User with ID: {0} has reached the maximum number of personal access tokens: {1}";
+ case '53': return "Invalid personal access token";
+ case '54': return "Personal access token: {0} for user with ID: {1} has expired.";
+ case '55': return "Users limit reached.";
+ case '61': return "Not connected";
+ case '63': return "Client shutdown";
+ case '64': return "Invalid TLS domain";
+ case '65': return "Invalid TLS certificate path";
+ case '66': return "Invalid TLS certificate";
+ case '67': return "Failed to add certificate";
+ case '70': return "Invalid encryption key";
+ case '71': return "Cannot encrypt data";
+ case '72': return "Cannot decrypt data";
+ case '73': return "Invalid JWT algorithm: {0}";
+ case '74': return "Invalid JWT secret";
+ case '75': return "JWT is missing";
+ case '76': return "Cannot generate JWT";
+ case '77': return "Access token is missing";
+ case '78': return "Invalid access token";
+ case '80': return "Invalid size bytes";
+ case '81': return "Invalid UTF-8";
+ case '82': return "Invalid number encoding";
+ case '83': return "Invalid boolean value";
+ case '84': return "Invalid number value";
+ // CLIENT
+ case '100': return "Client with ID: {0} was not found.";
+ case '101': return "Invalid client ID";
+
+ case '206': return "Connection closed";
+ case '209': return "Cannot parse header kind from {0}";
+ case '300': return "HTTP response error, status: {0}, body: {1}";
+ case '301': return "Invalid HTTP request";
+ case '302': return "Invalid JSON response";
+ case '303': return "Invalid bytes response";
+ case '304': return "Empty response";
+ case '305': return "Cannot create endpoint";
+ case '306': return "Cannot parse URL";
+
+ // STREAM
+ case '1000': return "Cannot create streams directory, Path: {0}";
+ case '1001': return "Cannot create stream with ID: {0} directory, Path: {1}";
+ case '1002': return "Failed to create stream info file for stream with ID: {0}";
+ case '1003': return "Failed to update stream info for stream with ID: {0}";
+ case '1004': return "Failed to open stream info file for stream with ID: {0}";
+ case '1005': return "Failed to read stream info file for stream with ID: {0}";
+ case '1006': return "Failed to create stream with ID: {0}";
+ case '1007': return "Failed to delete stream with ID: {0}";
+ case '1008': return "Failed to delete stream directory with ID: {0}";
+ case '1009': return "Stream with ID: {0} was not found.";
+ case '1010': return "Stream with name: {0} was not found.";
+ case '1011': return "Stream with ID: {0} already exists.";
+ case '1012': return "Stream with name: {0} already exists.";
+ case '1013': return "Invalid stream name";
+ case '1014': return "Invalid stream ID";
+ case '1015': return "Cannot read streams";
+ case '1016': return "Missing streams";
+ case '1017': return "Missing topics for stream with ID: {0}";
+ case '1018': return "Missing partitions for topic with ID: {0} for stream with ID: {1}.";
+ case '1019': return "Max topic size cannot be lower than segment size. Max topic size: {0} < segment size: {1}.";
+
+ // TOPIC
+ case '2000': return "Cannot create topics directory for stream with ID: {0}, Path: {1}";
+ case '2001': return "Failed to create directory for topic with ID: {0} for stream with ID: {1}, Path: {2}";
+ case '2002': return "Failed to create topic info file for topic with ID: {0} for stream with ID: {1}.";
+ case '2003': return "Failed to update topic info for topic with ID: {0} for stream with ID: {1}.";
+ case '2004': return "Failed to open topic info file for topic with ID: {0} for stream with ID: {1}.";
+ case '2005': return "Failed to read topic info file for topic with ID: {0} for stream with ID: {1}.";
+ case '2006': return "Failed to create topic with ID: {0} for stream with ID: {1}.";
+ case '2007': return "Failed to delete topic with ID: {0} for stream with ID: {1}.";
+ case '2008': return "Failed to delete topic directory with ID: {0} for stream with ID: {1}, Path: {2}";
+ case '2009': return "Cannot poll topic";
+ case '2010': return "Topic with ID: {0} for stream with ID: {1} was not found.";
+ case '2011': return "Topic with name: {0} for stream with ID: {1} was not found.";
+ case '2012': return "Topic with ID: {0} for stream with ID: {1} already exists.";
+ case '2013': return "Topic with name: {0} for stream with ID: {1} already exists.";
+ case '2014': return "Invalid topic name";
+ case '2015': return "Too many partitions";
+ case '2016': return "Invalid topic ID";
+ case '2017': return "Cannot read topics for stream with ID: {0}";
+ case '2018': return "Invalid replication factor";
+
+ // PARTITION
+ case '3000': return "Cannot create partition with ID: {0} for stream with ID: {1} and topic with ID: {2}";
+ case '3001': return "Failed to create directory for partitions for stream with ID: {0} and topic with ID: {1}";
+ case '3002': return "Failed to create directory for partition with ID: {0} for stream with ID: {1} and topic with ID: {2}";
+ case '3003': return "Cannot open partition log file";
+ case '3004': return "Cannot read partitions directories.";
+ case '3005': return "Failed to delete partition with ID: {0} for stream with ID: {1} and topic with ID: {2}";
+ case '3006': return "Failed to delete partition directory with ID: {0} for stream with ID: {1} and topic with ID: {2}";
+ case '3007': return "Partition with ID: {0} for topic with ID: {1} for stream with ID: {2} was not found.";
+ case '3008': return "Topic with ID: {0} for stream with ID: {1} has no partitions.";
+ case '3009': return "Cannot read partitions for topic with ID: {0} for stream with ID: {1}";
+ case '3010': return "Failed to delete consumer offsets directory for path: {0}";
+ case '3011': return "Failed to delete consumer offset file for path: {0}";
+ case '3012': return "Failed to create consumer offsets directory for path: {0}";
+ case '3020': return "Failed to read consumers offsets from path: {0}";
+ case '3021': return "Consumer offset for consumer with ID: {0} was not found.";
+
+ // SEGMENT / MESSAGE
+ case '4000': return "Segment not found";
+ case '4001': return "Segment with start offset: {0} and partition with ID: {1} is closed";
+ case '4002': return "Segment size is invalid";
+ case '4003': return "Failed to create segment log file for Path: {0}.";
+ case '4004': return "Failed to create segment index file for Path: {0}.";
+ case '4005': return "Failed to create segment time index file for Path: {0}.";
+ case '4006': return "Cannot save messages to segment.";
+ case '4007': return "Cannot save index to segment.";
+ case '4008': return "Cannot save time index to segment.";
+ case '4009': return "Invalid messages count";
+ case '4010': return "Cannot append message";
+ case '4011': return "Cannot read message";
+ case '4012': return "Cannot read message ID";
+ case '4013': return "Cannot read message state";
+ case '4014': return "Cannot read message timestamp";
+ case '4015': return "Cannot read headers length";
+ case '4016': return "Cannot read headers payload";
+ case '4017': return "Too big headers payload";
+ case '4018': return "Invalid header key";
+ case '4019': return "Invalid header value";
+ case '4020': return "Cannot read message length";
+ case '4021': return "Cannot read message payload";
+ case '4022': return "Too big message payload";
+ case '4023': return "Too many messages";
+ case '4024': return "Empty message payload";
+ case '4025': return "Invalid message payload length";
+ case '4026': return "Cannot read message checksum";
+ case '4027': return "Invalid message checksum: {0}, expected: {1}, for offset: {2}";
+ case '4028': return "Cannot append message to segment at offset: {0}";
+
+ // STORAGE
+ case '5000': return "File not found";
+ case '5001': return "Failed to open file";
+ case '5002': return "Failed to write file";
+ case '5003': return "Failed to close file";
+ case '5004': return "Failed to delete file";
+ case '5005': return "Cannot read file";
+ case '5006': return "Invalid file size";
+ case '5007': return "Cannot create file";
+ case '5008': return "Cannot rename file";
+ case '5009': return "Cannot get file info";
+ case '5010': return "Failed to create directory for file";
+ case '5011': return "Failed to create file for {0}";
+ case '5012': return "Failed to create backup for {0}";
+ case '5013': return "Failed to create file path for backup file: {0}";
+ case '5014': return "Failed to backup file: {0}";
+ case '5015': return "Failed to restore file: {0}";
+ case '5016': return "Failed to restore file path for backup file: {0}";
+ case '5017': return "Failed to delete backup file: {0}";
+ case '5018': return "Failed to delete directory for file: {0}";
+ case '5019': return "Failed to delete file: {0}";
+ case '5020': return "Failed to copy file: {0}";
+ // NOTE: "{0}"-style placeholders above are returned verbatim (no substitution here); unknown codes yield 'error'
default: return 'error';
}
-}
\ No newline at end of file
+}
diff --git a/foreign/node/src/wire/stream/create-stream.command.ts
b/foreign/node/src/wire/stream/create-stream.command.ts
index 928b771b..e15825b0 100644
--- a/foreign/node/src/wire/stream/create-stream.command.ts
+++ b/foreign/node/src/wire/stream/create-stream.command.ts
@@ -45,7 +45,7 @@ export const CREATE_STREAM = {
bName
]);
},
-
+
deserialize: (r: CommandResponse) => {
return deserializeToStream(r.data, 0).data
}