This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans-new
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans-new by this push:
new 46f3dd5c test
46f3dd5c is described below
commit 46f3dd5c11ff076bb69300a9ea80e9d1f237aeea
Author: haze518 <[email protected]>
AuthorDate: Sat Aug 16 07:04:39 2025 +0600
test
---
core/sdk/src/connection/mod.rs | 32 ++++++++++++++++++++-------
core/sdk/src/protocol/mod.rs | 49 ++++++++++++++++++++++++++++++++++++++++--
2 files changed, 71 insertions(+), 10 deletions(-)
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index cdbccb84..bfe8ec0d 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -223,10 +223,14 @@ impl<T> Future for WaitFuture<T> {
}
}
-pub struct State<S: AsyncIO> {
+pub struct State<S, B>
+where
+ S: AsyncIO,
+ B: StreamBuilder<Stream = S>
+{
inner: ProtoConnectionState,
driver: Option<Waker>,
- socket_factory: Box<dyn SocketFactory<S>>,
+ socket_factory: B,
socket: Option<S>,
current_send: Option<TxBuf>,
send_offset: usize,
@@ -239,9 +243,14 @@ pub struct State<S: AsyncIO> {
waiters: Arc<Waiters<Result<Bytes, IggyError>>>,
pending_commands: VecDeque<(u64, ClientCommand)>,
ready_commands: VecDeque<u64>,
+ connect_waiters: Vec<u64>,
}
-impl<S: AsyncIO> Debug for State<S> {
+impl<S, B> Debug for State<S, B>
+where
+ S: AsyncIO,
+ B: StreamBuilder<Stream = S>
+{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
@@ -250,7 +259,11 @@ impl<S: AsyncIO> Debug for State<S> {
}
}
-impl<S: AsyncIO> State<S> {
+impl<S, B> State<S, B>
+where
+ S: AsyncIO,
+ B: StreamBuilder<Stream = S>
+{
fn wake(&mut self) {
if let Some(waker) = self.driver.take() {
waker.wake();
@@ -272,10 +285,13 @@ impl<S: AsyncIO> State<S> {
for (request_id, cmd) in self.pending_commands.drain(..) {
match cmd {
ClientCommand::Connect(server_addr) => {
- let socket = (self.socket_factory)(&server_addr).unwrap();
- self.socket = Some(socket);
- self.ready_commands.push_back(request_id);
- self.waiters.complete(request_id, Ok(Bytes::new()));
+ self.connect_waiters.push(request_id);
+ self.inner.core.
+ // let socket = self.socket_factory.connect(server_addr);
+
+ // self.socket = Some(socket);
+ // self.ready_commands.push_back(request_id);
+ // self.waiters.complete(request_id, Ok(Bytes::new()));
}
ClientCommand::Disconnect => {
// TODO add processing
diff --git a/core/sdk/src/protocol/mod.rs b/core/sdk/src/protocol/mod.rs
index 9d7da993..6979a995 100644
--- a/core/sdk/src/protocol/mod.rs
+++ b/core/sdk/src/protocol/mod.rs
@@ -1,5 +1,5 @@
use std::{
- collections::VecDeque, sync::Arc, time::{Duration, Instant}
+ collections::VecDeque, net::SocketAddr, sync::Arc, time::{Duration,
Instant}
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -17,7 +17,7 @@ pub struct ProtocolCoreConfig {
#[derive(Debug)]
pub enum Order {
- Connect,
+ Connect(SocketAddr),
Wait(IggyDuration),
Transmit(TxBuf),
Authenticate { username: String, password: String },
@@ -48,6 +48,7 @@ pub struct ProtocolCore {
sent_order: VecDeque<u64>,
auth_pending: bool,
auth_request_id: Option<u64>,
+ server_address: Option<SocketAddr>,
}
impl ProtocolCore {
@@ -213,6 +214,50 @@ impl ProtocolCore {
pub fn shutdown(&mut self) {
self.state = ClientState::Shutdown;
}
+
+ // TODO нужно сопоставить стейты из tcp_client и этим
+ fn is_init(&self) -> bool {
+ return !self.last_connect_attempt.is_none()
+ }
+
+ pub fn poll_new(&mut self) -> Order {
+ match self.state {
+ ClientState::Disconnected => Order::Error(IggyError::Disconnected),
+ ClientState::Shutdown => Order::Error(IggyError::ClientShutdown),
+ ClientState::Connecting => {
+ match self.server_address {
+ Some(addr) => Order::Connect(addr),
+ None => Order::Error(IggyError::Disconnected),
+ }
+ }
+ ClientState::Connected | ClientState::Authenticated |
ClientState::Authenticating => Order::Noop,
+
+ }
+ }
+
+ pub fn desire_connect(&mut self, server_address: SocketAddr) -> Result<(),
IggyError> {
+ match self.state {
+ ClientState::Shutdown => return Err(IggyError::ClientShutdown),
+ ClientState::Connecting => return Ok(()),
+ ClientState::Connected | ClientState::Authenticating |
ClientState::Authenticated => return Ok(()),
+ _ => {
+ self.state = ClientState::Connecting;
+ self.server_address = Some(server_address);
+ }
+ }
+
+ Ok(())
+ }
+
+ pub fn desire_disconnect(&mut self) {
+ self.state = ClientState::Disconnected;
+ self.server_address = None;
+ }
+
+ pub fn desire_shutdown(&mut self) {
+ self.state = ClientState::Shutdown;
+ self.server_address = None;
+ }
}
fn encode_auth(username: &str, password: &str) -> Bytes {