This is an automated email from the ASF dual-hosted git repository. zuston pushed a commit to branch writePlus in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit c5ec940701ef2abcc357ac0b4d8612db04b299c9 Author: Junfan Zhang <[email protected]> AuthorDate: Tue Feb 20 17:07:32 2024 +0800 channel try --- rust/experimental/server/Cargo.toml | 2 +- rust/experimental/server/src/channel.rs | 35 ++++++++++++++++++++++++++++ rust/experimental/server/src/error.rs | 4 +++- rust/experimental/server/src/grpc.rs | 16 ++++++++----- rust/experimental/server/src/lib.rs | 9 ++++--- rust/experimental/server/src/main.rs | 5 +++- rust/experimental/server/src/store/memory.rs | 13 ++++++----- 7 files changed, 66 insertions(+), 18 deletions(-) diff --git a/rust/experimental/server/Cargo.toml b/rust/experimental/server/Cargo.toml index 860ad4320..0b556540a 100644 --- a/rust/experimental/server/Cargo.toml +++ b/rust/experimental/server/Cargo.toml @@ -67,7 +67,6 @@ dashmap = "5.4.0" log = "0.4.17" env_logger = "0.10.0" crossbeam = "0.8.2" -crossbeam-channel = "0.5" tempdir = "0.3.7" async-trait = "0.1.68" futures = "0.3" @@ -98,6 +97,7 @@ socket2 = { version="0.4", features = ["all"]} cap = "0.1.2" spin = "0.9.8" opendal = { version = "0.44.0", features = ["services-fs"]} +crossbeam-channel = "0.5.8" [dependencies.hdfs-native] version = "0.7.0" diff --git a/rust/experimental/server/src/channel.rs b/rust/experimental/server/src/channel.rs new file mode 100644 index 000000000..13d9994d8 --- /dev/null +++ b/rust/experimental/server/src/channel.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; +use crossbeam_channel::{Receiver, Sender, unbounded}; +use crate::app::{AppManager, WritingViewContext}; +use crate::error::WorkerError; +use crate::metric::TOTAL_RECEIVED_DATA; +use crate::runtime::manager::RuntimeManager; + +#[derive(Clone)] +pub struct Channel { + app_manager: Arc<AppManager>, + runtime_manager: RuntimeManager, + + sender: Sender<WritingViewContext>, + receiver: Receiver<WritingViewContext>, +} + +impl Channel { + pub fn new(app_manager: Arc<AppManager>, runtime_manager: RuntimeManager) -> Self { + let (s, r) = unbounded(); + Self { + app_manager, + runtime_manager, + sender: s, + receiver: r, + } + } + + pub fn send(&self, ctx: WritingViewContext) -> anyhow::Result<i32, WorkerError> { + let len: i32 = ctx.data_blocks.iter().map(|block| block.length).sum(); + TOTAL_RECEIVED_DATA.inc_by(len as u64); + + self.sender.send(ctx).map_err(|_| WorkerError::WRITING_TO_CHANNEL_FAIL)?; + Ok(len) + } +} \ No newline at end of file diff --git a/rust/experimental/server/src/error.rs b/rust/experimental/server/src/error.rs index 7a7040882..cfbe60861 100644 --- a/rust/experimental/server/src/error.rs +++ b/rust/experimental/server/src/error.rs @@ -16,7 +16,6 @@ // under the License. use anyhow::Error; - use log::error; use poem::error::ParseQueryError; use thiserror::Error; @@ -63,6 +62,9 @@ pub enum WorkerError { #[error("Spill event has been retried exceed the max limit for app: {0}")] SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(String), + + #[error("Failed to writing data to channel.")] + WRITING_TO_CHANNEL_FAIL, } impl From<AcquireError> for WorkerError { diff --git a/rust/experimental/server/src/grpc.rs b/rust/experimental/server/src/grpc.rs index 1ff4bd400..658b94faa 100644 --- a/rust/experimental/server/src/grpc.rs +++ b/rust/experimental/server/src/grpc.rs @@ -48,6 +48,7 @@ use crate::metric::{ }; use crate::util; use tonic::{Request, Response, Status}; +use crate::channel::Channel; /// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed /// streams on the same connection. @@ -77,11 +78,12 @@ impl Into<i32> for StatusCode { pub struct DefaultShuffleServer { app_manager_ref: AppManagerRef, + writing_channel: Channel, } impl DefaultShuffleServer { - pub fn from(app_manager_ref: AppManagerRef) -> DefaultShuffleServer { - DefaultShuffleServer { app_manager_ref } + pub fn from(app_manager_ref: AppManagerRef, writing_channel: Channel) -> DefaultShuffleServer { + DefaultShuffleServer { app_manager_ref, writing_channel } } } @@ -217,10 +219,12 @@ impl ShuffleServer for DefaultShuffleServer { }; let ctx = WritingViewContext::from(uid.clone(), blocks); - let inserted = app - .insert(ctx) - .instrument_await(format!("insert data for app. uid: {:?}", &uid)) - .await; + let inserted = self.writing_channel.send(ctx); + + // let inserted = app + // .insert(ctx) + // .instrument_await(format!("insert data for app. uid: {:?}", &uid)) + // .await; if inserted.is_err() { let err = format!( "Errors on putting data. app_id: {}, err: {:?}", diff --git a/rust/experimental/server/src/lib.rs b/rust/experimental/server/src/lib.rs index 1219285b1..54d3f184d 100644 --- a/rust/experimental/server/src/lib.rs +++ b/rust/experimental/server/src/lib.rs @@ -31,6 +31,7 @@ pub mod runtime; pub mod signal; pub mod store; pub mod util; +pub mod channel; use crate::app::AppManager; use crate::grpc::DefaultShuffleServer; @@ -69,14 +70,16 @@ pub async fn start_uniffle_worker(config: config::Config) -> Result<()> { HTTP_SERVICE.start(runtime_manager.clone(), http_port); let (tx, rx) = oneshot::channel::<()>(); + let cloned_runtime_manager = runtime_manager.clone(); + let app_manager_ref = AppManager::get_ref(cloned_runtime_manager, config.clone()); + + let writing_channel = channel::Channel::new(app_manager_ref.clone(), runtime_manager.clone()); // implement server startup - let cloned_runtime_manager = runtime_manager.clone(); runtime_manager.grpc_runtime.spawn(async move { - let app_manager_ref = AppManager::get_ref(cloned_runtime_manager, config.clone()); let rpc_port = config.grpc_port.unwrap_or(19999); info!("Starting GRpc server with port:[{}] ......", rpc_port); - let shuffle_server = DefaultShuffleServer::from(app_manager_ref); + let shuffle_server = DefaultShuffleServer::from(app_manager_ref, writing_channel.clone()); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port as u16); let service = ShuffleServerServer::new(shuffle_server) .max_decoding_message_size(usize::MAX) diff --git a/rust/experimental/server/src/main.rs b/rust/experimental/server/src/main.rs index f5e93c589..86c7ba97f 100644 --- a/rust/experimental/server/src/main.rs +++ b/rust/experimental/server/src/main.rs @@ -62,6 +62,7 @@ pub mod runtime; pub mod signal; pub mod store; mod util; +pub mod channel; const MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY: &str = "MAX_MEMORY_ALLOCATION_SIZE"; const DEFAULT_SHUFFLE_SERVER_TAG: &str = "ss_v4"; @@ -233,8 +234,10 @@ fn main() -> Result<()> { let available_cores = std::thread::available_parallelism()?; debug!("GRpc service with parallelism: [{}]", &available_cores); + let writing_channel = channel::Channel::new(app_manager_ref.clone(), runtime_manager.clone()); + for _ in 0..available_cores.into() { - let shuffle_server = DefaultShuffleServer::from(app_manager_ref.clone()); + let shuffle_server = DefaultShuffleServer::from(app_manager_ref.clone(), writing_channel.clone()); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port as u16); let service = ShuffleServerServer::new(shuffle_server) .max_decoding_message_size(usize::MAX) diff --git a/rust/experimental/server/src/store/memory.rs b/rust/experimental/server/src/store/memory.rs index e93e4ae7f..fad27e792 100644 --- a/rust/experimental/server/src/store/memory.rs +++ b/rust/experimental/server/src/store/memory.rs @@ -256,12 +256,13 @@ impl Store for MemoryStore { async fn insert(&self, ctx: WritingViewContext) -> Result<(), WorkerError> { let uid = ctx.uid; - let buffer = self.get_or_create_underlying_staging_buffer(uid.clone()); - let mut buffer_guarded = buffer.lock(); - - let blocks = ctx.data_blocks; - let inserted_size = buffer_guarded.add(blocks)?; - drop(buffer_guarded); + let inserted_size = { + let buffer = self.get_or_create_underlying_staging_buffer(uid); + let mut buffer_guarded = buffer.lock(); + let blocks = ctx.data_blocks; + let inserted_size = buffer_guarded.add(blocks)?; + inserted_size + }; self.budget.allocated_to_used(inserted_size)?;
