This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new a418e823 feat: split transport from client, add heartbeat and reconnection support (#1809)
a418e823 is described below

commit a418e823669ed0860f63ef1394ab292df524cb91
Author: T1B0 <[email protected]>
AuthorDate: Thu May 29 22:07:33 2025 +0200

    feat: split transport from client, add heartbeat and reconnection support (#1809)
    
    - split transport code from client
    - add simple reconnection support (interval + maxRetries)
    - add heartbeat support
    - better error handling
    
    Co-authored-by: Piotr Gankiewicz <[email protected]>
---
 foreign/node/src/client/client.socket.ts           | 208 +++++++++-----
 foreign/node/src/client/client.transport.ts        | 154 ++++++++++
 foreign/node/src/client/client.ts                  | 110 ++++----
 foreign/node/src/client/client.type.ts             |  27 +-
 foreign/node/src/client/index.ts                   |   2 -
 foreign/node/src/debug-send.ts                     | 102 ++++---
 foreign/node/src/debug.ts                          |  20 +-
 foreign/node/src/e2e/tcp.consumer-group.e2e.ts     |   2 +-
 foreign/node/src/stream/consumer-stream.ts         |  37 ++-
 foreign/node/src/tcp.e2e.ts                        |  11 +-
 foreign/node/src/tls.system.e2e.ts                 |  11 +-
 foreign/node/src/wire/error.code.ts                | 310 +++++++++++++--------
 .../node/src/wire/stream/create-stream.command.ts  |   2 +-
 13 files changed, 659 insertions(+), 337 deletions(-)

diff --git a/foreign/node/src/client/client.socket.ts b/foreign/node/src/client/client.socket.ts
index 1476e1d5..24af592c 100644
--- a/foreign/node/src/client/client.socket.ts
+++ b/foreign/node/src/client/client.socket.ts
@@ -18,33 +18,27 @@
  */
 
 
-import type { Socket } from 'node:net';
 import { Duplex } from 'node:stream';
+
 import type {
-  ClientCredentials, CommandResponse, PasswordCredentials, TokenCredentials
+  ClientConfig,
+  ClientCredentials, CommandResponse,
+  PasswordCredentials, RawClient, TokenCredentials
 } from './client.type.js';
+
 import { handleResponse, serializeCommand } from './client.utils.js';
 import { responseError } from '../wire/error.utils.js';
 import { LOGIN } from '../wire/session/login.command.js';
 import { LOGIN_WITH_TOKEN } from '../wire/session/login-with-token.command.js';
 import { debug } from './client.debug.js';
+import { IggyTransport } from './client.transport.js';
+import { PING } from '../wire/index.js';
 
-export const wrapSocket = (socket: Socket) =>
-  new Promise<CommandResponseStream>((resolve, reject) => {
-    const responseStream = new CommandResponseStream(socket);
-
-    socket.on('error', (err: unknown) => {
-      console.error('RESPONSESTREAM ERROR', err)
-      reject(err);
-    });
-    socket.once('connect', () => {
-      debug('responseStream.connect event');
-      resolve(responseStream);
-    });
-    socket.on('close', () => { debug('socket#close'); reject(); });
-    socket.on('end', () => { console.error('socket#end'); reject(); });
-  });
-
+const UNLOGGED_COMMAND_CODE = [
+  PING.code,
+  LOGIN.code,
+  LOGIN_WITH_TOKEN.code
+];
 
 type WriteCb = ((error: Error | null | undefined) => void) | undefined
 
@@ -56,26 +50,29 @@ type Job = {
 };
 
 
-export class CommandResponseStream extends Duplex {
-  private _socket: Socket;
+export class CommandResponseStream extends Duplex  {
+  private options: ClientConfig;
+  private transport: IggyTransport;
   private _readPaused: boolean;
   private _execQueue: Job[];
   public busy: boolean;
   isAuthenticated: boolean;
   userId?: number;
 
-  constructor(socket: Socket) {
+  constructor(options: ClientConfig) {
     super();
-    this._socket = this._wrapSocket(socket);
+    this.options = options;
+    this.transport = new IggyTransport(options);    
     this._readPaused = false;
     this.busy = false;
-    this._execQueue = [];
     this.isAuthenticated = false;
+    this._execQueue = [];
+    this.wrapTransport();
+    this.heartbeat(options.heartbeatInterval);
   };
-
-  // Probably triggered by Duplex class
+  
   _destroy() {
-    this._socket.destroy();
+    this.transport.socket.destroy();
   }
 
   _read(size: number): void {
@@ -85,49 +82,31 @@ export class CommandResponseStream extends Duplex {
   }
 
   _write(chunk: Buffer, encoding: BufferEncoding | undefined, cb?: WriteCb) {
-    return this._socket.write(chunk, encoding, cb);
+    return this.transport.socket.write(chunk, encoding, cb);
   };
 
   writeCommand(command: number, payload: Buffer): boolean {
     const cmd = serializeCommand(command, payload);
-    return this._socket.write(cmd);
+    return this.transport.socket.write(cmd);
   }
 
   sendCommand(
-    command: number, payload: Buffer, handleResponse = true
+    command: number, payload: Buffer, handleResponse = true, last = true
   ): Promise<CommandResponse> {
     return new Promise((resolve, reject) => {
-      this._execQueue.push({ command, payload, resolve, reject });
+      if(last)
+        this._execQueue.push({ command, payload, resolve, reject });
+      else
+        this._execQueue.unshift({ command, payload, resolve, reject });
       this._processQueue(handleResponse);
     });
   }
 
-  async authenticate(creds: ClientCredentials) {
-    const r = ('token' in creds) ?
-      await this._authWithToken(creds) :
-      await this._authWithPassword(creds);
-    this.isAuthenticated = true;
-    this.userId = r.userId;
-    return this.isAuthenticated;
-  }
-
-  async _authWithPassword(creds: PasswordCredentials) {
-    const pl = LOGIN.serialize(creds);
-    const logr = await this.sendCommand(LOGIN.code, pl);
-    return LOGIN.deserialize(logr);
-  }
-
-  async _authWithToken(creds: TokenCredentials) {
-    const pl = LOGIN_WITH_TOKEN.serialize(creds);
-    const logr = await this.sendCommand(LOGIN_WITH_TOKEN.code, pl);
-    return LOGIN_WITH_TOKEN.deserialize(logr);
-  }
-
   async _processQueue(handleResponse = true): Promise<void> {
     if (this.busy)
       return;
     this.busy = true;
-    while (this._execQueue.length > 0) {
+    while (this._execQueue.length > 0 && this.transport.socket.writable) {
       const next = this._execQueue.shift();
       if (!next) break;
       const { command, payload, resolve, reject } = next;
@@ -146,8 +125,11 @@ export class CommandResponseStream extends Duplex {
     payload: Buffer,
     handleResp = true
   ): Promise<CommandResponse> {
-    debug('==> writeCommand', this.writeCommand(command, payload));
+    const lastWrite = this.writeCommand(command, payload);
+    debug('==> writeCommand', lastWrite);
     return new Promise((resolve, reject) => {
+      if(!lastWrite)
+        return reject(new Error('failed to write to socket'));
       const errCb = (err: unknown) => reject(err);
       this.once('error', errCb);
       this.once('data', (resp) => {
@@ -162,35 +144,74 @@ export class CommandResponseStream extends Duplex {
     });
   }
 
+  _failQueue(err: Error) {
+    this._execQueue.forEach(({ reject }) => reject(err));
+    this._execQueue = [];
+  }  
+
   getReadStream() {
-    return this;//.pipe(new PassThrough());
+    return this;
   }
+  
+  wrapTransport() {
+    this.transport.on('connect', () => {
+      debug('socket#connect event');
+      this.emit('connect');
+      if(this._execQueue.length > 0) {
+        if(!this.isAuthenticated) {
+          const needAuth = this._execQueue.some(
+            q => !UNLOGGED_COMMAND_CODE.includes(q.command)
+          );
+          if(needAuth)
+            this.authenticate(this.options.credentials);
+        }
+        this._processQueue();
+      }
+    });
 
-  _wrapSocket(socket: Socket) {
-    // pass through
-    socket.on('close', hadError => this.emit('close', hadError));
-    socket.on('connect', () => this.emit('connect'));
-    socket.on('drain', () => this.emit('drain'));
-    socket.on('end', () => this.emit('end'));
-    socket.on('error', err => this.emit('error', err));
-    socket.on(
+    this.transport.on('close', async (hadError?: boolean) => {
+      console.log(
+        'socket#close',
+        { hadError, connected: this.transport.connected },
+      );
+      this.emit('close');
+    });
+
+    this.transport.on('end', async () => {
+      console.error('socket#end');
+      this.emit('end');
+    });
+    
+    this.transport.on('error', (err) => {
+      console.error('socket#err', err);
+      this._failQueue(err);
+      this.emit('error', err);
+    });
+
+    this.transport.on('disconnected', async () => {
+      this.isAuthenticated = false;
+    });
+
+    this.transport.on('drain', () => this.emit('drain'));
+
+    this.transport.on(
       'lookup',
       (err, address, family, host) => this.emit('lookup', err, address, 
family, host)
     );
-    socket.on('ready', () => this.emit('ready'));
-    socket.on('timeout', () => this.emit('timeout'));
+    this.transport.on('ready', () => this.emit('ready'));
+    this.transport.on('timeout', () => this.emit('timeout'));
 
     // customize data events
-    socket.on('readable', () => this._onReadable());
-    return socket;
+    this.transport.on('readable', () => this._onReadable());
+    return this.transport;
   }
 
   _onReadable() {
     while (!this._readPaused) {
-      const head = this._socket.read(8);
+      const head = this.transport.socket.read(8);
       if (!head || head.length === 0) return;
       if (head.length < 8) {
-        this._socket.unshift(head);
+        this.transport.socket.unshift(head);
         return;
       }
       /** first chunk[4:8] hold response length */
@@ -201,15 +222,15 @@ export class CommandResponseStream extends Duplex {
         return;
       }
 
-      const payload = this._socket.read(responseSize);
+      const payload = this.transport.socket.read(responseSize);
       debug('payload', payload, responseSize, head.readUInt32LE(0));
       if (!payload) {
-        this._socket.unshift(head);
+        this.transport.socket.unshift(head);
         return;
       }
       /** payload is incomplete, unshift until next read */
       if (payload.length < responseSize) {
-        this._socket.unshift(Buffer.concat([head, payload]));
+        this.transport.socket.unshift(Buffer.concat([head, payload]));
         return;
       }
 
@@ -219,4 +240,49 @@ export class CommandResponseStream extends Duplex {
         this._readPaused = true;
     }
   }
+
+  async authenticate(creds: ClientCredentials) {
+    const r = ('token' in creds) ?
+      await this._authWithToken(creds) :
+      await this._authWithPassword(creds);
+    this.isAuthenticated = true;
+    this.userId = r.userId;
+    return this.isAuthenticated;
+  }
+
+  async _authWithPassword(creds: PasswordCredentials) {
+    const pl = LOGIN.serialize(creds);
+    const logr = await this.sendCommand(LOGIN.code, pl, true, false);
+    return LOGIN.deserialize(logr);
+  }
+
+  async _authWithToken(creds: TokenCredentials) {
+    const pl = LOGIN_WITH_TOKEN.serialize(creds);
+    const logr = await this.sendCommand(LOGIN_WITH_TOKEN.code, pl, true, 
false);
+    return LOGIN_WITH_TOKEN.deserialize(logr);
+  }
+
+  async ping() {
+    const pl = PING.serialize();
+    const pingR = await this.sendCommand(PING.code, pl, true);
+    return PING.deserialize(pingR);
+  }
+
+  heartbeat(interval?: number) {
+    if(!interval)
+      return
+    
+    setInterval(async () => {
+      if(this.transport.connected) {
+        debug(`sending hearbeat ping (interval: ${interval} ms)`);
+        await this.ping()
+      }
+    }, interval);
+  }
+
+  
 };
+
+export function getRawClient(options: ClientConfig): RawClient {
+  return new CommandResponseStream(options);
+}
diff --git a/foreign/node/src/client/client.transport.ts b/foreign/node/src/client/client.transport.ts
new file mode 100644
index 00000000..4614a12f
--- /dev/null
+++ b/foreign/node/src/client/client.transport.ts
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import { EventEmitter } from 'node:events';
+import { createConnection, type Socket } from 'node:net';
+import { connect as TLSConnect } from 'node:tls';
+import { debug } from './client.debug.js';
+import type { ClientConfig, TlsOption, TcpOption, ReconnectOption } from 
'./client.type.js';
+
+
+const createTcpSocket = (options: TcpOption): Socket => {
+  return createConnection(options);
+};
+
+const createTlsSocket = ({ port, ...options }: TlsOption): Socket => {
+  const socket = TLSConnect(port, options);
+  socket.setEncoding('utf8');
+  return socket;
+};
+
+const getTransport = (config: ClientConfig): Socket => {
+  const { transport, options } = config;
+  switch (transport) {
+    case 'TLS': return createTlsSocket(options);
+    case 'TCP':
+    default:
+      return createTcpSocket(options);
+  }
+};
+
+const DefaultReconnectOption: ReconnectOption = {
+  enabled: true,
+  interval: 5 * 1000,
+  maxRetries: 12
+}
+
+function recreate(option: ClientConfig, timer=1000): Promise<Socket> {
+  return new Promise((resolve) => {
+    setTimeout(() => {
+      resolve(getTransport(option));
+    }, timer);
+  });
+}
+
+export class IggyTransport extends EventEmitter {
+  public socket: Socket;
+  public connected: boolean;
+  public connecting: boolean;
+  private config: ClientConfig;
+  private reconnectOption: ReconnectOption;
+  private reconnectCount: number;
+  private lastError: unknown[];
+
+  constructor(config: ClientConfig) {
+    super()
+    this.config = config;
+    this.reconnectOption = { ...DefaultReconnectOption, ...config.reconnect };
+    this.reconnectCount = 0;
+    this.connected = false;
+    this.connecting = true;
+    this.socket = getTransport(config);
+    this.wrapSocket();
+    this.lastError = [];
+  }
+
+  wrapSocket() {
+    this.socket.on('connect', () => {
+      debug('socket#connect event');
+      this.connected = true;
+      this.connecting = false;
+      // const lastReconnectCount = this.reconnectCount;
+      this.reconnectCount = 0;
+      this.lastError = [];
+      this.emit('connect');
+      // if(lastReconnectCount > 0)
+      //   this.emit('connect');
+    });
+
+    this.socket.once('close', async (hadError?: boolean) => {
+      this.connected = false;
+      this.emit('disconnected', hadError);
+      const { enabled, interval, maxRetries } = this.reconnectOption
+      debug(
+        'socket#close event',
+        {
+          hadError,
+          reconnect: { enabled, interval, maxRetries },
+          count:this.reconnectCount
+        }
+      );
+      if(!enabled)
+        return this.emit('close', hadError);
+
+      if(this.reconnectCount > maxRetries) {
+        debug(`reconnect reached maxRetries of ${ maxRetries }`, 
this.lastError);
+        this.emit('close', hadError);
+        this.emit(
+          'error',
+          new Error(
+            `reconnect maxRetries exceeded (count: ${ this.reconnectCount })`,
+            { cause: this.lastError.pop() }
+          ));
+        return;
+      }
+
+      this.connecting = true;
+      this.reconnectCount += 1;
+      this.socket = await recreate(this.config, interval);
+      this.wrapSocket();      
+      // this.emit('close');
+    });
+    
+    this.socket.on('end', async (err: unknown) => {
+      console.error('Transport/socket#end', err);
+      this.emit('end');
+    });
+
+    this.socket.on('drain', () => this.emit('drain'));
+
+    this.socket.on('error', (err) => {
+      console.error('Transport/socket#error !1/2-noemit', err);
+      this.lastError.push(err);
+      if(!this.connecting && !this.connected)
+        this.emit('error', err);
+    });
+
+    this.socket.on(
+      'lookup',
+      (err, address, family, host) => this.emit('lookup', err, address, 
family, host)
+    );
+    this.socket.on('ready', () => this.emit('ready'));
+    this.socket.on('timeout', () => this.emit('timeout'));
+    // customize data events
+    this.socket.on('readable', () => this.emit('readable'));
+  };
+
+};
diff --git a/foreign/node/src/client/client.ts b/foreign/node/src/client/client.ts
index 08489a42..e7ac472e 100644
--- a/foreign/node/src/client/client.ts
+++ b/foreign/node/src/client/client.ts
@@ -18,84 +18,82 @@
  */
 
 
-import type { RawClient, ClientConfig } from "./client.type.js"
 import { createPool, type Pool } from 'generic-pool';
+import type { RawClient, ClientConfig } from "./client.type.js"
+import { getRawClient } from './client.socket.js';
 import { CommandAPI } from '../wire/command-set.js';
-import { TcpClient } from './tcp.client.js';
-import { TlsClient } from './tls.client.js';
 import { debug } from './client.debug.js';
 
 
-export const rawClientGetter = (config: ClientConfig): Promise<RawClient> => {
-  const { transport, options } = config;
-  switch (transport) {
-    case 'TLS': return TlsClient(options);
-    case 'TCP':
-    default:
-      return TcpClient(options);
-  }
-}
-
 // create & destroy must be async
 const createPoolFactory = (config: ClientConfig) => ({
-  create: function () {
-    return rawClientGetter(config);
+  create: async function () {
+    return getRawClient(config);
   },
   destroy: async function (client: RawClient) {
     return client.destroy();
   }
 });
 
+const poolClientProvider = (config: ClientConfig) => {
+  const min = config.poolSize?.min || 1;
+  const max = config.poolSize?.max || 4;
+  const pool = createPool(createPoolFactory(config), { min, max });
+  const poolClientProvider = async () => {
+    const c = await pool.acquire();
+    if (!c.isAuthenticated)
+      await c.authenticate(config.credentials);
+    debug('client acquired from pool. pool size is', pool.size);
+    c.once('finishQueue', () => {
+      pool.release(c)
+      debug('client released to pool. pool size is', pool.size);
+    });
+    return c;
+  }
+  poolClientProvider._pool = pool;
+  return poolClientProvider;
+};
+
+
 export class Client extends CommandAPI {
   _config: ClientConfig
   _pool: Pool<RawClient>
-  destroy: () => void
   
   constructor(config: ClientConfig) {
-    const min = config.poolSize?.min || 1;
-    const max = config.poolSize?.max || 4;
-    const pool = createPool(createPoolFactory(config), { min, max });
-    const getFromPool = async () => {
-      const c = await pool.acquire();
-      if (!c.isAuthenticated)
-        await c.authenticate(config.credentials);
-      debug('client acquired from pool. pool size is', pool.size);
-      c.once('finishQueue', () => {
-        pool.release(c)
-        debug('client released to pool. pool size is', pool.size);
-      });
-      return c;
-    };
-    super(getFromPool);
+    const pcp = poolClientProvider(config);
+    super(pcp);
     this._config = config;
-    this._pool = pool;
-    this.destroy = async () => {
-      debug('destroying client pool. pool size is', pool.size);
-      await this._pool.drain();
-      await this._pool.clear();
-      debug('destroyed client pool. pool size is', pool.size);
-    }
+    this._pool = pcp._pool;
   };
+
+  async destroy() {
+    debug('destroying client pool. pool size is', this._pool.size);
+    await this._pool.drain();
+    await this._pool.clear();
+    debug('destroyed client pool. pool size is', this._pool.size);
+  }
+}
+
+const singleClientProvider = (config: ClientConfig) => {
+  const c = getRawClient(config);
+  return async function singleClientProvider () {
+    if (!c.isAuthenticated)
+      await c.authenticate(config.credentials);
+    return c;
+  }
 }
 
 export class SingleClient extends CommandAPI {
   _config: ClientConfig
-  destroy: () => void
-  
+
   constructor(config: ClientConfig) {
-    const cliP = rawClientGetter(config);
-    const init = async () => {
-      const c = await cliP;
-      if (!c.isAuthenticated)
-        await c.authenticate(config.credentials);
-      return c;
-    };
-    super(init);
+    super(singleClientProvider(config));
     this._config = config;
-    this.destroy = async () => {
-      const s = await this.clientProvider();
-      s.destroy();
-    };
+  }
+
+  async destroy() {
+    const s = await this.clientProvider();
+    s.destroy();
   }
 };
 
@@ -104,10 +102,16 @@ export class SimpleClient extends CommandAPI {
   constructor(client: RawClient) {
     super(() => Promise.resolve(client));
   }
+  
+  async destroy() {
+    const s = await this.clientProvider();
+    s.destroy();
+  }
+
 };
 
 export const getClient = async (config: ClientConfig) => {
-  const cli = await rawClientGetter(config);
+  const cli = getRawClient(config);
   if (!cli.isAuthenticated)
     await cli.authenticate(config.credentials);
   const api = new SimpleClient(cli);
diff --git a/foreign/node/src/client/client.type.ts b/foreign/node/src/client/client.type.ts
index a162c247..6a39e112 100644
--- a/foreign/node/src/client/client.type.ts
+++ b/foreign/node/src/client/client.type.ts
@@ -19,8 +19,11 @@
 
 
 import type { Readable } from 'stream';
-import type { TcpOption } from './tcp.client.js';
-import type { TlsOption } from './tls.client.js';
+import { type TcpSocketConnectOpts } from 'node:net';
+import { type ConnectionOptions } from 'node:tls';
+
+export type TcpOption = TcpSocketConnectOpts;
+export type TlsOption = { port: number } & ConnectionOptions;
 
 export type CommandResponse = {
   status: number,
@@ -29,12 +32,14 @@ export type CommandResponse = {
 };
 
 export type RawClient = {
-  sendCommand: (code: number, payload: Buffer, handleResponse?: boolean) => 
Promise<CommandResponse>,
+  sendCommand: (
+    code: number, payload: Buffer, handleResponse?: boolean
+  ) => Promise<CommandResponse>,
   destroy: () => void,
   isAuthenticated: boolean
   authenticate: (c: ClientCredentials) => Promise<boolean>
-  on: (ev: string, cb: () => void) => void
-  once: (ev: string, cb: () => void) => void
+  on: (ev: string, cb: (e?: unknown) => void) => void
+  once: (ev: string, cb: (e?: unknown) => void) => void
   getReadStream: () => Readable
 }
 
@@ -43,7 +48,13 @@ export type ClientProvider = () => Promise<RawClient>;
 export const Transports = ['TCP', 'TLS' /**, 'QUIC' */] as const;
 export type TransportType = typeof Transports[number];
 
-type TransportOption = TcpOption | TlsOption;
+export type ReconnectOption = {
+  enabled: boolean,
+  interval: number,
+  maxRetries: number
+}
+
+export type TransportOption = TcpOption | TlsOption;
 
 export type TokenCredentials = {
   token: string
@@ -65,5 +76,7 @@ export type ClientConfig = {
   transport: TransportType,
   options: TransportOption,
   credentials: ClientCredentials,
-  poolSize?: PoolSizeOption
+  poolSize?: PoolSizeOption,
+  reconnect?: ReconnectOption,
+  heartbeatInterval?: number
 }
diff --git a/foreign/node/src/client/index.ts b/foreign/node/src/client/index.ts
index ab8372d6..de2290c1 100644
--- a/foreign/node/src/client/index.ts
+++ b/foreign/node/src/client/index.ts
@@ -19,8 +19,6 @@
 
 
 export { Client, SimpleClient, SingleClient } from './client.js'
-export { TcpClient, type TcpOption } from './tcp.client.js';
-export { TlsClient, type TlsOption } from './tls.client.js';
 export * from './client.utils.js';
 export * from './client.socket.js';
 export * from './client.type.js';
diff --git a/foreign/node/src/debug-send.ts b/foreign/node/src/debug-send.ts
index 3ca5f664..7cd51ff5 100644
--- a/foreign/node/src/debug-send.ts
+++ b/foreign/node/src/debug-send.ts
@@ -18,12 +18,10 @@
  */
 
 
-
 import assert from 'node:assert/strict';
-import { Client, SingleClient } from './client/index.js';
-import { uuidv7, uuidv4 } from 'uuidv7'
+import { ClientConfig, SingleClient } from './client/index.js';
 import { groupConsumerStream } from './stream/consumer-stream.js';
-import { PollingStrategy, Partitioning, type PollMessagesResponse } from 
'./wire/index.js';
+import { PollingStrategy, type PollMessagesResponse } from './wire/index.js';
 
 const streamId = 321;
 const topicId = 543;
@@ -42,7 +40,7 @@ const topic = {
   compressionAlgorithm: 1
 };
 
-const opt = {
+const opt: ClientConfig = {
   transport: 'TCP' as const,
   options: {
     port: 8090,
@@ -51,7 +49,12 @@ const opt = {
     // keepAlive: true
   },
   credentials: { username: 'iggy', password: 'iggy' },
-};2
+  reconnect: {
+    enabled: true,
+    interval: 10 * 1000,
+    maxRetries: 10
+  }
+};
 
 const c = new SingleClient(opt);
 
@@ -77,63 +80,58 @@ try {
   console.log('server stream CREATED::', { streamId });
   await c.topic.create(topic);
   console.log('server topic CREATED::', { topicId });
-  // send
 
-  // setInterval(async () => {
+  // send
+  setInterval(async () => {
     assert.ok(await c.message.send(msg));
-  //   console.log('message SEND::', { msg })
-  // }, 3000);
-
-
-  // // POLL MESSAGE
-  // const pollReq = {
-  //   groupId,
-  //   streamId,
-  //   topicId,
-  //   pollingStrategy: PollingStrategy.Next,
-  //   count: 1,
-  //   interval: 5000
-  // };
-
-  // const cs = await groupConsumerStream(opt)(pollReq);
-  // const dataCb = (d: PollMessagesResponse) => {
-  //   console.log('cli/DATA POLLED:', d);
-  //   // recv += d.count;
-  //   // if (recv === ct)
-  //   //   str.destroy();
-  // };
-
-  // cs.on('data', dataCb);
-
-  // cs.on('error', (err) => {
-  //   console.error('cli/=>>Stream ERROR:: // DESTROY ', err)
-  //   // cs.destroy(err);
-  // });
-
-  // cs.on('end', async () => {
-  //   console.log('cli/=>>Stream END::')
-  //   cs.destroy();
-  //   await cleanup();
-  // });
-
-  // cs.on('close', async () => {
-  //   console.log('cli/=>>Stream CLOSE::')
-  //   // await cleanup();
-  // });
+    console.log('message SEND::', { msg })
+  }, 3000);
+
+
+  // POLL MESSAGE
+  const pollReq = {
+    groupId,
+    streamId,
+    topicId,
+    pollingStrategy: PollingStrategy.Next,
+    count: 1,
+    interval: 5000
+  };
+
+  const cs = await groupConsumerStream(opt)(pollReq);
+  const dataCb = (d: PollMessagesResponse) => {
+    console.log('cli/DATA POLLED:', d);
+    // recv += d.count;
+    // if (recv === ct)
+    //   str.destroy();
+  };
+
+  cs.on('data', dataCb);
+
+  cs.on('error', (err) => {
+    console.error('cli/=>>Stream ERROR:: // DESTROY ', err)
+    cs.destroy(err);
+  });
+
+  cs.on('end', async () => {
+    console.log('cli/=>>Stream END::')
+    cs.destroy();
+    await cleanup();
+  });
+
+  cs.on('close', async () => {
+    console.log('cli/=>>Stream CLOSE::')
+    // await cleanup();
+  });
 
 
 } catch (err) {
   console.error('client catchALL END', err)
   await cleanup();
-} finally {
-  console.error('finally::cleanup')
-  await cleanup();
 }
 
-
 // process.on('SIGINT', async () => {
 //   await cleanup();
 //   console.log('cleanup OK, exiting...');
 //   process.exit()
 // });
-
diff --git a/foreign/node/src/debug.ts b/foreign/node/src/debug.ts
index bf1cd329..afabcf27 100644
--- a/foreign/node/src/debug.ts
+++ b/foreign/node/src/debug.ts
@@ -17,13 +17,11 @@
  * under the License.
  */
 
-
-
 import assert from 'node:assert/strict';
 import { Client } from './client/index.js';
 import { uuidv7, uuidv4 } from 'uuidv7'
 import { groupConsumerStream } from './stream/consumer-stream.js';
-import { PollingStrategy, Partitioning, type PollMessagesResponse } from 
'./wire/index.js';
+import { PollingStrategy, type PollMessagesResponse } from './wire/index.js';
 
 const streamId = 123;
 const topicId = 345;
@@ -46,7 +44,13 @@ const opt = {
   transport: 'TCP' as const,
   options: { port: 8090, host: '127.0.0.1', allowHalfOpen: true, keepAlive: 
true },
   credentials: { username: 'iggy', password: 'iggy' },
-};2
+  reconnect: {
+    enabled: true,
+    interval: 10 * 1000,
+    maxRetries: 10
+  },
+  heartbeatInterval: 5 * 1000
+};
 
 const c = new Client(opt);
 
@@ -74,6 +78,7 @@ try {
   console.log('server stream CREATED::', { streamId });
   await c.topic.create(topic);
   console.log('server topic CREATED::', { topicId });
+  
   // send
   assert.ok(await c.message.send(msg));
   console.log('message SEND::', { msg })
@@ -84,7 +89,7 @@ try {
     streamId,
     topicId,
     pollingStrategy: PollingStrategy.Next,
-    count: 1,
+    count: 5,
     interval: 5000
   };
 
@@ -93,16 +98,13 @@ try {
   const cs = await groupConsumerStream(opt)(pollReq);
   const dataCb = (d: PollMessagesResponse) => {
     console.log('cli/DATA POLLED:', d);
-    // recv += d.count;
-    // if (recv === ct)
-    //   str.destroy();
   };
 
   cs.on('data', dataCb);
 
   cs.on('error', (err) => {
     console.error('cli/=>>Stream ERROR:: // DESTROY ', err)
-    // cs.destroy(err);
+    cs.destroy(err);
   });
 
   cs.on('end', async () => {
diff --git a/foreign/node/src/e2e/tcp.consumer-group.e2e.ts b/foreign/node/src/e2e/tcp.consumer-group.e2e.ts
index c678cdca..1edf93fc 100644
--- a/foreign/node/src/e2e/tcp.consumer-group.e2e.ts
+++ b/foreign/node/src/e2e/tcp.consumer-group.e2e.ts
@@ -107,7 +107,7 @@ describe('e2e -> consumer-group', async () => {
     };
     let ct = 0;
     while (ct < payloadLength) {
-      const { messages, ...resp } = await c.message.poll(pollReq);
+      const { messages /** , ...resp*/ } = await c.message.poll(pollReq);
       // console.log('POLL', messages.length, 'R/C', resp.count, messages, 
resp, ct);
       // assert.equal(messages.length, resp.count);
       ct += messages.length;
diff --git a/foreign/node/src/stream/consumer-stream.ts b/foreign/node/src/stream/consumer-stream.ts
index c197db0b..3e07afdc 100644
--- a/foreign/node/src/stream/consumer-stream.ts
+++ b/foreign/node/src/stream/consumer-stream.ts
@@ -20,10 +20,12 @@
 
 import { Readable, pipeline, PassThrough } from "node:stream";
 import type { ClientConfig, RawClient } from "../client/client.type.js";
+import { getRawClient } from '../client/client.socket.js';
 import type { Id } from '../wire/identifier.utils.js';
-import { SimpleClient, rawClientGetter } from "../client/client.js";
+import { getClient } from "../client/client.js";
 import { type PollMessages, POLL_MESSAGES } from 
"../wire/message/poll-messages.command.js";
 import { type PollingStrategy, ConsumerKind, type CommandAPI } from 
"../wire/index.js";
+import { debug } from "../client/client.debug.js";
 
 
 const wait = (interval: number, cb?: () => void): Promise<void> =>
@@ -41,12 +43,8 @@ async function* genAutoCommitedPoll(
   while (true) {
     const r = await c.message.poll(poll);
     yield r;
-
-    const k = `${r.partitionId}`;
-    let part = state.get(k) || 0;    
-    part = r.count;
-    state.set(k, part);
-
+    state.set(`${r.partitionId}`, r.count);
+    
     if (Array.from(state).every(([, last]) => last === 0)) {
       await wait(interval);
     }
@@ -62,12 +60,11 @@ async function* genPoll(
 };
 
 
-
 export const singleConsumerStream = (config: ClientConfig) => async (
   poll: PollMessages,
   // interval: 1000
 ): Promise<Readable> => {
-  const c = await rawClientGetter(config);
+  const c = getRawClient(config);
   if (!c.isAuthenticated)
     await c.authenticate(config.credentials);
   const ps = Readable.from(genPoll(c, poll), { objectMode: true });
@@ -100,16 +97,15 @@ export const groupConsumerStream = (config: ClientConfig) 
=>
     interval = 1000,
     autocommit = true
   }: ConsumerGroupStreamRequest): Promise<Readable> {
-    const c = await rawClientGetter(config);
-    const s = new SimpleClient(c);
-    if (!c.isAuthenticated)
-      await c.authenticate(config.credentials);
-
+    const s = await getClient(config);
+    
     try {
       await s.group.get({ streamId, topicId, groupId })
       // eslint-disable-next-line @typescript-eslint/no-unused-vars
     } catch (err) {
-      await s.group.create({ streamId, topicId, groupId, name: 
`auto-${groupId}` })
+      const ng = { streamId, topicId, groupId, name: `auto-${groupId}` };
+      debug('group does not exist, creating it', ng);
+      await s.group.create(ng)
     }
 
     await s.group.join({ streamId, topicId, groupId });
@@ -124,6 +120,17 @@ export const groupConsumerStream = (config: ClientConfig) 
=>
       autocommit
     }
     const ps = Readable.from(genAutoCommitedPoll(s, poll, interval), { 
objectMode: true });
+    // return (await s.clientProvider()).getReadStream();
+    // c.on('error', (err: unknown) => {
+    //   debug('groupConsumerStream::client error', err);
+    //   // ps.emit('error', err);
+    // });
+
+    // c.on('end', () => {
+    //   debug('groupConsumerStream::END');
+    //   // ps.emit('end');
+    // });
+
     return ps;
   };
 
diff --git a/foreign/node/src/tcp.e2e.ts b/foreign/node/src/tcp.e2e.ts
index 3d18bc47..c105015a 100644
--- a/foreign/node/src/tcp.e2e.ts
+++ b/foreign/node/src/tcp.e2e.ts
@@ -18,7 +18,7 @@
  */
 
 
-import { TcpClient } from './client/index.js';
+import { getRawClient } from './client/index.js';
 
 import {
   login, logout,
@@ -32,7 +32,14 @@ import {
 
 try {
   // create socket
-  const cli = TcpClient({ host: '127.0.0.1', port: 8090 });
+  const cli = getRawClient({
+    transport: 'TCP' as const,
+    options: {
+      host: '127.0.0.1',
+      port: 8090
+    },
+    credentials: { username: 'iggy', password: 'iggy' }
+  });
   const s = () => Promise.resolve(cli);
 
   // LOGIN
diff --git a/foreign/node/src/tls.system.e2e.ts 
b/foreign/node/src/tls.system.e2e.ts
index 73d52a73..1e2f2187 100644
--- a/foreign/node/src/tls.system.e2e.ts
+++ b/foreign/node/src/tls.system.e2e.ts
@@ -18,12 +18,19 @@
  */
 
 
-import { TlsClient } from './client/index.js';
+import { getRawClient } from './client/index.js';
 import { login, logout, getStats, ping } from './wire/index.js';
 
 try {
   // create socket
-  const cli = TlsClient({ host: '127.0.0.1', port: 8090 });
+  const cli = getRawClient({
+    transport: 'TCP' as const,
+    options: {
+      host: '127.0.0.1',
+      port: 8090
+    },
+    credentials: { username: 'iggy', password: 'iggy' }
+  });
   const s = () => Promise.resolve(cli);
 
   // PING
diff --git a/foreign/node/src/wire/error.code.ts 
b/foreign/node/src/wire/error.code.ts
index d7464c78..a65caf87 100644
--- a/foreign/node/src/wire/error.code.ts
+++ b/foreign/node/src/wire/error.code.ts
@@ -21,127 +21,193 @@
 
 export const translateErrorCode = (code: number): string => {
   switch (code.toString()) {
-    case '1': return 'error';
-    case '2': return 'invalid_configuration';
-    case '3': return 'invalid_command';
-    case '4': return 'invalid_format';
-    case '5': return 'feature_unavailable';
-    case '10': return 'cannot_create_base_directory';
-    case '20': return 'resource_not_found';
-    case '21': return 'cannot_load_resource';
-    case '22': return 'cannot_save_resource';
-    case '23': return 'cannot_delete_resource';
-    case '24': return 'cannot_serialize_resource';
-    case '25': return 'cannot_deserialize_resource';
-    case '40': return 'unauthenticated';
-    case '41': return 'unauthorized';
-    case '42': return 'invalid_credentials';
-    case '43': return 'invalid_username';
-    case '44': return 'invalid_password';
-    case '51': return 'not_connected';
-    case '52': return 'request_error';
-    case '60': return 'invalid_encryption_key';
-    case '61': return 'cannot_encrypt_data';
-    case '62': return 'cannot_decrypt_data';
-    case '100': return 'client_not_found';
-    case '101': return 'invalid_client_id';
-    case '200': return 'io_error';
-    case '201': return 'write_error';
-    case '202': return 'cannot_parse_utf8';
-    case '203': return 'cannot_parse_int';
-    case '204': return 'cannot_parse_slice';
-    case '300': return 'http_response_error';
-    case '301': return 'request_middleware_error';
-    case '302': return 'cannot_create_endpoint';
-    case '303': return 'cannot_parse_url';
-    case '304': return 'invalid_response';
-    case '305': return 'empty_response';
-    case '306': return 'cannot_parse_address';
-    case '307': return 'read_error';
-    case '308': return 'connection_error';
-    case '309': return 'read_to_end_error';
-    case '1000': return 'cannot_create_streams_directory';
-    case '1001': return 'cannot_create_stream_directory';
-    case '1002': return 'cannot_create_stream_info';
-    case '1003': return 'cannot_update_stream_info';
-    case '1004': return 'cannot_open_stream_info';
-    case '1005': return 'cannot_read_stream_info';
-    case '1006': return 'cannot_create_stream';
-    case '1007': return 'cannot_delete_stream';
-    case '1008': return 'cannot_delete_stream_directory';
-    case '1009': return 'stream_id_not_found';
-    case '1010': return 'stream_name_not_found';
-    case '1011': return 'stream_id_already_exists';
-    case '1012': return 'stream_name_already_exists';
-    case '1013': return 'invalid_stream_name';
-    case '1014': return 'invalid_stream_id';
-    case '1015': return 'cannot_read_streams';
-    case '2000': return 'cannot_create_topics_directory';
-    case '2001': return 'cannot_create_topic_directory';
-    case '2002': return 'cannot_create_topic_info';
-    case '2003': return 'cannot_update_topic_info';
-    case '2004': return 'cannot_open_topic_info';
-    case '2005': return 'cannot_read_topic_info';
-    case '2006': return 'cannot_create_topic';
-    case '2007': return 'cannot_delete_topic';
-    case '2008': return 'cannot_delete_topic_directory';
-    case '2009': return 'cannot_poll_topic';
-    case '2010': return 'topic_id_not_found';
-    case '2011': return 'topic_name_not_found';
-    case '2012': return 'topic_id_already_exists';
-    case '2013': return 'topic_name_already_exists';
-    case '2014': return 'invalid_topic_name';
-    case '2015': return 'too_many_partitions';
-    case '2016': return 'invalid_topic_id';
-    case '2017': return 'cannot_read_topics';
-    case '3000': return 'cannot_create_partition';
-    case '3001': return 'cannot_create_partitions_directory';
-    case '3002': return 'cannot_create_partition_directory';
-    case '3003': return 'cannot_open_partition_log_file';
-    case '3004': return 'cannot_read_partitions';
-    case '3005': return 'cannot_delete_partition';
-    case '3006': return 'cannot_delete_partition_directory';
-    case '3007': return 'partition_not_found';
-    case '3008': return 'no_partitions';
-    case '4000': return 'segment_not_found';
-    case '4001': return 'segment_closed';
-    case '4002': return 'invalid_segment_size';
-    case '4003': return 'cannot_create_segment_log_file';
-    case '4004': return 'cannot_create_segment_index_file';
-    case '4005': return 'cannot_create_segment_time_index_file';
-    case '4006': return 'cannot_save_messages_to_segment';
-    case '4007': return 'cannot_save_index_to_segment';
-    case '4008': return 'cannot_save_time_index_to_segment';
-    case '4009': return 'invalid_messages_count';
-    case '4010': return 'cannot_append_message';
-    case '4011': return 'cannot_read_message';
-    case '4012': return 'cannot_read_message_id';
-    case '4013': return 'cannot_read_message_state';
-    case '4014': return 'cannot_read_message_timestamp';
-    case '4015': return 'cannot_read_headers_length';
-    case '4016': return 'cannot_read_headers_payload';
-    case '4017': return 'too_big_headers_payload';
-    case '4018': return 'invalid_header_key';
-    case '4019': return 'invalid_header_value';
-    case '4020': return 'cannot_read_message_length';
-    case '4021': return 'cannot_read_message_payload';
-    case '4022': return 'too_big_message_payload';
-    case '4023': return 'too_many_messages';
-    case '4024': return 'empty_message_payload';
-    case '4025': return 'invalid_message_payload_length';
-    case '4026': return 'cannot_read_message_checksum';
-    case '4027': return 'invalid_message_checksum';
-    case '4028': return 'invalid_key_value_length';
-    case '4100': return 'invalid_offset';
-    case '4101': return 'cannot_read_consumer_offsets';
-    case '5000': return 'consumer_group_not_found';
-    case '5001': return 'consumer_group_already_exists';
-    case '5002': return 'consumer_group_member_not_found';
-    case '5003': return 'invalid_consumer_group_id';
-    case '5004': return 'cannot_create_consumer_groups_directory';
-    case '5005': return 'cannot_read_consumer_groups';
-    case '5006': return 'cannot_create_consumer_group_info';
-    case '5007': return 'cannot_delete_consumer_group_info';
+    case '1': return "Error";
+    case '2': return "Invalid configuration";
+    case '3': return "Invalid command";
+    case '4': return "Invalid format";
+    case '5': return "Feature is unavailable";
+    case '6': return "Invalid identifier";
+    case '7': return "Invalid version: {0}";
+    case '8': return "Disconnected";
+    case '9': return "Cannot establish connection";
+    case '10': return "Cannot create base directory, Path: {0}";
+    case '11': return "Cannot create runtime directory, Path: {0}";
+    case '12': return "Cannot remove runtime directory, Path: {0}";
+    case '13': return "Cannot create state directory, Path: {0}";
+    case '14': return "State file not found";
+    case '15': return "State file corrupted";
+    case '16': return "Invalid state entry checksum: {0}, expected: {1}, for 
index: {2}";
+    case '19': return "Cannot open database, Path: {0}";
+    case '20': return "Resource with key: {0} was not found.";
+    case '30': return "Stale client";
+    case '31': return "TCP error";
+    case '32': return "QUIC error";
+    case '33': return "Invalid server address";
+    case '34': return "Invalid client address";
+    case '40': return "Unauthenticated";
+    case '41': return "Unauthorized";
+    case '42': return "Invalid credentials";
+    case '43': return "Invalid username";
+    case '44': return "Invalid password";
+    case '45': return "Invalid user status";
+    case '46': return "User already exists";
+    case '47': return "User inactive";
+    case '48': return "Cannot delete user with ID: {0}";
+    case '49': return "Cannot change permissions for user with ID: {0}";
+    case '50': return "Invalid personal access token name";
+    case '51': return "Personal access token: {0} for user with ID: {1} 
already exists";
+    case '52': return "User with ID: {0} has reached the maximum number of 
personal access tokens: {1}";
+    case '53': return "Invalid personal access token";
+    case '54': return "Personal access token: {0} for user with ID: {1} has 
expired.";
+    case '55': return "Users limit reached.";
+    case '61': return "Not connected";
+    case '63': return "Client shutdown";
+    case '64': return "Invalid TLS domain";
+    case '65': return "Invalid TLS certificate path";
+    case '66': return "Invalid TLS certificate";
+    case '67': return "Failed to add certificate";
+    case '70': return "Invalid encryption key";
+    case '71': return "Cannot encrypt data";
+    case '72': return "Cannot decrypt data";
+    case '73': return "Invalid JWT algorithm: {0}";
+    case '74': return "Invalid JWT secret";
+    case '75': return "JWT is missing";
+    case '76': return "Cannot generate JWT";
+    case '77': return "Access token is missing";
+    case '78': return "Invalid access token";
+    case '80': return "Invalid size bytes";
+    case '81': return "Invalid UTF-8";
+    case '82': return "Invalid number encoding";
+    case '83': return "Invalid boolean value";
+    case '84': return "Invalid number value";
+
+    case '100': return "Client with ID: {0} was not found.";
+    case '101': return "Invalid client ID";
+
+    case '206': return "Connection closed";
+    case '209': return "Cannot parse header kind from {0}";
+    case '300': return "HTTP response error, status: {0}, body: {1}";
+    case '301': return "Invalid HTTP request";
+    case '302': return "Invalid JSON response";
+    case '303': return "Invalid bytes response";
+    case '304': return "Empty response";
+    case '305': return "Cannot create endpoint";
+    case '306': return "Cannot parse URL";
+
+    // STREAM
+    case '1000': return "Cannot create streams directory, Path: {0}";
+    case '1001': return "Cannot create stream with ID: {0} directory, Path: 
{1}";
+    case '1002': return "Failed to create stream info file for stream with ID: 
{0}";
+    case '1003': return "Failed to update stream info for stream with ID: {0}";
+    case '1004': return "Failed to open stream info file for stream with ID: 
{0}";
+    case '1005': return "Failed to read stream info file for stream with ID: 
{0}";
+    case '1006': return "Failed to create stream with ID: {0}";
+    case '1007': return "Failed to delete stream with ID: {0}";
+    case '1008': return "Failed to delete stream directory with ID: {0}";
+    case '1009': return "Stream with ID: {0} was not found.";
+    case '1010': return "Stream with name: {0} was not found.";
+    case '1011': return "Stream with ID: {0} already exists.";
+    case '1012': return "Stream with name: {0} already exists.";
+    case '1013': return "Invalid stream name";
+    case '1014': return "Invalid stream ID";
+    case '1015': return "Cannot read streams";
+    case '1016': return "Missing streams";
+    case '1017': return "Missing topics for stream with ID: {0}";
+    case '1018': return "Missing partitions for topic with ID: {0} for stream 
with ID: {1}.";
+    case '1019': return "Max topic size cannot be lower than segment size. Max 
topic size: {0} < segment size: {1}.";
+
+    // TOPIC
+    case '2000': return "Cannot create topics directory for stream with ID: 
{0}, Path: {1}";
+    case '2001': return "Failed to create directory for topic with ID: {0} for 
stream with ID: {1}, Path: {2}";
+    case '2002': return "Failed to create topic info file for topic with ID: 
{0} for stream with ID: {1}.";
+    case '2003': return "Failed to update topic info for topic with ID: {0} 
for stream with ID: {1}.";
+    case '2004': return "Failed to open topic info file for topic with ID: {0} 
for stream with ID: {1}.";
+    case '2005': return "Failed to read topic info file for topic with ID: {0} 
for stream with ID: {1}.";
+    case '2006': return "Failed to create topic with ID: {0} for stream with 
ID: {1}.";
+    case '2007': return "Failed to delete topic with ID: {0} for stream with 
ID: {1}.";
+    case '2008': return "Failed to delete topic directory with ID: {0} for 
stream with ID: {1}, Path: {2}";
+    case '2009': return "Cannot poll topic";
+    case '2010': return "Topic with ID: {0} for stream with ID: {1} was not 
found.";
+    case '2011': return "Topic with name: {0} for stream with ID: {1} was not 
found.";
+    case '2012': return "Topic with ID: {0} for stream with ID: {1} already 
exists.";
+    case '2013': return "Topic with name: {0} for stream with ID: {1} already 
exists.";
+    case '2014': return "Invalid topic name";
+    case '2015': return "Too many partitions";
+    case '2016': return "Invalid topic ID";
+    case '2017': return "Cannot read topics for stream with ID: {0}";
+    case '2018': return "Invalid replication factor";
+
+    // PARTITION
+    case '3000': return "Cannot create partition with ID: {0} for stream with 
ID: {1} and topic with ID: {2}";
+    case '3001': return "Failed to create directory for partitions for stream 
with ID: {0} and topic with ID: {1}";
+    case '3002': return "Failed to create directory for partition with ID: {0} 
for stream with ID: {1} and topic with ID: {2}";
+    case '3003': return "Cannot open partition log file";
+    case '3004': return "Cannot read partitions directories.";
+    case '3005': return "Failed to delete partition with ID: {0} for stream 
with ID: {1} and topic with ID: {2}";
+    case '3006': return "Failed to delete partition directory with ID: {0} for 
stream with ID: {1} and topic with ID: {2}";
+    case '3007': return "Partition with ID: {0} for topic with ID: {1} for 
stream with ID: {2} was not found.";
+    case '3008': return "Topic with ID: {0} for stream with ID: {1} has no 
partitions.";
+    case '3009': return "Cannot read partitions for topic with ID: {0} for 
stream with ID: {1}";
+    case '3010': return "Failed to delete consumer offsets directory for path: 
{0}";
+    case '3011': return "Failed to delete consumer offset file for path: {0}";
+    case '3012': return "Failed to create consumer offsets directory for path: 
{0}";
+    case '3020': return "Failed to read consumers offsets from path: {0}";
+    case '3021': return "Consumer offset for consumer with ID: {0} was not 
found.";
+
+    // MESSAGE
+    case '4000': return "Segment not found";
+    case '4001': return "Segment with start offset: {0} and partition with ID: 
{1} is closed";
+    case '4002': return "Segment size is invalid";
+    case '4003': return "Failed to create segment log file for Path: {0}.";
+    case '4004': return "Failed to create segment index file for Path: {0}.";
+    case '4005': return "Failed to create segment time index file for Path: 
{0}.";
+    case '4006': return "Cannot save messages to segment.";
+    case '4007': return "Cannot save index to segment.";
+    case '4008': return "Cannot save time index to segment.";
+    case '4009': return "Invalid messages count";
+    case '4010': return "Cannot append message";
+    case '4011': return "Cannot read message";
+    case '4012': return "Cannot read message ID";
+    case '4013': return "Cannot read message state";
+    case '4014': return "Cannot read message timestamp";
+    case '4015': return "Cannot read headers length";
+    case '4016': return "Cannot read headers payload";
+    case '4017': return "Too big headers payload";
+    case '4018': return "Invalid header key";
+    case '4019': return "Invalid header value";
+    case '4020': return "Cannot read message length";
+    case '4021': return "Cannot save messages to segment";
+    case '4022': return "Too big message payload";
+    case '4023': return "Too many messages";
+    case '4024': return "Empty message payload";
+    case '4025': return "Invalid message payload length";
+    case '4026': return "Cannot read message checksum";
+    case '4027': return "Invalid message checksum: {0}, expected: {1}, for 
offset: {2}";
+    case '4028': return "Cannot append message to segment at offset: {0}";
+
+    // STORAGE
+    case '5000': return "File not found";
+    case '5001': return "Failed to open file";
+    case '5002': return "Failed to write file";
+    case '5003': return "Failed to close file";
+    case '5004': return "Failed to delete file";
+    case '5005': return "Cannot read file";
+    case '5006': return "Invalid file size";
+    case '5007': return "Cannot create file";
+    case '5008': return "Cannot rename file";
+    case '5009': return "Cannot get file info";
+    case '5010': return "Failed to create directory for file";
+    case '5011': return "Failed to create file for {0}";
+    case '5012': return "Failed to create backup for {0}";
+    case '5013': return "Failed to create file path for backup file: {0}";
+    case '5014': return "Failed to backup file: {0}";
+    case '5015': return "Failed to restore file: {0}";
+    case '5016': return "Failed to restore file path for backup file: {0}";
+    case '5017': return "Failed to delete backup file: {0}";
+    case '5018': return "Failed to delete directory for file: {0}";
+    case '5019': return "Failed to delete file: {0}";
+    case '5020': return "Failed to copy file: {0}";
+
     default: return 'error';
   }
-}
\ No newline at end of file
+}
diff --git a/foreign/node/src/wire/stream/create-stream.command.ts 
b/foreign/node/src/wire/stream/create-stream.command.ts
index 928b771b..e15825b0 100644
--- a/foreign/node/src/wire/stream/create-stream.command.ts
+++ b/foreign/node/src/wire/stream/create-stream.command.ts
@@ -45,7 +45,7 @@ export const CREATE_STREAM = {
       bName
     ]);
   },
-
+  
   deserialize: (r: CommandResponse) => {
     return deserializeToStream(r.data, 0).data
   }

Reply via email to