This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans-new
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans-new by this push:
new 7350efee test
7350efee is described below
commit 7350efeece3352a499a49bd631b77ed890ac705b
Author: haze518 <[email protected]>
AuthorDate: Sat Aug 16 11:22:05 2025 +0600
test
---
core/common/src/error/iggy_error.rs | 4 +
core/integration/src/tcp_client.rs | 22 +--
core/integration/tests/sdk/producer/mod.rs | 10 +-
core/sdk/src/connection/mod.rs | 288 ++++++++++++++++++-----------
core/sdk/src/protocol/mod.rs | 216 ++++++++++------------
5 files changed, 296 insertions(+), 244 deletions(-)
diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs
index 8059f6c6..b2209ced 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -469,6 +469,10 @@ pub enum IggyError {
MaxRetriesExceeded = 10053,
#[error("Connection timeout")]
ConnectionTimeout = 10054,
+ #[error("Incorrect connection state")]
+ IncorrectConnectionState = 10055,
+ #[error("Connection missed socket")]
+ ConnectionMissedSocket = 10056,
}
impl IggyError {
diff --git a/core/integration/src/tcp_client.rs b/core/integration/src/tcp_client.rs
index 45936b34..c2fb8372 100644
--- a/core/integration/src/tcp_client.rs
+++ b/core/integration/src/tcp_client.rs
@@ -44,19 +44,13 @@ impl ClientFactory for TcpClientFactory {
..TcpClientConfig::default()
};
- let tcp_client = NewTcpClient::create(Arc::new(config)).unwrap();
- let wrapper = ClientWrapper::New(tcp_client);
-
- // let client =
IggyClient::create(iggy::prelude::ClientWrapper::New(tcp_client), None, None);
-
- // let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e|
{
- // panic!(
- // "Failed to create TcpClient, iggy-server has address {},
error: {:?}",
- // self.server_addr, e
- // )
- // });
-
- Client::connect(&wrapper).await.unwrap_or_else(|e| {
+ let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e| {
+ panic!(
+ "Failed to create TcpClient, iggy-server has address {},
error: {:?}",
+ self.server_addr, e
+ )
+ });
+ Client::connect(&client).await.unwrap_or_else(|e| {
if self.tls_enabled {
panic!(
"Failed to connect to iggy-server at {} with TLS enabled,
error: {:?}\n\
@@ -72,7 +66,7 @@ impl ClientFactory for TcpClientFactory {
)
}
});
- wrapper
+ ClientWrapper::Tcp(client)
}
fn transport(&self) -> Transport {
diff --git a/core/integration/tests/sdk/producer/mod.rs b/core/integration/tests/sdk/producer/mod.rs
index a0805e96..b6f4131a 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -18,13 +18,14 @@
mod background;
+use std::net::SocketAddr;
use std::{sync::Arc, time::Duration};
use bytes::Bytes;
-use iggy::clients::client::IggyClient;
+use iggy::{clients::client::IggyClient, connection::tokio_tcp};
use iggy::prelude::*;
use integration::test_server::{login_root, TestServer};
-use iggy::connection::{TokioTcpTransport, NewTcpClient, TokioRuntime};
+use iggy::connection::{NewTcpClient, SFut, SocketFactory, TokioCompat,
TokioRuntime};
use tokio::time::sleep;
const STREAM_ID: u32 = 1;
@@ -81,7 +82,10 @@ async fn test_async_send() {
// let client =
ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
// let client = IggyClient::create(client, None, None);
- let tcp_client =
NewTcpClient::create(Arc::new(tcp_client_config)).unwrap();
+ let factory: SocketFactory<TokioCompat> = Arc::new(move |addr: SocketAddr|
-> SFut<TokioCompat> {
+ Box::pin(tokio_tcp(addr))
+ });
+ let tcp_client = NewTcpClient::create(Arc::new(tcp_client_config),
factory).unwrap();
let client =
IggyClient::create(iggy::prelude::ClientWrapper::New(tcp_client), None, None);
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index bfe8ec0d..7aaad996 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -1,13 +1,24 @@
use std::{
- collections::{HashMap, VecDeque}, fmt::Debug, io, net::SocketAddr,
ops::Deref, pin::Pin, str::FromStr, sync::{
- atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Mutex
- }, task::{ready, Context, Poll, Waker}, time::Duration
+ collections::{HashMap, VecDeque},
+ fmt::Debug,
+ io,
+ net::SocketAddr,
+ ops::Deref,
+ pin::Pin,
+ str::FromStr,
+ sync::{
+ Arc, Mutex,
+ atomic::{AtomicBool, AtomicU64, Ordering},
+ },
+ task::{Context, Poll, Waker, ready},
+ time::Duration,
};
+use crate::protocol::{ControlAction, ProtocolCore, ProtocolCoreConfig,
Response, TxBuf};
use async_broadcast::{Receiver, Sender, broadcast};
use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
-use futures::{future::poll_fn, task::AtomicWaker, AsyncRead, AsyncWrite,
FutureExt};
+use futures::{AsyncRead, AsyncWrite, FutureExt, future::poll_fn,
task::AtomicWaker};
use iggy_binary_protocol::{BinaryClient, BinaryTransport, Client};
use iggy_common::{
ClientState, Command, DiagnosticEvent, IggyDuration, IggyError,
TcpClientConfig,
@@ -19,46 +30,24 @@ use tokio::{
};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt,
TokioAsyncWriteCompatExt};
use tracing::{debug, error, info, trace, warn};
-use crate::protocol::{Order, ProtocolCore, ProtocolCoreConfig, Response,
TxBuf};
-pub trait StreamBuilder {
- type Stream: AsyncRead + AsyncWrite + Unpin + Send + 'static;
+pub type SFut<S> = Pin<Box<dyn Future<Output = io::Result<S>> + Send>>;
+pub type SocketFactory<S> = Arc<dyn Fn(SocketAddr) -> SFut<S> + Send + Sync>;
- fn connect(self, addr: SocketAddr) -> Pin<Box<dyn Future<Output =
io::Result<Self::Stream>> + Send>>;
-}
+pub async fn tokio_tcp(addr: SocketAddr) -> io::Result<Compat<TcpStream>> {
+ let socket = match addr {
+ SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
+ SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
+ };
-pub struct TokioTcpBuilder {
- inner: tokio::net::TcpSocket,
-}
-
-impl TokioTcpBuilder {
- pub fn new_for(addr: &SocketAddr) -> io::Result<Self> {
- let sock = match addr {
- SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
- SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
- };
- sock.set_nodelay(true)?;
- Ok(Self { inner: sock })
- }
-}
-
-impl StreamBuilder for TokioTcpBuilder {
- type Stream = Compat<TcpStream>;
-
- fn connect(self, addr: SocketAddr) -> Pin<Box<dyn Future<Output =
io::Result<Self::Stream>> + Send>> {
- Box::pin(async move {
- let s = self.inner.connect(addr).await?;
- Ok(s.compat())
- })
- }
+ socket.set_nodelay(true)?;
+ let s = socket.connect(addr).await?;
+ Ok(s.compat())
}
pub type TokioCompat = Compat<TcpStream>;
pub type NewTokioTcpClient = NewTcpClient<TokioCompat>;
-pub trait SocketFactory<S>: Fn(&SocketAddr) -> io::Result<S> + Send + Sync +
'static {}
-impl<F, S> SocketFactory<S> for F where F: Fn(&SocketAddr) -> io::Result<S> +
Send + Sync + 'static {}
-
#[derive(Debug, Clone)]
pub struct ConnectionStats {
pub bytes_sent: u64,
@@ -112,12 +101,16 @@ pub struct ConnectionInner<S: AsyncIO> {
pub struct ConnectionRef<S: AsyncIO>(Arc<ConnectionInner<S>>);
impl<S: AsyncIO> ConnectionRef<S> {
- fn new(state: ProtoConnectionState, factory: Box<dyn SocketFactory<S>>,
config: Arc<TcpClientConfig>) -> Self {
+ fn new(
+ state: ProtoConnectionState,
+ socket_factory: SocketFactory<S>,
+ config: Arc<TcpClientConfig>,
+ ) -> Self {
Self(Arc::new(ConnectionInner {
state: Mutex::new(State {
inner: state,
driver: None,
- socket_factory: factory,
+ socket_factory,
socket: None,
current_send: None,
send_offset: 0,
@@ -126,10 +119,15 @@ impl<S: AsyncIO> ConnectionRef<S> {
ready_responses: HashMap::new(),
recv_waiters: HashMap::new(),
config,
- waiters: Arc::new(Waiters { map: Mutex::new(HashMap::new()),
next_id: AtomicU64::new(0) }),
+ waiters: Arc::new(Waiters {
+ map: Mutex::new(HashMap::new()),
+ next_id: AtomicU64::new(0),
+ }),
pending_commands: VecDeque::new(),
ready_commands: VecDeque::new(),
- })
+ connect_waiters: Vec::new(),
+ pending_connect: None,
+ }),
}))
}
}
@@ -168,11 +166,20 @@ struct Waiters<T> {
impl<T> Waiters<T> {
fn new() -> Self {
- Self { map: Mutex::new(HashMap::new()), next_id: AtomicU64::new(1) }
+ Self {
+ map: Mutex::new(HashMap::new()),
+ next_id: AtomicU64::new(1),
+ }
}
fn alloc(&self) -> RequestId {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
- self.map.lock().unwrap().insert(id, WaitEntry { waker:
AtomicWaker::new(), result: None });
+ self.map.lock().unwrap().insert(
+ id,
+ WaitEntry {
+ waker: AtomicWaker::new(),
+ result: None,
+ },
+ );
id
}
fn complete(&self, id: RequestId, val: T) -> bool {
@@ -198,7 +205,9 @@ impl<T> Future for WaitFuture<T> {
if let Some(val) = {
if let Some(e) = waiters.get_mut(&self.id) {
e.result.take()
- } else { None }
+ } else {
+ None
+ }
} {
waiters.remove(&self.id);
return Poll::Ready(val);
@@ -213,7 +222,9 @@ impl<T> Future for WaitFuture<T> {
if let Some(val) = {
if let Some(e) = waiters.get_mut(&self.id) {
e.result.take()
- } else { None }
+ } else {
+ None
+ }
} {
waiters.remove(&self.id);
return Poll::Ready(val);
@@ -223,14 +234,13 @@ impl<T> Future for WaitFuture<T> {
}
}
-pub struct State<S, B>
+pub struct State<S>
where
S: AsyncIO,
- B: StreamBuilder<Stream = S>
{
inner: ProtoConnectionState,
driver: Option<Waker>,
- socket_factory: B,
+ socket_factory: SocketFactory<S>,
socket: Option<S>,
current_send: Option<TxBuf>,
send_offset: usize,
@@ -244,25 +254,21 @@ where
pending_commands: VecDeque<(u64, ClientCommand)>,
ready_commands: VecDeque<u64>,
connect_waiters: Vec<u64>,
+ pending_connect: Option<Pin<Box<dyn Future<Output = io::Result<S>> +
Send>>>,
}
-impl<S, B> Debug for State<S, B>
+impl<S> Debug for State<S>
where
S: AsyncIO,
- B: StreamBuilder<Stream = S>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "test"
- )
+ write!(f, "test")
}
}
-impl<S, B> State<S, B>
+impl<S> State<S>
where
S: AsyncIO,
- B: StreamBuilder<Stream = S>
{
fn wake(&mut self) {
if let Some(waker) = self.driver.take() {
@@ -274,38 +280,71 @@ where
let id = self.waiters.alloc();
// TODO перетащить в sans io ядро inner.core
self.pending_commands.push_back((id, command));
- WaitFuture { waiters: self.waiters.clone(), id }
+ WaitFuture {
+ waiters: self.waiters.clone(),
+ id,
+ }
}
fn enqueue_message(&mut self, code: u32, payload: Bytes) -> Result<u64,
IggyError> {
self.inner.core.send(code, payload)
}
- fn drive_client_commands(&mut self) -> bool {
+ fn drive_client_commands(&mut self) -> io::Result<bool> {
for (request_id, cmd) in self.pending_commands.drain(..) {
match cmd {
- ClientCommand::Connect(server_addr) => {
+ ClientCommand::Connect(server_address) => {
self.connect_waiters.push(request_id);
- self.inner.core.
- // let socket = self.socket_factory.connect(server_addr);
-
- // self.socket = Some(socket);
- // self.ready_commands.push_back(request_id);
- // self.waiters.complete(request_id, Ok(Bytes::new()));
+ self.inner
+ .core
+ .desire_connect(server_address)
+ .map_err(|e| {
+ io::Error::new(io::ErrorKind::ConnectionAborted,
e.as_string())
+ })?;
}
ClientCommand::Disconnect => {
- // TODO add processing
self.ready_commands.push_back(request_id);
- self.waiters.complete(request_id, Ok(Bytes::new()));
+ self.inner.core.disconnect();
+ // self.waiters.complete(request_id, Ok(Bytes::new()));
}
ClientCommand::Shutdown => {
- // TODO add processing
self.ready_commands.push_back(request_id);
- self.waiters.complete(request_id, Ok(Bytes::new()));
+ self.inner.core.shutdown();
+ // self.waiters.complete(request_id, Ok(Bytes::new()));
}
}
}
- true
+ Ok(true)
+ }
+
+ fn drive_connect(&mut self, cx: &mut Context<'_>) -> io::Result<bool> {
+ if let Some(fut) = self.pending_connect.as_mut() {
+ match fut.as_mut().poll(cx) {
+ Poll::Pending => return Ok(true),
+ Poll::Ready(Ok(stream)) => {
+ self.socket = Some(stream);
+ self.pending_connect = None;
+ self.inner.core.on_connected().map_err(|e| {
+ io::Error::new(io::ErrorKind::ConnectionRefused,
e.as_string())
+ })?;
+ for id in self.connect_waiters.drain(..) {
+ let _ = self.waiters.complete(id, Ok(Bytes::new()));
+ }
+ return Ok(true);
+ }
+ Poll::Ready(Err(e)) => {
+ self.pending_connect = None;
+ self.inner.core.disconnect();
+ for id in self.connect_waiters.drain(..) {
+ let _ = self
+ .waiters
+ .complete(id,
Err(IggyError::CannotEstablishConnection));
+ }
+ return Ok(true);
+ }
+ }
+ }
+ Ok(false)
}
fn drive_timer(&mut self, cx: &mut Context<'_>) -> bool {
@@ -319,28 +358,39 @@ where
}
fn drive_transmit(&mut self, cx: &mut Context<'_>) -> io::Result<bool> {
+ if self.current_send.is_none() {
+ if let Some(tx) = self.inner.core.poll_transmit() {
+ self.current_send = Some(tx);
+ self.send_offset = 0;
+ } else {
+ return Ok(false);
+ }
+ }
+
+ let socket = self
+ .socket
+ .as_mut()
+ .ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "No
socket"))?;
+
+ let buf = self.current_send.as_ref().unwrap();
let mut offset = self.send_offset;
- let socket = self.socket.as_mut().ok_or(io::Error::new(
- io::ErrorKind::NotConnected,
- "No socket"
- ))?;
-
- if let Some(buf) = self.current_send.take() {
- while self.send_offset < buf.data.len() {
- match Pin::new(&mut *socket).poll_write(cx,
&buf.data[offset..])? {
- Poll::Ready(n) => {
- offset += n;
- self.send_offset += n;
- }
- Poll::Pending => return Ok(false),
+
+ while self.send_offset < buf.data.len() {
+ match Pin::new(&mut *socket).poll_write(cx, &buf.data[offset..])? {
+ Poll::Ready(n) => {
+ offset += n;
+ self.send_offset += n;
}
- }
- match Pin::new(socket).poll_flush(cx)? {
Poll::Pending => return Ok(false),
- Poll::Ready(()) => {}
}
- self.current_send = None;
}
+
+ match Pin::new(socket).poll_flush(cx)? {
+ Poll::Pending => return Ok(false),
+ Poll::Ready(()) => {}
+ }
+
+ self.current_send = None;
Ok(true)
}
@@ -354,13 +404,19 @@ where
loop {
let n = {
- let socket = self.socket.as_mut().ok_or(io::Error::new(
- io::ErrorKind::NotConnected,
- "No socket"
- ))?;
+ let socket = self
+ .socket
+ .as_mut()
+ .ok_or(io::Error::new(io::ErrorKind::NotConnected, "No
socket"))?;
let mut pinned = Pin::new(&mut *socket);
match pinned.as_mut().poll_read(cx, &mut recv_scratch)? {
Poll::Pending => return Ok(false),
+ Poll::Ready(0) => {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Connection closed",
+ ));
+ }
Poll::Ready(n) => n,
}
};
@@ -432,25 +488,28 @@ impl<S: AsyncIO> Future for ConnectionDriver<S> {
let order = st.inner.core.poll();
match order {
- Order::Wait(dur) => {
- st.wait_timer =
Some(Box::pin(tokio::time::sleep(dur.get_duration())));
- return Poll::Pending;
- }
- Order::Transmit(tx) => {
- st.current_send = Some(tx);
- st.send_offset = 0;
+ ControlAction::Wait(dur) => {
+ if st.wait_timer.is_none() {
+ st.wait_timer =
Some(Box::pin(tokio::time::sleep(dur.get_duration())));
+ }
}
- Order::Error(e) => {
+ ControlAction::Error(e) => {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other,
format!("{e:?}"))));
}
- Order::Noop | Order::Authenticate { .. } => {}
- Order::Connect => {todo!("добавить вызов метода из state")}
+ ControlAction::Noop | ControlAction::Authenticate { .. } => {}
+ ControlAction::Connect(server_adress) => {
+ if st.pending_connect.is_none() {
+ st.pending_connect =
Some((st.socket_factory)(server_adress));
+ }
+ }
}
- keep_going |= st.drive_client_commands();
- keep_going |= st.drive_transmit(cx)?;
- keep_going |= st.drive_receive(cx)?;
-
+ keep_going |= st.drive_connect(cx)?;
+ keep_going |= st.drive_client_commands()?;
+ if st.socket.is_some() {
+ keep_going |= st.drive_transmit(cx)?;
+ keep_going |= st.drive_receive(cx)?;
+ }
if keep_going {
cx.waker().wake_by_ref();
} else {
@@ -472,21 +531,31 @@ pub struct NewTcpClient<S: AsyncIO> {
}
impl<S: AsyncIO + Send + Sync + 'static> NewTcpClient<S> {
- pub fn create(config: Arc<TcpClientConfig>, factory:Box<dyn
SocketFactory<S>>) -> Result<Self, IggyError> {
+ pub fn create(
+ config: Arc<TcpClientConfig>,
+ factory: SocketFactory<S>,
+ ) -> Result<Self, IggyError> {
// let runtime = Arc::new(TokioRuntime {});
// let transport = TokioTcpTransport::new(config.clone(),
runtime.clone());
// let state = transport.state.clone();
let (tx, rx) = broadcast(1000);
let (client_tx, client_rx) = flume::unbounded::<ClientCommand>();
-
- let proto_config = ProtocolCoreConfig{
+
+ let proto_config = ProtocolCoreConfig {
auto_login: iggy_common::AutoLogin::Disabled,
reestablish_after: IggyDuration::new_from_secs(5),
max_retries: None,
};
- let conn = ConnectionRef::new(ProtoConnectionState { core:
ProtocolCore::new(proto_config), error: None }, factory, config.clone());
+ let conn = ConnectionRef::new(
+ ProtoConnectionState {
+ core: ProtocolCore::new(proto_config),
+ error: None,
+ },
+ factory,
+ config.clone(),
+ );
let driver = ConnectionDriver(conn.clone());
tokio::spawn(async move {
if let Err(e) = driver.await {
@@ -525,7 +594,8 @@ impl<S: AsyncIO + Send + Sync + 'static> NewTcpClient<S> {
state.wake();
Poll::Pending
- }).await
+ })
+ .await
}
}
diff --git a/core/sdk/src/protocol/mod.rs b/core/sdk/src/protocol/mod.rs
index 6979a995..22260495 100644
--- a/core/sdk/src/protocol/mod.rs
+++ b/core/sdk/src/protocol/mod.rs
@@ -1,9 +1,12 @@
use std::{
- collections::VecDeque, net::SocketAddr, sync::Arc, time::{Duration,
Instant}
+ collections::VecDeque,
+ net::SocketAddr,
+ sync::Arc,
+ time::{Duration, Instant},
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
-use iggy_common::{AutoLogin, ClientState, Credentials, IggyDuration,
IggyError};
+use iggy_common::{AutoLogin, ClientState, Credentials, IggyDuration,
IggyError, IggyTimestamp};
use tracing::{debug, info, warn};
const RESPONSE_HEADER_SIZE: usize = 8;
@@ -16,10 +19,9 @@ pub struct ProtocolCoreConfig {
}
#[derive(Debug)]
-pub enum Order {
+pub enum ControlAction {
Connect(SocketAddr),
Wait(IggyDuration),
- Transmit(TxBuf),
Authenticate { username: String, password: String },
Noop,
Error(IggyError),
@@ -41,7 +43,7 @@ pub struct Response {
pub struct ProtocolCore {
state: ClientState,
config: ProtocolCoreConfig,
- last_connect_attempt: Option<Instant>,
+ last_connect_attempt: Option<IggyTimestamp>,
pub retry_count: u32,
next_request_id: u64,
pending_sends: VecDeque<(u32, Bytes, u64)>,
@@ -63,6 +65,7 @@ impl ProtocolCore {
sent_order: VecDeque::new(),
auth_pending: false,
auth_request_id: None,
+ server_address: None,
}
}
@@ -70,74 +73,7 @@ impl ProtocolCore {
self.state
}
- pub fn poll(&mut self) -> Order {
- match self.state {
- ClientState::Disconnected => self.handle_disconnected(),
- ClientState::Connecting => Order::Connect,
- ClientState::Connected => self.handle_connected(),
- ClientState::Authenticating => Order::Noop,
- ClientState::Authenticated => self.poll_transmit(),
- ClientState::Shutdown => Order::Noop,
- }
- }
-
- fn handle_disconnected(&mut self) -> Order {
- if let Some(last_attempt) = self.last_connect_attempt {
- let elapsed = last_attempt.elapsed().as_micros() as u64;
- let interval = self.config.reestablish_after.as_micros();
-
- if elapsed < interval {
- let remaining = IggyDuration::from(interval - elapsed);
- return Order::Wait(remaining);
- }
- }
-
- if let Some(max_retries) = self.config.max_retries {
- if self.retry_count >= max_retries {
- return Order::Error(IggyError::MaxRetriesExceeded);
- }
- }
-
- self.retry_count += 1;
- self.last_connect_attempt = Some(Instant::now());
- self.state = ClientState::Connecting;
-
- debug!("Initiating connection (attempt {})", self.retry_count);
- Order::Connect
- }
-
- fn handle_connected(&mut self) -> Order {
- match &self.config.auto_login {
- AutoLogin::Disabled => {
- info!("Automatic sign-in is disabled.");
- self.state = ClientState::Authenticated;
- }
- AutoLogin::Enabled(credentials) => {
- if !self.auth_pending {
- self.state = ClientState::Authenticating;
- self.auth_pending = true;
-
- match credentials {
- Credentials::UsernamePassword(username, password) => {
- let auth_payload = encode_auth(&username,
&password);
- let auth_id = self.queue_send(0x0A, auth_payload);
- self.auth_request_id = Some(auth_id);
-
- return self.poll_transmit();
- }
- _ => {
- todo!("add PersonalAccessToken")
- }
- }
-
- }
- }
- }
-
- self.poll_transmit()
- }
-
- fn poll_transmit(&mut self) -> Order {
+ pub fn poll_transmit(&mut self) -> Option<TxBuf> {
if let Some((code, payload, request_id)) =
self.pending_sends.pop_front() {
let mut buf = BytesMut::new();
let total_len = (payload.len() + 4) as u32;
@@ -147,12 +83,12 @@ impl ProtocolCore {
self.sent_order.push_back(request_id);
- Order::Transmit(TxBuf {
+ Some(TxBuf {
data: buf.freeze(),
request_id,
})
} else {
- Order::Noop
+ None
}
}
@@ -160,10 +96,9 @@ impl ProtocolCore {
match self.state {
ClientState::Shutdown => Err(IggyError::ClientShutdown),
ClientState::Disconnected | ClientState::Connecting =>
Err(IggyError::NotConnected),
- ClientState::Connected | ClientState::Authenticating => {
+ ClientState::Connected | ClientState::Authenticating |
ClientState::Authenticated => {
Ok(self.queue_send(code, payload))
}
- ClientState::Authenticated => Ok(self.queue_send(code, payload)),
}
}
@@ -174,20 +109,6 @@ impl ProtocolCore {
request_id
}
- pub fn on_connected(&mut self) {
- debug!("Transport connected");
- self.state = ClientState::Connected;
- self.retry_count = 0;
- }
-
- pub fn on_disconnected(&mut self) {
- debug!("Transport disconnected");
- self.state = ClientState::Disconnected;
- self.auth_pending = false;
- self.auth_request_id = None;
- self.sent_order.clear();
- }
-
pub fn on_response(&mut self, status: u32, _payload: &Bytes) ->
Option<u64> {
let request_id = self.sent_order.pop_front()?;
@@ -205,41 +126,60 @@ impl ProtocolCore {
Some(request_id)
}
- fn on_authenticated(&mut self) {
- debug!("Authentication successful");
- self.state = ClientState::Authenticated;
- self.auth_pending = false;
- }
-
- pub fn shutdown(&mut self) {
- self.state = ClientState::Shutdown;
- }
-
- // TODO нужно сопоставить стейты из tcp_client и этим
- fn is_init(&self) -> bool {
- return !self.last_connect_attempt.is_none()
- }
-
- pub fn poll_new(&mut self) -> Order {
+ pub fn poll(&mut self) -> ControlAction {
match self.state {
- ClientState::Disconnected => Order::Error(IggyError::Disconnected),
- ClientState::Shutdown => Order::Error(IggyError::ClientShutdown),
+ ClientState::Shutdown =>
ControlAction::Error(IggyError::ClientShutdown),
+ ClientState::Disconnected => ControlAction::Noop,
+ ClientState::Authenticated | ClientState::Authenticating |
ClientState::Connected => {
+ ControlAction::Noop
+ }
ClientState::Connecting => {
- match self.server_address {
- Some(addr) => Order::Connect(addr),
- None => Order::Error(IggyError::Disconnected),
+ let server_address = match self.server_address {
+ Some(addr) => addr,
+ None => return
ControlAction::Error(IggyError::ConnectionMissedSocket),
+ };
+
+ if let Some(last_attempt) = self.last_connect_attempt {
+ let elapsed = last_attempt.as_micros() as u64;
+ let interval = self.config.reestablish_after.as_micros();
+
+ if elapsed < interval {
+ let remaining = IggyDuration::from(interval - elapsed);
+ return ControlAction::Wait(remaining);
+ }
}
+
+ if let Some(max_retries) = self.config.max_retries {
+ if self.retry_count >= max_retries {
+ return
ControlAction::Error(IggyError::MaxRetriesExceeded);
+ }
+ }
+
+ self.retry_count += 1;
+ self.last_connect_attempt = Some(IggyTimestamp::now());
+
+ return ControlAction::Connect(server_address);
}
- ClientState::Connected | ClientState::Authenticated |
ClientState::Authenticating => Order::Noop,
+ }
+ }
+ pub fn on_authenticated(&mut self) -> Result<(), IggyError> {
+ debug!("Authentication successful");
+ if self.state != ClientState::Connected {
+ return Err(IggyError::IncorrectConnectionState);
}
+ self.state = ClientState::Authenticated;
+ self.auth_pending = false;
+ Ok(())
}
pub fn desire_connect(&mut self, server_address: SocketAddr) -> Result<(),
IggyError> {
match self.state {
ClientState::Shutdown => return Err(IggyError::ClientShutdown),
ClientState::Connecting => return Ok(()),
- ClientState::Connected | ClientState::Authenticating |
ClientState::Authenticated => return Ok(()),
+ ClientState::Connected | ClientState::Authenticating |
ClientState::Authenticated => {
+ return Ok(());
+ }
_ => {
self.state = ClientState::Connecting;
self.server_address = Some(server_address);
@@ -249,14 +189,54 @@ impl ProtocolCore {
Ok(())
}
- pub fn desire_disconnect(&mut self) {
+ pub fn on_connected(&mut self) -> Result<(), IggyError> {
+ debug!("Transport connected");
+ if self.state != ClientState::Connecting {
+ return Err(IggyError::IncorrectConnectionState);
+ }
+ self.state = ClientState::Connected;
+ self.retry_count = 0;
+
+ match &self.config.auto_login {
+ AutoLogin::Disabled => {
+ info!("Automatic sign-in is disabled.");
+ self.state = ClientState::Authenticated;
+ }
+ AutoLogin::Enabled(credentials) => {
+ if !self.auth_pending {
+ self.state = ClientState::Authenticating;
+ self.auth_pending = true;
+
+ match credentials {
+ Credentials::UsernamePassword(username, password) => {
+ let auth_payload = encode_auth(&username,
&password);
+ let auth_id = self.queue_send(0x0A, auth_payload);
+ self.auth_request_id = Some(auth_id);
+ }
+ _ => {
+ todo!("add PersonalAccessToken")
+ }
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ pub fn disconnect(&mut self) {
+ debug!("Transport disconnected");
self.state = ClientState::Disconnected;
- self.server_address = None;
+ self.auth_pending = false;
+ self.auth_request_id = None;
+ self.sent_order.clear();
}
- pub fn desire_shutdown(&mut self) {
+ pub fn shutdown(&mut self) {
self.state = ClientState::Shutdown;
- self.server_address = None;
+ self.auth_pending = false;
+ self.auth_request_id = None;
+ self.sent_order.clear();
}
}