This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans-new
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans-new by this push:
     new 7350efee test
7350efee is described below

commit 7350efeece3352a499a49bd631b77ed890ac705b
Author: haze518 <[email protected]>
AuthorDate: Sat Aug 16 11:22:05 2025 +0600

    test
---
 core/common/src/error/iggy_error.rs        |   4 +
 core/integration/src/tcp_client.rs         |  22 +--
 core/integration/tests/sdk/producer/mod.rs |  10 +-
 core/sdk/src/connection/mod.rs             | 288 ++++++++++++++++++-----------
 core/sdk/src/protocol/mod.rs               | 216 ++++++++++------------
 5 files changed, 296 insertions(+), 244 deletions(-)

diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index 8059f6c6..b2209ced 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -469,6 +469,10 @@ pub enum IggyError {
     MaxRetriesExceeded = 10053,
     #[error("Connection timeout")]
     ConnectionTimeout = 10054,
+    #[error("Incorrect connection state")]
+    IncorrectConnectionState = 10055,
+    #[error("Connection missed socket")]
+    ConnectionMissedSocket = 10056,
 }
 
 impl IggyError {
diff --git a/core/integration/src/tcp_client.rs 
b/core/integration/src/tcp_client.rs
index 45936b34..c2fb8372 100644
--- a/core/integration/src/tcp_client.rs
+++ b/core/integration/src/tcp_client.rs
@@ -44,19 +44,13 @@ impl ClientFactory for TcpClientFactory {
             ..TcpClientConfig::default()
         };
 
-        let tcp_client = NewTcpClient::create(Arc::new(config)).unwrap();
-        let wrapper = ClientWrapper::New(tcp_client);
-
-        // let client = 
IggyClient::create(iggy::prelude::ClientWrapper::New(tcp_client), None, None);
-
-        // let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e| 
{
-        //     panic!(
-        //         "Failed to create TcpClient, iggy-server has address {}, 
error: {:?}",
-        //         self.server_addr, e
-        //     )
-        // });
-
-        Client::connect(&wrapper).await.unwrap_or_else(|e| {
+        let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e| {
+            panic!(
+                "Failed to create TcpClient, iggy-server has address {}, 
error: {:?}",
+                self.server_addr, e
+            )
+        });
+        Client::connect(&client).await.unwrap_or_else(|e| {
             if self.tls_enabled {
                 panic!(
                     "Failed to connect to iggy-server at {} with TLS enabled, 
error: {:?}\n\
@@ -72,7 +66,7 @@ impl ClientFactory for TcpClientFactory {
                 )
             }
         });
-        wrapper
+        ClientWrapper::Tcp(client)
     }
 
     fn transport(&self) -> Transport {
diff --git a/core/integration/tests/sdk/producer/mod.rs 
b/core/integration/tests/sdk/producer/mod.rs
index a0805e96..b6f4131a 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -18,13 +18,14 @@
 
 mod background;
 
+use std::net::SocketAddr;
 use std::{sync::Arc, time::Duration};
 
 use bytes::Bytes;
-use iggy::clients::client::IggyClient;
+use iggy::{clients::client::IggyClient, connection::tokio_tcp};
 use iggy::prelude::*;
 use integration::test_server::{login_root, TestServer};
-use iggy::connection::{TokioTcpTransport, NewTcpClient, TokioRuntime};
+use iggy::connection::{NewTcpClient, SFut, SocketFactory, TokioCompat, 
TokioRuntime};
 use tokio::time::sleep;
 
 const STREAM_ID: u32 = 1;
@@ -81,7 +82,10 @@ async fn test_async_send() {
     // let client = 
ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
     // let client = IggyClient::create(client, None, None);
 
-    let tcp_client = 
NewTcpClient::create(Arc::new(tcp_client_config)).unwrap();
+    let factory: SocketFactory<TokioCompat> = Arc::new(move |addr: SocketAddr| 
-> SFut<TokioCompat> {
+        Box::pin(tokio_tcp(addr))
+    });
+    let tcp_client = NewTcpClient::create(Arc::new(tcp_client_config), 
factory).unwrap();
 
     let client = 
IggyClient::create(iggy::prelude::ClientWrapper::New(tcp_client), None, None);
 
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index bfe8ec0d..7aaad996 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -1,13 +1,24 @@
 use std::{
-    collections::{HashMap, VecDeque}, fmt::Debug, io, net::SocketAddr, 
ops::Deref, pin::Pin, str::FromStr, sync::{
-        atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Mutex
-    }, task::{ready, Context, Poll, Waker}, time::Duration
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    io,
+    net::SocketAddr,
+    ops::Deref,
+    pin::Pin,
+    str::FromStr,
+    sync::{
+        Arc, Mutex,
+        atomic::{AtomicBool, AtomicU64, Ordering},
+    },
+    task::{Context, Poll, Waker, ready},
+    time::Duration,
 };
 
+use crate::protocol::{ControlAction, ProtocolCore, ProtocolCoreConfig, 
Response, TxBuf};
 use async_broadcast::{Receiver, Sender, broadcast};
 use async_trait::async_trait;
 use bytes::{Buf, Bytes, BytesMut};
-use futures::{future::poll_fn, task::AtomicWaker, AsyncRead, AsyncWrite, 
FutureExt};
+use futures::{AsyncRead, AsyncWrite, FutureExt, future::poll_fn, 
task::AtomicWaker};
 use iggy_binary_protocol::{BinaryClient, BinaryTransport, Client};
 use iggy_common::{
     ClientState, Command, DiagnosticEvent, IggyDuration, IggyError, 
TcpClientConfig,
@@ -19,46 +30,24 @@ use tokio::{
 };
 use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, 
TokioAsyncWriteCompatExt};
 use tracing::{debug, error, info, trace, warn};
-use crate::protocol::{Order, ProtocolCore, ProtocolCoreConfig, Response, 
TxBuf};
 
-pub trait StreamBuilder {
-    type Stream: AsyncRead + AsyncWrite + Unpin + Send + 'static;
+pub type SFut<S> = Pin<Box<dyn Future<Output = io::Result<S>> + Send>>;
+pub type SocketFactory<S> = Arc<dyn Fn(SocketAddr) -> SFut<S> + Send + Sync>;
 
-    fn connect(self, addr: SocketAddr) -> Pin<Box<dyn Future<Output = 
io::Result<Self::Stream>> + Send>>;
-}
+pub async fn tokio_tcp(addr: SocketAddr) -> io::Result<Compat<TcpStream>> {
+    let socket = match addr {
+        SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
+        SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
+    };
 
-pub struct TokioTcpBuilder {
-    inner: tokio::net::TcpSocket,
-}
-
-impl TokioTcpBuilder {
-    pub fn new_for(addr: &SocketAddr) -> io::Result<Self> {
-        let sock = match addr {
-            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
-            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
-        };
-        sock.set_nodelay(true)?;
-        Ok(Self { inner: sock })
-    }
-}
-
-impl StreamBuilder for TokioTcpBuilder {
-    type Stream = Compat<TcpStream>;
-
-    fn connect(self, addr: SocketAddr) -> Pin<Box<dyn Future<Output = 
io::Result<Self::Stream>> + Send>> {
-        Box::pin(async move {
-            let s = self.inner.connect(addr).await?;
-            Ok(s.compat())
-        })    
-    }
+    socket.set_nodelay(true)?;
+    let s = socket.connect(addr).await?;
+    Ok(s.compat())
 }
 
 pub type TokioCompat = Compat<TcpStream>;
 pub type NewTokioTcpClient = NewTcpClient<TokioCompat>;
 
-pub trait SocketFactory<S>: Fn(&SocketAddr) -> io::Result<S> + Send + Sync + 
'static {}
-impl<F, S> SocketFactory<S> for F where F: Fn(&SocketAddr) -> io::Result<S> + 
Send + Sync + 'static {}
-
 #[derive(Debug, Clone)]
 pub struct ConnectionStats {
     pub bytes_sent: u64,
@@ -112,12 +101,16 @@ pub struct ConnectionInner<S: AsyncIO> {
 pub struct ConnectionRef<S: AsyncIO>(Arc<ConnectionInner<S>>);
 
 impl<S: AsyncIO> ConnectionRef<S> {
-    fn new(state: ProtoConnectionState, factory: Box<dyn SocketFactory<S>>, 
config: Arc<TcpClientConfig>) -> Self {
+    fn new(
+        state: ProtoConnectionState,
+        socket_factory: SocketFactory<S>,
+        config: Arc<TcpClientConfig>,
+    ) -> Self {
         Self(Arc::new(ConnectionInner {
             state: Mutex::new(State {
                 inner: state,
                 driver: None,
-                socket_factory: factory,
+                socket_factory,
                 socket: None,
                 current_send: None,
                 send_offset: 0,
@@ -126,10 +119,15 @@ impl<S: AsyncIO> ConnectionRef<S> {
                 ready_responses: HashMap::new(),
                 recv_waiters: HashMap::new(),
                 config,
-                waiters: Arc::new(Waiters { map: Mutex::new(HashMap::new()), 
next_id: AtomicU64::new(0) }),
+                waiters: Arc::new(Waiters {
+                    map: Mutex::new(HashMap::new()),
+                    next_id: AtomicU64::new(0),
+                }),
                 pending_commands: VecDeque::new(),
                 ready_commands: VecDeque::new(),
-            })
+                connect_waiters: Vec::new(),
+                pending_connect: None,
+            }),
         }))
     }
 }
@@ -168,11 +166,20 @@ struct Waiters<T> {
 
 impl<T> Waiters<T> {
     fn new() -> Self {
-        Self { map: Mutex::new(HashMap::new()), next_id: AtomicU64::new(1) }
+        Self {
+            map: Mutex::new(HashMap::new()),
+            next_id: AtomicU64::new(1),
+        }
     }
     fn alloc(&self) -> RequestId {
         let id = self.next_id.fetch_add(1, Ordering::Relaxed);
-        self.map.lock().unwrap().insert(id, WaitEntry { waker: 
AtomicWaker::new(), result: None });
+        self.map.lock().unwrap().insert(
+            id,
+            WaitEntry {
+                waker: AtomicWaker::new(),
+                result: None,
+            },
+        );
         id
     }
     fn complete(&self, id: RequestId, val: T) -> bool {
@@ -198,7 +205,9 @@ impl<T> Future for WaitFuture<T> {
         if let Some(val) = {
             if let Some(e) = waiters.get_mut(&self.id) {
                 e.result.take()
-            } else { None }
+            } else {
+                None
+            }
         } {
             waiters.remove(&self.id);
             return Poll::Ready(val);
@@ -213,7 +222,9 @@ impl<T> Future for WaitFuture<T> {
         if let Some(val) = {
             if let Some(e) = waiters.get_mut(&self.id) {
                 e.result.take()
-            } else { None }
+            } else {
+                None
+            }
         } {
             waiters.remove(&self.id);
             return Poll::Ready(val);
@@ -223,14 +234,13 @@ impl<T> Future for WaitFuture<T> {
     }
 }
 
-pub struct State<S, B>
+pub struct State<S>
 where
     S: AsyncIO,
-    B: StreamBuilder<Stream = S>
 {
     inner: ProtoConnectionState,
     driver: Option<Waker>,
-    socket_factory: B,
+    socket_factory: SocketFactory<S>,
     socket: Option<S>,
     current_send: Option<TxBuf>,
     send_offset: usize,
@@ -244,25 +254,21 @@ where
     pending_commands: VecDeque<(u64, ClientCommand)>,
     ready_commands: VecDeque<u64>,
     connect_waiters: Vec<u64>,
+    pending_connect: Option<Pin<Box<dyn Future<Output = io::Result<S>> + 
Send>>>,
 }
 
-impl<S, B> Debug for State<S, B>
+impl<S> Debug for State<S>
 where
     S: AsyncIO,
-    B: StreamBuilder<Stream = S>
 {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "test"
-        )
+        write!(f, "test")
     }
 }
 
-impl<S, B> State<S, B>
+impl<S> State<S>
 where
     S: AsyncIO,
-    B: StreamBuilder<Stream = S>
 {
     fn wake(&mut self) {
         if let Some(waker) = self.driver.take() {
@@ -274,38 +280,71 @@ where
         let id = self.waiters.alloc();
         // TODO перетащить в sans io ядро inner.core
         self.pending_commands.push_back((id, command));
-        WaitFuture { waiters: self.waiters.clone(), id }
+        WaitFuture {
+            waiters: self.waiters.clone(),
+            id,
+        }
     }
 
     fn enqueue_message(&mut self, code: u32, payload: Bytes) -> Result<u64, 
IggyError> {
         self.inner.core.send(code, payload)
     }
 
-    fn drive_client_commands(&mut self) -> bool {
+    fn drive_client_commands(&mut self) -> io::Result<bool> {
         for (request_id, cmd) in self.pending_commands.drain(..) {
             match cmd {
-                ClientCommand::Connect(server_addr) => {
+                ClientCommand::Connect(server_address) => {
                     self.connect_waiters.push(request_id);
-                    self.inner.core.
-                    // let socket = self.socket_factory.connect(server_addr);
-
-                    // self.socket = Some(socket);
-                    // self.ready_commands.push_back(request_id);
-                    // self.waiters.complete(request_id, Ok(Bytes::new()));
+                    self.inner
+                        .core
+                        .desire_connect(server_address)
+                        .map_err(|e| {
+                            io::Error::new(io::ErrorKind::ConnectionAborted, 
e.as_string())
+                        })?;
                 }
                 ClientCommand::Disconnect => {
-                    // TODO add processing
                     self.ready_commands.push_back(request_id);
-                    self.waiters.complete(request_id, Ok(Bytes::new()));
+                    self.inner.core.disconnect();
+                    // self.waiters.complete(request_id, Ok(Bytes::new()));
                 }
                 ClientCommand::Shutdown => {
-                    // TODO add processing
                     self.ready_commands.push_back(request_id);
-                    self.waiters.complete(request_id, Ok(Bytes::new()));
+                    self.inner.core.shutdown();
+                    // self.waiters.complete(request_id, Ok(Bytes::new()));
                 }
             }
         }
-        true
+        Ok(true)
+    }
+
+    fn drive_connect(&mut self, cx: &mut Context<'_>) -> io::Result<bool> {
+        if let Some(fut) = self.pending_connect.as_mut() {
+            match fut.as_mut().poll(cx) {
+                Poll::Pending => return Ok(true),
+                Poll::Ready(Ok(stream)) => {
+                    self.socket = Some(stream);
+                    self.pending_connect = None;
+                    self.inner.core.on_connected().map_err(|e| {
+                        io::Error::new(io::ErrorKind::ConnectionRefused, 
e.as_string())
+                    })?;
+                    for id in self.connect_waiters.drain(..) {
+                        let _ = self.waiters.complete(id, Ok(Bytes::new()));
+                    }
+                    return Ok(true);
+                }
+                Poll::Ready(Err(e)) => {
+                    self.pending_connect = None;
+                    self.inner.core.disconnect();
+                    for id in self.connect_waiters.drain(..) {
+                        let _ = self
+                            .waiters
+                            .complete(id, 
Err(IggyError::CannotEstablishConnection));
+                    }
+                    return Ok(true);
+                }
+            }
+        }
+        Ok(false)
     }
 
     fn drive_timer(&mut self, cx: &mut Context<'_>) -> bool {
@@ -319,28 +358,39 @@ where
     }
 
     fn drive_transmit(&mut self, cx: &mut Context<'_>) -> io::Result<bool> {
+        if self.current_send.is_none() {
+            if let Some(tx) = self.inner.core.poll_transmit() {
+                self.current_send = Some(tx);
+                self.send_offset = 0;
+            } else {
+                return Ok(false);
+            }
+        }
+
+        let socket = self
+            .socket
+            .as_mut()
+            .ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "No 
socket"))?;
+
+        let buf = self.current_send.as_ref().unwrap();
         let mut offset = self.send_offset;
-        let socket = self.socket.as_mut().ok_or(io::Error::new(
-            io::ErrorKind::NotConnected, 
-            "No socket"
-        ))?;
-
-        if let Some(buf) = self.current_send.take() {
-            while self.send_offset < buf.data.len() {
-                match Pin::new(&mut *socket).poll_write(cx, 
&buf.data[offset..])? {
-                    Poll::Ready(n) => {
-                        offset += n;
-                        self.send_offset += n;
-                    }
-                    Poll::Pending => return Ok(false),
+
+        while self.send_offset < buf.data.len() {
+            match Pin::new(&mut *socket).poll_write(cx, &buf.data[offset..])? {
+                Poll::Ready(n) => {
+                    offset += n;
+                    self.send_offset += n;
                 }
-            }
-            match Pin::new(socket).poll_flush(cx)? {
                 Poll::Pending => return Ok(false),
-                Poll::Ready(()) => {}
             }
-            self.current_send = None;
         }
+
+        match Pin::new(socket).poll_flush(cx)? {
+            Poll::Pending => return Ok(false),
+            Poll::Ready(()) => {}
+        }
+
+        self.current_send = None;
         Ok(true)
     }
 
@@ -354,13 +404,19 @@ where
 
         loop {
             let n = {
-                let socket = self.socket.as_mut().ok_or(io::Error::new(
-                    io::ErrorKind::NotConnected, 
-                    "No socket"
-                ))?;
+                let socket = self
+                    .socket
+                    .as_mut()
+                    .ok_or(io::Error::new(io::ErrorKind::NotConnected, "No 
socket"))?;
                 let mut pinned = Pin::new(&mut *socket);
                 match pinned.as_mut().poll_read(cx, &mut recv_scratch)? {
                     Poll::Pending => return Ok(false),
+                    Poll::Ready(0) => {
+                        return Err(io::Error::new(
+                            io::ErrorKind::UnexpectedEof,
+                            "Connection closed",
+                        ));
+                    }
                     Poll::Ready(n) => n,
                 }
             };
@@ -432,25 +488,28 @@ impl<S: AsyncIO> Future for ConnectionDriver<S> {
         let order = st.inner.core.poll();
 
         match order {
-            Order::Wait(dur) => {
-                st.wait_timer = 
Some(Box::pin(tokio::time::sleep(dur.get_duration())));
-                return Poll::Pending;
-            }
-            Order::Transmit(tx) => {
-                st.current_send = Some(tx);
-                st.send_offset = 0;
+            ControlAction::Wait(dur) => {
+                if st.wait_timer.is_none() {
+                    st.wait_timer = 
Some(Box::pin(tokio::time::sleep(dur.get_duration())));
+                }
             }
-            Order::Error(e) => {
+            ControlAction::Error(e) => {
                 return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, 
format!("{e:?}"))));
             }
-            Order::Noop | Order::Authenticate { .. } => {}
-            Order::Connect => {todo!("добавить вызов метода из state")}
+            ControlAction::Noop | ControlAction::Authenticate { .. } => {}
+            ControlAction::Connect(server_adress) => {
+                if st.pending_connect.is_none() {
+                    st.pending_connect = 
Some((st.socket_factory)(server_adress));
+                }
+            }
         }
 
-        keep_going |= st.drive_client_commands();
-        keep_going |= st.drive_transmit(cx)?;
-        keep_going |= st.drive_receive(cx)?;
-
+        keep_going |= st.drive_connect(cx)?;
+        keep_going |= st.drive_client_commands()?;
+        if st.socket.is_some() {
+            keep_going |= st.drive_transmit(cx)?;
+            keep_going |= st.drive_receive(cx)?;
+        }
         if keep_going {
             cx.waker().wake_by_ref();
         } else {
@@ -472,21 +531,31 @@ pub struct NewTcpClient<S: AsyncIO> {
 }
 
 impl<S: AsyncIO + Send + Sync + 'static> NewTcpClient<S> {
-    pub fn create(config: Arc<TcpClientConfig>, factory:Box<dyn 
SocketFactory<S>>) -> Result<Self, IggyError> {
+    pub fn create(
+        config: Arc<TcpClientConfig>,
+        factory: SocketFactory<S>,
+    ) -> Result<Self, IggyError> {
         // let runtime = Arc::new(TokioRuntime {});
         // let transport = TokioTcpTransport::new(config.clone(), 
runtime.clone());
         // let state = transport.state.clone();
 
         let (tx, rx) = broadcast(1000);
         let (client_tx, client_rx) = flume::unbounded::<ClientCommand>();
-        
-        let proto_config = ProtocolCoreConfig{
+
+        let proto_config = ProtocolCoreConfig {
             auto_login: iggy_common::AutoLogin::Disabled,
             reestablish_after: IggyDuration::new_from_secs(5),
             max_retries: None,
         };
 
-        let conn = ConnectionRef::new(ProtoConnectionState { core: 
ProtocolCore::new(proto_config), error: None }, factory, config.clone());
+        let conn = ConnectionRef::new(
+            ProtoConnectionState {
+                core: ProtocolCore::new(proto_config),
+                error: None,
+            },
+            factory,
+            config.clone(),
+        );
         let driver = ConnectionDriver(conn.clone());
         tokio::spawn(async move {
             if let Err(e) = driver.await {
@@ -525,7 +594,8 @@ impl<S: AsyncIO + Send + Sync + 'static> NewTcpClient<S> {
             state.wake();
 
             Poll::Pending
-        }).await
+        })
+        .await
     }
 }
 
diff --git a/core/sdk/src/protocol/mod.rs b/core/sdk/src/protocol/mod.rs
index 6979a995..22260495 100644
--- a/core/sdk/src/protocol/mod.rs
+++ b/core/sdk/src/protocol/mod.rs
@@ -1,9 +1,12 @@
 use std::{
-    collections::VecDeque, net::SocketAddr, sync::Arc, time::{Duration, 
Instant}
+    collections::VecDeque,
+    net::SocketAddr,
+    sync::Arc,
+    time::{Duration, Instant},
 };
 
 use bytes::{Buf, BufMut, Bytes, BytesMut};
-use iggy_common::{AutoLogin, ClientState, Credentials, IggyDuration, 
IggyError};
+use iggy_common::{AutoLogin, ClientState, Credentials, IggyDuration, 
IggyError, IggyTimestamp};
 use tracing::{debug, info, warn};
 
 const RESPONSE_HEADER_SIZE: usize = 8;
@@ -16,10 +19,9 @@ pub struct ProtocolCoreConfig {
 }
 
 #[derive(Debug)]
-pub enum Order {
+pub enum ControlAction {
     Connect(SocketAddr),
     Wait(IggyDuration),
-    Transmit(TxBuf),
     Authenticate { username: String, password: String },
     Noop,
     Error(IggyError),
@@ -41,7 +43,7 @@ pub struct Response {
 pub struct ProtocolCore {
     state: ClientState,
     config: ProtocolCoreConfig,
-    last_connect_attempt: Option<Instant>,
+    last_connect_attempt: Option<IggyTimestamp>,
     pub retry_count: u32,
     next_request_id: u64,
     pending_sends: VecDeque<(u32, Bytes, u64)>,
@@ -63,6 +65,7 @@ impl ProtocolCore {
             sent_order: VecDeque::new(),
             auth_pending: false,
             auth_request_id: None,
+            server_address: None,
         }
     }
 
@@ -70,74 +73,7 @@ impl ProtocolCore {
         self.state
     }
 
-    pub fn poll(&mut self) -> Order {
-        match self.state {
-            ClientState::Disconnected => self.handle_disconnected(),
-            ClientState::Connecting => Order::Connect,
-            ClientState::Connected => self.handle_connected(),
-            ClientState::Authenticating => Order::Noop,
-            ClientState::Authenticated => self.poll_transmit(),
-            ClientState::Shutdown => Order::Noop,
-        }
-    }
-
-    fn handle_disconnected(&mut self) -> Order {
-        if let Some(last_attempt) = self.last_connect_attempt {
-            let elapsed = last_attempt.elapsed().as_micros() as u64;
-            let interval = self.config.reestablish_after.as_micros();
-
-            if elapsed < interval {
-                let remaining = IggyDuration::from(interval - elapsed);
-                return Order::Wait(remaining);
-            }
-        }
-
-        if let Some(max_retries) = self.config.max_retries {
-            if self.retry_count >= max_retries {
-                return Order::Error(IggyError::MaxRetriesExceeded);
-            }
-        }
-
-        self.retry_count += 1;
-        self.last_connect_attempt = Some(Instant::now());
-        self.state = ClientState::Connecting;
-
-        debug!("Initiating connection (attempt {})", self.retry_count);
-        Order::Connect
-    }
-
-    fn handle_connected(&mut self) -> Order {
-        match &self.config.auto_login {
-            AutoLogin::Disabled => {
-                info!("Automatic sign-in is disabled.");
-                self.state = ClientState::Authenticated;
-            }
-            AutoLogin::Enabled(credentials) => {
-                if !self.auth_pending {
-                    self.state = ClientState::Authenticating;
-                    self.auth_pending = true;
-
-                    match credentials {
-                        Credentials::UsernamePassword(username, password) => {
-                            let auth_payload = encode_auth(&username, 
&password);
-                            let auth_id = self.queue_send(0x0A, auth_payload);
-                            self.auth_request_id = Some(auth_id);
-
-                            return self.poll_transmit();
-                        }
-                        _ => {
-                            todo!("add PersonalAccessToken")
-                        }
-                    }
-
-                }
-            }
-        }
-
-        self.poll_transmit()
-    }
-
-    fn poll_transmit(&mut self) -> Order {
+    pub fn poll_transmit(&mut self) -> Option<TxBuf> {
         if let Some((code, payload, request_id)) = 
self.pending_sends.pop_front() {
             let mut buf = BytesMut::new();
             let total_len = (payload.len() + 4) as u32;
@@ -147,12 +83,12 @@ impl ProtocolCore {
 
             self.sent_order.push_back(request_id);
 
-            Order::Transmit(TxBuf {
+            Some(TxBuf {
                 data: buf.freeze(),
                 request_id,
             })
         } else {
-            Order::Noop
+            None
         }
     }
 
@@ -160,10 +96,9 @@ impl ProtocolCore {
         match self.state {
             ClientState::Shutdown => Err(IggyError::ClientShutdown),
             ClientState::Disconnected | ClientState::Connecting => 
Err(IggyError::NotConnected),
-            ClientState::Connected | ClientState::Authenticating => {
+            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => {
                 Ok(self.queue_send(code, payload))
             }
-            ClientState::Authenticated => Ok(self.queue_send(code, payload)),
         }
     }
 
@@ -174,20 +109,6 @@ impl ProtocolCore {
         request_id
     }
 
-    pub fn on_connected(&mut self) {
-        debug!("Transport connected");
-        self.state = ClientState::Connected;
-        self.retry_count = 0;
-    }
-
-    pub fn on_disconnected(&mut self) {
-        debug!("Transport disconnected");
-        self.state = ClientState::Disconnected;
-        self.auth_pending = false;
-        self.auth_request_id = None;
-        self.sent_order.clear();
-    }
-
     pub fn on_response(&mut self, status: u32, _payload: &Bytes) -> 
Option<u64> {
         let request_id = self.sent_order.pop_front()?;
 
@@ -205,41 +126,60 @@ impl ProtocolCore {
         Some(request_id)
     }
 
-    fn on_authenticated(&mut self) {
-        debug!("Authentication successful");
-        self.state = ClientState::Authenticated;
-        self.auth_pending = false;
-    }
-
-    pub fn shutdown(&mut self) {
-        self.state = ClientState::Shutdown;
-    }
-
-    // TODO нужно сопоставить стейты из tcp_client и этим
-    fn is_init(&self) -> bool {
-        return !self.last_connect_attempt.is_none()
-    }
-
-    pub fn poll_new(&mut self) -> Order {
+    pub fn poll(&mut self) -> ControlAction {
         match self.state {
-            ClientState::Disconnected => Order::Error(IggyError::Disconnected),
-            ClientState::Shutdown => Order::Error(IggyError::ClientShutdown),
+            ClientState::Shutdown => 
ControlAction::Error(IggyError::ClientShutdown),
+            ClientState::Disconnected => ControlAction::Noop,
+            ClientState::Authenticated | ClientState::Authenticating | 
ClientState::Connected => {
+                ControlAction::Noop
+            }
             ClientState::Connecting => {
-                match self.server_address {
-                    Some(addr) => Order::Connect(addr),
-                    None => Order::Error(IggyError::Disconnected),
+                let server_address = match self.server_address {
+                    Some(addr) => addr,
+                    None => return 
ControlAction::Error(IggyError::ConnectionMissedSocket),
+                };
+
+                if let Some(last_attempt) = self.last_connect_attempt {
+                    let elapsed = last_attempt.as_micros() as u64;
+                    let interval = self.config.reestablish_after.as_micros();
+
+                    if elapsed < interval {
+                        let remaining = IggyDuration::from(interval - elapsed);
+                        return ControlAction::Wait(remaining);
+                    }
                 }
+
+                if let Some(max_retries) = self.config.max_retries {
+                    if self.retry_count >= max_retries {
+                        return 
ControlAction::Error(IggyError::MaxRetriesExceeded);
+                    }
+                }
+
+                self.retry_count += 1;
+                self.last_connect_attempt = Some(IggyTimestamp::now());
+
+                return ControlAction::Connect(server_address);
             }
-            ClientState::Connected | ClientState::Authenticated | 
ClientState::Authenticating => Order::Noop,
+        }
+    }
 
+    pub fn on_authenticated(&mut self) -> Result<(), IggyError> {
+        debug!("Authentication successful");
+        if self.state != ClientState::Connected {
+            return Err(IggyError::IncorrectConnectionState);
         }
+        self.state = ClientState::Authenticated;
+        self.auth_pending = false;
+        Ok(())
     }
 
     pub fn desire_connect(&mut self, server_address: SocketAddr) -> Result<(), 
IggyError> {
         match self.state {
             ClientState::Shutdown => return Err(IggyError::ClientShutdown),
             ClientState::Connecting => return Ok(()),
-            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => return Ok(()),
+            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => {
+                return Ok(());
+            }
             _ => {
                 self.state = ClientState::Connecting;
                 self.server_address = Some(server_address);
@@ -249,14 +189,54 @@ impl ProtocolCore {
         Ok(())
     }
 
-    pub fn desire_disconnect(&mut self) {
+    pub fn on_connected(&mut self) -> Result<(), IggyError> {
+        debug!("Transport connected");
+        if self.state != ClientState::Connecting {
+            return Err(IggyError::IncorrectConnectionState);
+        }
+        self.state = ClientState::Connected;
+        self.retry_count = 0;
+
+        match &self.config.auto_login {
+            AutoLogin::Disabled => {
+                info!("Automatic sign-in is disabled.");
+                self.state = ClientState::Authenticated;
+            }
+            AutoLogin::Enabled(credentials) => {
+                if !self.auth_pending {
+                    self.state = ClientState::Authenticating;
+                    self.auth_pending = true;
+
+                    match credentials {
+                        Credentials::UsernamePassword(username, password) => {
+                            let auth_payload = encode_auth(&username, 
&password);
+                            let auth_id = self.queue_send(0x0A, auth_payload);
+                            self.auth_request_id = Some(auth_id);
+                        }
+                        _ => {
+                            todo!("add PersonalAccessToken")
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn disconnect(&mut self) {
+        debug!("Transport disconnected");
         self.state = ClientState::Disconnected;
-        self.server_address = None;
+        self.auth_pending = false;
+        self.auth_request_id = None;
+        self.sent_order.clear();
     }
 
-    pub fn desire_shutdown(&mut self) {
+    pub fn shutdown(&mut self) {
         self.state = ClientState::Shutdown;
-        self.server_address = None;
+        self.auth_pending = false;
+        self.auth_request_id = None;
+        self.sent_order.clear();
     }
 }
 

Reply via email to