This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans-new
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans-new by this push:
     new 46f3dd5c test
46f3dd5c is described below

commit 46f3dd5c11ff076bb69300a9ea80e9d1f237aeea
Author: haze518 <[email protected]>
AuthorDate: Sat Aug 16 07:04:39 2025 +0600

    test
---
 core/sdk/src/connection/mod.rs | 32 ++++++++++++++++++++-------
 core/sdk/src/protocol/mod.rs   | 49 ++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 71 insertions(+), 10 deletions(-)

diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index cdbccb84..bfe8ec0d 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -223,10 +223,14 @@ impl<T> Future for WaitFuture<T> {
     }
 }
 
-pub struct State<S: AsyncIO> {
+pub struct State<S, B>
+where
+    S: AsyncIO,
+    B: StreamBuilder<Stream = S>
+{
     inner: ProtoConnectionState,
     driver: Option<Waker>,
-    socket_factory: Box<dyn SocketFactory<S>>,
+    socket_factory: B,
     socket: Option<S>,
     current_send: Option<TxBuf>,
     send_offset: usize,
@@ -239,9 +243,14 @@ pub struct State<S: AsyncIO> {
     waiters: Arc<Waiters<Result<Bytes, IggyError>>>,
     pending_commands: VecDeque<(u64, ClientCommand)>,
     ready_commands: VecDeque<u64>,
+    connect_waiters: Vec<u64>,
 }
 
-impl<S: AsyncIO> Debug for State<S> {
+impl<S, B> Debug for State<S, B>
+where
+    S: AsyncIO,
+    B: StreamBuilder<Stream = S>
+{
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
@@ -250,7 +259,11 @@ impl<S: AsyncIO> Debug for State<S> {
     }
 }
 
-impl<S: AsyncIO> State<S> {
+impl<S, B> State<S, B>
+where
+    S: AsyncIO,
+    B: StreamBuilder<Stream = S>
+{
     fn wake(&mut self) {
         if let Some(waker) = self.driver.take() {
             waker.wake();
@@ -272,10 +285,13 @@ impl<S: AsyncIO> State<S> {
         for (request_id, cmd) in self.pending_commands.drain(..) {
             match cmd {
                 ClientCommand::Connect(server_addr) => {
-                    let socket = (self.socket_factory)(&server_addr).unwrap();
-                    self.socket = Some(socket);
-                    self.ready_commands.push_back(request_id);
-                    self.waiters.complete(request_id, Ok(Bytes::new()));
+                    self.connect_waiters.push(request_id);
+                    self.inner.core.
+                    // let socket = self.socket_factory.connect(server_addr);
+
+                    // self.socket = Some(socket);
+                    // self.ready_commands.push_back(request_id);
+                    // self.waiters.complete(request_id, Ok(Bytes::new()));
                 }
                 ClientCommand::Disconnect => {
                     // TODO add processing
diff --git a/core/sdk/src/protocol/mod.rs b/core/sdk/src/protocol/mod.rs
index 9d7da993..6979a995 100644
--- a/core/sdk/src/protocol/mod.rs
+++ b/core/sdk/src/protocol/mod.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::VecDeque, sync::Arc, time::{Duration, Instant}
+    collections::VecDeque, net::SocketAddr, sync::Arc, time::{Duration, Instant}
 };
 
 use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -17,7 +17,7 @@ pub struct ProtocolCoreConfig {
 
 #[derive(Debug)]
 pub enum Order {
-    Connect,
+    Connect(SocketAddr),
     Wait(IggyDuration),
     Transmit(TxBuf),
     Authenticate { username: String, password: String },
@@ -48,6 +48,7 @@ pub struct ProtocolCore {
     sent_order: VecDeque<u64>,
     auth_pending: bool,
     auth_request_id: Option<u64>,
+    server_address: Option<SocketAddr>,
 }
 
 impl ProtocolCore {
@@ -213,6 +214,50 @@ impl ProtocolCore {
     pub fn shutdown(&mut self) {
         self.state = ClientState::Shutdown;
     }
+
+    // TODO нужно сопоставить стейты из tcp_client и этим
+    fn is_init(&self) -> bool {
+        return !self.last_connect_attempt.is_none()
+    }
+
+    pub fn poll_new(&mut self) -> Order {
+        match self.state {
+            ClientState::Disconnected => Order::Error(IggyError::Disconnected),
+            ClientState::Shutdown => Order::Error(IggyError::ClientShutdown),
+            ClientState::Connecting => {
+                match self.server_address {
+                    Some(addr) => Order::Connect(addr),
+                    None => Order::Error(IggyError::Disconnected),
+                }
+            }
+            ClientState::Connected | ClientState::Authenticated | ClientState::Authenticating => Order::Noop,
+
+        }
+    }
+
+    pub fn desire_connect(&mut self, server_address: SocketAddr) -> Result<(), IggyError> {
+        match self.state {
+            ClientState::Shutdown => return Err(IggyError::ClientShutdown),
+            ClientState::Connecting => return Ok(()),
+            ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => return Ok(()),
+            _ => {
+                self.state = ClientState::Connecting;
+                self.server_address = Some(server_address);
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn desire_disconnect(&mut self) {
+        self.state = ClientState::Disconnected;
+        self.server_address = None;
+    }
+
+    pub fn desire_shutdown(&mut self) {
+        self.state = ClientState::Shutdown;
+        self.server_address = None;
+    }
 }
 
 fn encode_auth(username: &str, password: &str) -> Bytes {

Reply via email to