This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 882cc2082 [#1407] feat(rust): support multiple spill policies and
simplify hdfs config (#1487)
882cc2082 is described below
commit 882cc208256f5b9b7d360e84ec0bb0f0c289473f
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jan 26 16:37:32 2024 +0800
[#1407] feat(rust): support multiple spill policies and simplify hdfs
config (#1487)
### What changes were proposed in this pull request?
1. fix(rust): fast fail of writing to localfile when disk is unhealthy
2. fix(rust): incorrect failed buffer required cnt metrics
3. feat(rust): support fallback to hdfs when local store is invalid
4. btw: support huge partition flushing to hdfs directly
5. feat(rust): avoid setting hdfs config in server client
### Why are the changes needed?
For #1407, hoping to bring this into the production env.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
---
rust/experimental/server/Cargo.lock | 60 ++++--
rust/experimental/server/Cargo.toml | 2 +-
rust/experimental/server/README.md | 6 +-
rust/experimental/server/src/app.rs | 246 +++++++++++++++++------
rust/experimental/server/src/config.rs | 1 -
rust/experimental/server/src/error.rs | 6 +
rust/experimental/server/src/grpc.rs | 20 +-
rust/experimental/server/src/main.rs | 4 +-
rust/experimental/server/src/store/hdfs.rs | 116 ++++++++---
rust/experimental/server/src/store/hybrid.rs | 73 +++++--
rust/experimental/server/src/store/local/disk.rs | 6 +-
rust/experimental/server/src/store/localfile.rs | 112 ++++++++++-
rust/experimental/server/src/store/memory.rs | 32 +--
rust/experimental/server/src/store/mod.rs | 5 +-
14 files changed, 535 insertions(+), 154 deletions(-)
diff --git a/rust/experimental/server/Cargo.lock
b/rust/experimental/server/Cargo.lock
index 0d3e7ac2f..87048fc34 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -467,8 +467,8 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
dependencies = [
- "prost",
- "prost-types",
+ "prost 0.11.9",
+ "prost-types 0.11.9",
"tonic",
"tracing-core",
]
@@ -485,7 +485,7 @@ dependencies = [
"futures",
"hdrhistogram",
"humantime",
- "prost-types",
+ "prost-types 0.11.9",
"serde",
"serde_json",
"thread_local",
@@ -1227,9 +1227,9 @@ checksum =
"2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
[[package]]
name = "hdfs-native"
-version = "0.5.0"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "824c00b5e1b0ba3aeb5678debcb78bf3a26d3d6473e7fd0f53d1310c988c1f02"
+checksum = "7068d52978481fb74f1ac1522c0823d180a6e112c459169cf72f3cd08b207b0d"
dependencies = [
"base64 0.21.4",
"bytes 1.5.0",
@@ -1240,8 +1240,8 @@ dependencies = [
"libgssapi",
"log",
"num-traits",
- "prost",
- "prost-types",
+ "prost 0.12.3",
+ "prost-types 0.12.3",
"roxmltree",
"socket2 0.5.4",
"thiserror",
@@ -2337,7 +2337,17 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes 1.5.0",
- "prost-derive",
+ "prost-derive 0.11.9",
+]
+
+[[package]]
+name = "prost"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a"
+dependencies = [
+ "bytes 1.5.0",
+ "prost-derive 0.12.3",
]
[[package]]
@@ -2354,8 +2364,8 @@ dependencies = [
"multimap",
"petgraph",
"prettyplease",
- "prost",
- "prost-types",
+ "prost 0.11.9",
+ "prost-types 0.11.9",
"regex",
"syn 1.0.109",
"tempfile",
@@ -2375,13 +2385,35 @@ dependencies = [
"syn 1.0.109",
]
+[[package]]
+name = "prost-derive"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e"
+dependencies = [
+ "anyhow",
+ "itertools",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.37",
+]
+
[[package]]
name = "prost-types"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
dependencies = [
- "prost",
+ "prost 0.11.9",
+]
+
+[[package]]
+name = "prost-types"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e"
+dependencies = [
+ "prost 0.12.3",
]
[[package]]
@@ -3419,7 +3451,7 @@ dependencies = [
"hdrhistogram",
"humantime",
"once_cell",
- "prost-types",
+ "prost-types 0.11.9",
"ratatui",
"regex",
"serde",
@@ -3587,7 +3619,7 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project 1.1.3",
- "prost",
+ "prost 0.11.9",
"tokio",
"tokio-stream",
"tower",
@@ -3809,7 +3841,7 @@ dependencies = [
"poem",
"pprof",
"prometheus",
- "prost",
+ "prost 0.11.9",
"prost-build",
"serde",
"signal-hook",
diff --git a/rust/experimental/server/Cargo.toml
b/rust/experimental/server/Cargo.toml
index ad9f5fc92..860ad4320 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -100,7 +100,7 @@ spin = "0.9.8"
opendal = { version = "0.44.0", features = ["services-fs"]}
[dependencies.hdfs-native]
-version = "0.5.0"
+version = "0.7.0"
optional = true
features = ["kerberos"]
diff --git a/rust/experimental/server/README.md
b/rust/experimental/server/README.md
index 9cec4df48..73e88da0f 100644
--- a/rust/experimental/server/README.md
+++ b/rust/experimental/server/README.md
@@ -15,7 +15,7 @@
~ limitations under the License.
-->
-Another implementation of Apache Uniffle shuffle server
+Another implementation of Apache Uniffle shuffle server (Single binary, no
extra dependencies)
## Benchmark report
@@ -90,7 +90,7 @@ In the future, rust-based server will use io_uring mechanism
to improve writing
## Build
-`cargo build --release`
+`cargo build --release --features hdfs,jemalloc`
Uniffle-x currently treats all compiler warnings as error, with some dead-code
warning excluded. When you are developing
and really want to ignore the warnings for now, you can use `ccargo --config
'build.rustflags=["-W", "warnings"]' build`
@@ -115,7 +115,6 @@ data_paths = ["/data1/uniffle", "/data2/uniffle"]
healthy_check_min_disks = 0
[hdfs_store]
-data_path = "hdfs://rbf-x/user/bi"
max_concurrency = 10
[hybrid_store]
@@ -135,6 +134,7 @@ push_gateway_endpoint = "http://xxxxxxxxxxxxxx/pushgateway"
### HDFS Setup
Benefit from the hdfs-native crate, there is no need to setup the JAVA_HOME
and relative dependencies.
+If HDFS store is valid, the spark client must specify the conf of
`spark.rss.client.remote.storage.useLocalConfAsDefault=true`
```shell
cargo build --features hdfs --release
diff --git a/rust/experimental/server/src/app.rs
b/rust/experimental/server/src/app.rs
index 9298fb6e3..35699f780 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -40,19 +40,22 @@ use dashmap::DashMap;
use log::{debug, error, info};
use std::collections::hash_map::DefaultHasher;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::str::FromStr;
+use crate::proto::uniffle::RemoteStorage;
use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Arc;
use std::sync::RwLock;
+use std::sync::{Arc, OnceLock};
use std::time::Duration;
+pub static SHUFFLE_SERVER_ID: OnceLock<String> = OnceLock::new();
+
#[derive(Debug, Clone)]
-enum DataDistribution {
+pub enum DataDistribution {
NORMAL,
#[allow(non_camel_case_types)]
LOCAL_ORDER,
@@ -61,9 +64,24 @@ enum DataDistribution {
pub const MAX_CONCURRENCY_PER_PARTITION_TO_WRITE: i32 = 20;
#[derive(Debug, Clone)]
-struct AppConfigOptions {
- data_distribution: DataDistribution,
- max_concurrency_per_partition_to_write: i32,
+pub struct AppConfigOptions {
+ pub data_distribution: DataDistribution,
+ pub max_concurrency_per_partition_to_write: i32,
+ pub remote_storage_config_option: Option<RemoteStorageConfig>,
+}
+
+impl AppConfigOptions {
+ pub fn new(
+ data_distribution: DataDistribution,
+ max_concurrency_per_partition_to_write: i32,
+ remote_storage_config_option: Option<RemoteStorageConfig>,
+ ) -> Self {
+ Self {
+ data_distribution,
+ max_concurrency_per_partition_to_write,
+ remote_storage_config_option,
+ }
+ }
}
impl Default for AppConfigOptions {
@@ -71,6 +89,30 @@ impl Default for AppConfigOptions {
AppConfigOptions {
data_distribution: DataDistribution::LOCAL_ORDER,
max_concurrency_per_partition_to_write: 20,
+ remote_storage_config_option: None,
+ }
+ }
+}
+
+// =============================================================
+
+#[derive(Clone, Debug)]
+pub struct RemoteStorageConfig {
+ pub root: String,
+ pub configs: HashMap<String, String>,
+}
+
+impl From<RemoteStorage> for RemoteStorageConfig {
+ fn from(remote_conf: RemoteStorage) -> Self {
+ let root = remote_conf.path;
+ let mut confs = HashMap::new();
+ for kv in remote_conf.remote_storage_conf {
+ confs.insert(kv.key, kv.value);
+ }
+
+ Self {
+ root,
+ configs: confs,
}
}
}
@@ -81,7 +123,7 @@ pub struct App {
app_id: String,
// key: shuffleId, value: partitionIds
partitions: DashMap<i32, HashSet<i32>>,
- app_config_options: Option<AppConfigOptions>,
+ app_config_options: AppConfigOptions,
latest_heartbeat_time: AtomicU64,
store: Arc<HybridStore>,
// key: (shuffle_id, partition_id)
@@ -139,11 +181,36 @@ impl PartitionedMeta {
impl App {
fn from(
app_id: String,
- config_options: Option<AppConfigOptions>,
+ config_options: AppConfigOptions,
store: Arc<HybridStore>,
huge_partition_marked_threshold: Option<u64>,
huge_partition_memory_max_available_size: Option<u64>,
+ runtime_manager: RuntimeManager,
) -> Self {
+ // todo: should throw exception if register failed.
+ let copy_app_id = app_id.to_string();
+ let app_options = config_options.clone();
+ let cloned_store = store.clone();
+ let register_result = futures::executor::block_on(async move {
+ runtime_manager
+ .default_runtime
+ .spawn(async move {
+ cloned_store
+ .register_app(RegisterAppContext {
+ app_id: copy_app_id,
+ app_config_options: app_options,
+ })
+ .await
+ })
+ .await
+ });
+ if register_result.is_err() {
+ error!(
+ "Errors on registering app to store: {:#?}",
+ register_result.err()
+ );
+ }
+
App {
app_id,
partitions: DashMap::new(),
@@ -179,8 +246,12 @@ impl App {
.incr_data_size(len)?;
TOTAL_RECEIVED_DATA.inc_by(len as u64);
- self.store.insert(ctx).await?;
+ let ctx = match self.is_huge_partition(&ctx.uid).await {
+ Ok(true) => WritingViewContext::new(ctx.uid.clone(),
ctx.data_blocks, true),
+ _ => ctx,
+ };
+ self.store.insert(ctx).await?;
Ok(len)
}
@@ -204,23 +275,37 @@ impl App {
self.store.get_index(ctx).await
}
- async fn huge_partition_limit(&self, uid: &PartitionedUId) -> Result<bool>
{
- let huge_partition_threshold = &self.huge_partition_marked_threshold;
- let huge_partition_memory_used =
&self.huge_partition_memory_max_available_size;
- if huge_partition_threshold.is_none() ||
huge_partition_memory_used.is_none() {
+ async fn is_huge_partition(&self, uid: &PartitionedUId) -> Result<bool> {
+ let huge_partition_threshold_option =
&self.huge_partition_marked_threshold;
+ let huge_partition_memory_used_option =
&self.huge_partition_memory_max_available_size;
+ if huge_partition_threshold_option.is_none() ||
huge_partition_memory_used_option.is_none()
+ {
return Ok(false);
}
- let huge_partition_size = &huge_partition_threshold.unwrap();
- let huge_partition_memory = &huge_partition_memory_used.unwrap();
+
+ let huge_partition_threshold =
&huge_partition_threshold_option.unwrap();
let meta = self.get_underlying_partition_bitmap(uid.clone());
let data_size = meta.get_data_size()?;
- if data_size > *huge_partition_size
- && self
- .store
- .get_hot_store_memory_partitioned_buffer_size(uid)
- .await?
- > *huge_partition_memory
+ if data_size > *huge_partition_threshold {
+ return Ok(true);
+ }
+
+ return Ok(false);
+ }
+
+ async fn is_backpressure_for_huge_partition(&self, uid: &PartitionedUId)
-> Result<bool> {
+ if !self.is_huge_partition(uid).await? {
+ return Ok(false);
+ }
+ let huge_partition_memory_used =
&self.huge_partition_memory_max_available_size;
+ let huge_partition_memory = &huge_partition_memory_used.unwrap();
+
+ if self
+ .store
+ .get_hot_store_memory_partitioned_buffer_size(uid)
+ .await?
+ > *huge_partition_memory
{
info!(
"[{:?}] with huge partition, it has been writing speed
limited",
@@ -241,12 +326,15 @@ impl App {
&self,
ctx: RequireBufferContext,
) -> Result<RequireBufferResponse, WorkerError> {
- if self.huge_partition_limit(&ctx.uid).await? {
+ if self.is_backpressure_for_huge_partition(&ctx.uid).await? {
TOTAL_REQUIRE_BUFFER_FAILED.inc();
return Err(WorkerError::MEMORY_USAGE_LIMITED_BY_HUGE_PARTITION);
}
- self.store.require_buffer(ctx).await
+ self.store.require_buffer(ctx).await.map_err(|err| {
+ TOTAL_REQUIRE_BUFFER_FAILED.inc();
+ err
+ })
}
pub async fn release_buffer(&self, ticket_id: i64) -> Result<i64,
WorkerError> {
@@ -322,6 +410,29 @@ pub struct GetBlocksContext {
pub struct WritingViewContext {
pub uid: PartitionedUId,
pub data_blocks: Vec<PartitionedDataBlock>,
+ pub owned_by_huge_partition: bool,
+}
+
+impl WritingViewContext {
+ pub fn from(uid: PartitionedUId, data_blocks: Vec<PartitionedDataBlock>)
-> Self {
+ WritingViewContext {
+ uid,
+ data_blocks,
+ owned_by_huge_partition: false,
+ }
+ }
+
+ pub fn new(
+ uid: PartitionedUId,
+ data_blocks: Vec<PartitionedDataBlock>,
+ owned_by_huge_partition: bool,
+ ) -> Self {
+ WritingViewContext {
+ uid,
+ data_blocks,
+ owned_by_huge_partition,
+ }
+ }
}
#[derive(Debug, Clone)]
@@ -341,6 +452,12 @@ pub struct RequireBufferContext {
pub size: i64,
}
+#[derive(Debug, Clone)]
+pub struct RegisterAppContext {
+ pub app_id: String,
+ pub app_config_options: AppConfigOptions,
+}
+
#[derive(Debug, Clone)]
pub struct ReleaseBufferContext {
pub(crate) ticket_id: i64,
@@ -389,6 +506,7 @@ pub struct AppManager {
store: Arc<HybridStore>,
app_heartbeat_timeout_min: u32,
config: Config,
+ runtime_manager: RuntimeManager,
}
impl AppManager {
@@ -404,6 +522,7 @@ impl AppManager {
store,
app_heartbeat_timeout_min,
config,
+ runtime_manager: runtime_manager.clone(),
};
manager
}
@@ -506,7 +625,12 @@ impl AppManager {
self.apps.get(app_id).map(|v| v.value().clone())
}
- pub fn register(&self, app_id: String, shuffle_id: i32) -> Result<()> {
+ pub fn register(
+ &self,
+ app_id: String,
+ shuffle_id: i32,
+ app_config_options: AppConfigOptions,
+ ) -> Result<()> {
info!(
"Accepted registry. app_id: {}, shuffle_id: {}",
app_id.clone(),
@@ -539,10 +663,11 @@ impl AppManager {
Arc::new(App::from(
app_id,
- Some(AppConfigOptions::default()),
+ app_config_options,
self.store.clone(),
threshold,
huge_partition_max_available_size,
+ self.runtime_manager.clone(),
))
});
app_ref.register_shuffle(shuffle_id)
@@ -590,6 +715,7 @@ mod test {
};
use crate::config::{Config, HybridStoreConfig, LocalfileStoreConfig,
MemoryStoreConfig};
+ use crate::runtime::manager::RuntimeManager;
use crate::store::{PartitionedDataBlock, ResponseData};
use croaring::treemap::JvmSerializer;
use croaring::Treemap;
@@ -613,21 +739,24 @@ mod test {
config
}
- #[tokio::test]
- async fn app_put_get_purge_test() {
+ #[test]
+ fn app_put_get_purge_test() {
let app_id = "app_put_get_purge_test-----id";
- let app_manager_ref = AppManager::get_ref(Default::default(),
mock_config()).clone();
- app_manager_ref.register(app_id.clone().into(), 1).unwrap();
+ let runtime_manager: RuntimeManager = Default::default();
+ let app_manager_ref = AppManager::get_ref(runtime_manager.clone(),
mock_config()).clone();
+ app_manager_ref
+ .register(app_id.clone().into(), 1, Default::default())
+ .unwrap();
if let Some(app) = app_manager_ref.get_app("app_id".into()) {
- let writing_ctx = WritingViewContext {
- uid: PartitionedUId {
+ let writing_ctx = WritingViewContext::from(
+ PartitionedUId {
app_id: app_id.clone().into(),
shuffle_id: 1,
partition_id: 0,
},
- data_blocks: vec![
+ vec![
PartitionedDataBlock {
block_id: 0,
length: 10,
@@ -645,11 +774,11 @@ mod test {
task_attempt_id: 0,
},
],
- };
+ );
// case1: put
- let result = app.insert(writing_ctx);
- if result.await.is_err() {
+ let f = app.insert(writing_ctx);
+ if runtime_manager.wait(f).is_err() {
panic!()
}
@@ -660,7 +789,8 @@ mod test {
};
// case2: get
- let result = app.select(reading_ctx).await;
+ let f = app.select(reading_ctx);
+ let result = runtime_manager.wait(f);
if result.is_err() {
panic!()
}
@@ -673,42 +803,46 @@ mod test {
}
// case3: purge
- app_manager_ref
- .purge_app_data(app_id.to_string(), None)
- .await
+ runtime_manager
+ .wait(app_manager_ref.purge_app_data(app_id.to_string(), None))
.expect("");
assert_eq!(false, app_manager_ref.get_app(app_id).is_none());
}
}
- #[tokio::test]
- async fn app_manager_test() {
+ #[test]
+ fn app_manager_test() {
let app_manager_ref = AppManager::get_ref(Default::default(),
mock_config()).clone();
- app_manager_ref.register("app_id".into(), 1).unwrap();
+ app_manager_ref
+ .register("app_id".into(), 1, Default::default())
+ .unwrap();
if let Some(app) = app_manager_ref.get_app("app_id".into()) {
assert_eq!("app_id", app.app_id);
}
}
- #[tokio::test]
- async fn test_get_or_put_block_ids() {
+ #[test]
+ fn test_get_or_put_block_ids() {
let app_id = "test_get_or_put_block_ids-----id".to_string();
- let app_manager_ref = AppManager::get_ref(Default::default(),
mock_config()).clone();
- app_manager_ref.register(app_id.clone().into(), 1).unwrap();
+ let runtime_manager: RuntimeManager = Default::default();
+ let app_manager_ref = AppManager::get_ref(runtime_manager.clone(),
mock_config()).clone();
+ app_manager_ref
+ .register(app_id.clone().into(), 1, Default::default())
+ .unwrap();
let app = app_manager_ref.get_app(app_id.as_ref()).unwrap();
- app.report_block_ids(ReportBlocksContext {
- uid: PartitionedUId {
- app_id: app_id.clone(),
- shuffle_id: 1,
- partition_id: 0,
- },
- blocks: vec![123, 124],
- })
- .await
- .expect("TODO: panic message");
+ runtime_manager
+ .wait(app.report_block_ids(ReportBlocksContext {
+ uid: PartitionedUId {
+ app_id: app_id.clone(),
+ shuffle_id: 1,
+ partition_id: 0,
+ },
+ blocks: vec![123, 124],
+ }))
+ .expect("TODO: panic message");
let data = app
.get_block_ids(GetBlocksContext {
diff --git a/rust/experimental/server/src/config.rs
b/rust/experimental/server/src/config.rs
index 85e37d0a7..a273e2db2 100644
--- a/rust/experimental/server/src/config.rs
+++ b/rust/experimental/server/src/config.rs
@@ -45,7 +45,6 @@ impl MemoryStoreConfig {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct HdfsStoreConfig {
- pub data_path: String,
pub max_concurrency: Option<i32>,
}
diff --git a/rust/experimental/server/src/error.rs
b/rust/experimental/server/src/error.rs
index cc07536a5..c39adfcf4 100644
--- a/rust/experimental/server/src/error.rs
+++ b/rust/experimental/server/src/error.rs
@@ -34,6 +34,9 @@ pub enum WorkerError {
#[error("Partial data has been lost, corrupted path: {0}")]
PARTIAL_DATA_LOST(String),
+ #[error("Local disk:[{0}] is not healthy")]
+ LOCAL_DISK_UNHEALTHY(String),
+
#[error("Local disk:[{0}] owned by current partition has been corrupted")]
LOCAL_DISK_OWNED_BY_PARTITION_CORRUPTED(String),
@@ -51,6 +54,9 @@ pub enum WorkerError {
#[error("Ticket id: {0} not exist")]
TICKET_ID_NOT_EXIST(i64),
+
+ #[error("Hdfs native client not found for app: {0}")]
+ HDFS_NATIVE_CLIENT_NOT_FOUND(String),
}
impl From<AcquireError> for WorkerError {
diff --git a/rust/experimental/server/src/grpc.rs
b/rust/experimental/server/src/grpc.rs
index ff5245ea6..e04447335 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -16,8 +16,9 @@
// under the License.
use crate::app::{
- AppManagerRef, GetBlocksContext, PartitionedUId, ReadingIndexViewContext,
ReadingOptions,
- ReadingViewContext, ReportBlocksContext, RequireBufferContext,
WritingViewContext,
+ AppConfigOptions, AppManagerRef, DataDistribution, GetBlocksContext,
PartitionedUId,
+ ReadingIndexViewContext, ReadingOptions, ReadingViewContext,
RemoteStorageConfig,
+ ReportBlocksContext, RequireBufferContext, WritingViewContext,
};
use crate::proto::uniffle::shuffle_server_server::ShuffleServer;
use crate::proto::uniffle::{
@@ -91,9 +92,17 @@ impl ShuffleServer for DefaultShuffleServer {
request: Request<ShuffleRegisterRequest>,
) -> Result<Response<ShuffleRegisterResponse>, Status> {
let inner = request.into_inner();
+ // todo: fast fail when hdfs is enabled but empty remote storage info.
+ let remote_storage_info = inner.remote_storage.map(|x|
RemoteStorageConfig::from(x));
+ // todo: add more options: huge_partition_threshold. and so on...
+ let app_config_option = AppConfigOptions::new(
+ DataDistribution::LOCAL_ORDER,
+ inner.max_concurrency_per_partition_to_write,
+ remote_storage_info,
+ );
let status = self
.app_manager_ref
- .register(inner.app_id, inner.shuffle_id)
+ .register(inner.app_id, inner.shuffle_id, app_config_option)
.map_or(StatusCode::INTERNAL_ERROR, |_| StatusCode::SUCCESS)
.into();
Ok(Response::new(ShuffleRegisterResponse {
@@ -184,10 +193,7 @@ impl ShuffleServer for DefaultShuffleServer {
shuffle_id,
partition_id,
};
- let ctx = WritingViewContext {
- uid: uid.clone(),
- data_blocks: blocks,
- };
+ let ctx = WritingViewContext::from(uid.clone(), blocks);
let inserted = app
.insert(ctx)
diff --git a/rust/experimental/server/src/main.rs
b/rust/experimental/server/src/main.rs
index d2fafa4d4..01ec4aefb 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -17,7 +17,7 @@
#![feature(impl_trait_in_assoc_type)]
-use crate::app::{AppManager, AppManagerRef};
+use crate::app::{AppManager, AppManagerRef, SHUFFLE_SERVER_ID};
use crate::await_tree::AWAIT_TREE_REGISTRY;
use crate::config::{Config, LogConfig, RotationConfig};
use crate::grpc::await_tree_middleware::AwaitTreeMiddlewareLayer;
@@ -201,6 +201,8 @@ fn main() -> Result<()> {
let rpc_port = config.grpc_port.unwrap_or(19999);
let worker_uid = gen_worker_uid(rpc_port);
+ // todo: remove some unnecessary worker_id transfer.
+ SHUFFLE_SERVER_ID.get_or_init(|| worker_uid.clone());
let metric_config = config.metrics.clone();
init_metric_service(runtime_manager.clone(), &metric_config,
worker_uid.clone());
diff --git a/rust/experimental/server/src/store/hdfs.rs
b/rust/experimental/server/src/store/hdfs.rs
index 78d903543..3a1b74c80 100644
--- a/rust/experimental/server/src/store/hdfs.rs
+++ b/rust/experimental/server/src/store/hdfs.rs
@@ -17,14 +17,15 @@
use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingViewContext,
- ReleaseBufferContext, RequireBufferContext, WritingViewContext,
+ RegisterAppContext, ReleaseBufferContext, RequireBufferContext,
WritingViewContext,
};
use crate::config::HdfsStoreConfig;
use crate::error::WorkerError;
+use std::collections::HashMap;
use crate::metric::TOTAL_HDFS_USED;
use crate::store::{Persistent, RequireBufferResponse, ResponseData,
ResponseDataIndex, Store};
-use anyhow::Result;
+use anyhow::{anyhow, Result};
use async_trait::async_trait;
use await_tree::InstrumentAwait;
@@ -40,6 +41,7 @@ use std::sync::Arc;
use tokio::sync::{Mutex, Semaphore};
use tracing::debug;
+use url::Url;
struct PartitionCachedMeta {
is_file_created: bool,
@@ -62,10 +64,11 @@ impl Default for PartitionCachedMeta {
}
pub struct HdfsStore {
- root: String,
- filesystem: Box<HdfsNativeClient>,
concurrency_access_limiter: Semaphore,
+ // key: app_id, value: hdfs_native_client
+ app_remote_clients: DashMap<String, HdfsNativeClient>,
+
partition_file_locks: DashMap<String, Arc<Mutex<()>>>,
partition_cached_meta: DashMap<String, PartitionCachedMeta>,
}
@@ -76,26 +79,21 @@ impl Persistent for HdfsStore {}
impl HdfsStore {
pub fn from(conf: HdfsStoreConfig) -> Self {
- let data_path = conf.data_path;
-
- let filesystem = HdfsNativeClient::new();
-
HdfsStore {
- root: data_path,
- filesystem: Box::new(filesystem),
partition_file_locks: DashMap::new(),
concurrency_access_limiter:
Semaphore::new(conf.max_concurrency.unwrap_or(1) as usize),
partition_cached_meta: Default::default(),
+ app_remote_clients: Default::default(),
}
}
fn get_app_dir(&self, app_id: &str) -> String {
- format!("{}/{}/", &self.root, app_id)
+ format!("{}/", app_id)
}
/// the dir created with app_id/shuffle_id
fn get_shuffle_dir(&self, app_id: &str, shuffle_id: i32) -> String {
- format!("{}/{}/{}/", &self.root, app_id, shuffle_id)
+ format!("{}/{}/", app_id, shuffle_id)
}
fn get_file_path_by_uid(&self, uid: &PartitionedUId) -> (String, String) {
@@ -103,14 +101,15 @@ impl HdfsStore {
let shuffle_id = &uid.shuffle_id;
let p_id = &uid.partition_id;
+ let worker_id = crate::app::SHUFFLE_SERVER_ID.get().unwrap();
(
format!(
- "{}/{}/{}/{}-{}/partition-{}.data",
- &self.root, app_id, shuffle_id, p_id, p_id, p_id
+ "{}/{}/{}-{}/{}.data",
+ app_id, shuffle_id, p_id, p_id, worker_id
),
format!(
- "{}/{}/{}/{}-{}/partition-{}.index",
- &self.root, app_id, shuffle_id, p_id, p_id, p_id
+ "{}/{}/{}-{}/{}.index",
+ app_id, shuffle_id, p_id, p_id, worker_id
),
)
}
@@ -151,17 +150,22 @@ impl Store for HdfsStore {
))
.await;
+ let filesystem = self.app_remote_clients.get(&uid.app_id).ok_or(
+ WorkerError::HDFS_NATIVE_CLIENT_NOT_FOUND(uid.app_id.to_string()),
+ )?;
+
let mut next_offset = match
self.partition_cached_meta.get(&data_file_path) {
None => {
// setup the parent folder
let parent_dir =
Path::new(data_file_path.as_str()).parent().unwrap();
let parent_path_str = format!("{}/",
parent_dir.to_str().unwrap());
debug!("creating dir: {}", parent_path_str.as_str());
- self.filesystem.create_dir(parent_path_str.as_str()).await?;
+
+ filesystem.create_dir(parent_path_str.as_str()).await?;
// setup the file
- self.filesystem.touch(&data_file_path).await?;
- self.filesystem.touch(&index_file_path).await?;
+ filesystem.touch(&data_file_path).await?;
+ filesystem.touch(&index_file_path).await?;
self.partition_cached_meta
.insert(data_file_path.to_string(), Default::default());
@@ -196,11 +200,11 @@ impl Store for HdfsStore {
total_flushed += length;
}
- self.filesystem
+ filesystem
.append(&data_file_path, data_bytes_holder.freeze())
.instrument_await(format!("hdfs writing data. path: {}",
data_file_path))
.await?;
- self.filesystem
+ filesystem
.append(&index_file_path, index_bytes_holder.freeze())
.instrument_await(format!("hdfs writing index. path: {}",
data_file_path))
.await?;
@@ -240,6 +244,9 @@ impl Store for HdfsStore {
async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
let app_id = ctx.app_id;
+ let filesystem = self.app_remote_clients.get(&app_id).ok_or(
+ WorkerError::HDFS_NATIVE_CLIENT_NOT_FOUND(app_id.to_string()),
+ )?;
let dir = match ctx.shuffle_id {
Some(shuffle_id) => self.get_shuffle_dir(app_id.as_str(),
shuffle_id),
@@ -259,12 +266,37 @@ impl Store for HdfsStore {
}
info!("The hdfs data for {} has been deleted", &dir);
- self.filesystem.delete_dir(dir.as_str()).await
+ filesystem.delete_dir(dir.as_str()).await?;
+ drop(filesystem);
+
+ if ctx.shuffle_id.is_none() {
+ self.app_remote_clients.remove(&app_id);
+ }
+
+ Ok(())
}
async fn is_healthy(&self) -> Result<bool> {
Ok(true)
}
+
+ async fn register_app(&self, ctx: RegisterAppContext) -> Result<()> {
+ let remote_storage_conf_option =
ctx.app_config_options.remote_storage_config_option;
+ if remote_storage_conf_option.is_none() {
+ return Err(anyhow!(
+ "The remote config must be populated by app registry action!"
+ ));
+ }
+
+ let remote_storage_conf = remote_storage_conf_option.unwrap();
+ let client = HdfsNativeClient::new(remote_storage_conf.root,
remote_storage_conf.configs)?;
+
+ let app_id = ctx.app_id.clone();
+ self.app_remote_clients
+ .entry(app_id)
+ .or_insert_with(|| client);
+ Ok(())
+ }
}
#[async_trait]
@@ -279,18 +311,38 @@ trait HdfsDelegator {
struct HdfsNativeClient {
client: Client,
+ root: String,
}
impl HdfsNativeClient {
- fn new() -> Self {
- let client = Client::default();
- Self { client }
+ fn new(root: String, configs: HashMap<String, String>) -> Result<Self> {
+ // todo: do more optimizations!
+ let url = Url::parse(root.as_str())?;
+ let url_header = format!("{}://{}", url.scheme(), url.host().unwrap());
+
+ let root_path = url.path();
+
+ info!(
+ "Created hdfs client, header: {}, path: {}",
+ &url_header, root_path
+ );
+
+ let client = Client::new_with_config(url_header.as_str(), configs)?;
+ Ok(Self {
+ client,
+ root: root_path.to_string(),
+ })
+ }
+
+ fn wrap_root(&self, path: &str) -> String {
+ format!("{}/{}", &self.root, path)
}
}
#[async_trait]
impl HdfsDelegator for HdfsNativeClient {
async fn touch(&self, file_path: &str) -> Result<()> {
+ let file_path = &self.wrap_root(file_path);
self.client
.create(file_path, WriteOptions::default())
.await?
@@ -300,6 +352,7 @@ impl HdfsDelegator for HdfsNativeClient {
}
async fn append(&self, file_path: &str, data: Bytes) -> Result<()> {
+ let file_path = &self.wrap_root(file_path);
let mut file_writer = self.client.append(file_path).await?;
file_writer.write(data).await?;
file_writer.close().await?;
@@ -307,16 +360,19 @@ impl HdfsDelegator for HdfsNativeClient {
}
async fn len(&self, file_path: &str) -> Result<u64> {
+ let file_path = &self.wrap_root(file_path);
let file_info = self.client.get_file_info(file_path).await?;
Ok(file_info.length as u64)
}
async fn create_dir(&self, dir: &str) -> Result<()> {
+ let dir = &self.wrap_root(dir);
let _ = self.client.mkdirs(dir, 777, true).await?;
Ok(())
}
async fn delete_dir(&self, dir: &str) -> Result<()> {
+ let dir = &self.wrap_root(dir);
self.client.delete(dir, true).await?;
Ok(())
}
@@ -325,6 +381,16 @@ impl HdfsDelegator for HdfsNativeClient {
#[cfg(test)]
mod tests {
use std::path::Path;
+ use url::Url;
+
+ #[test]
+ fn url_test() {
+ let url = Url::parse("hdfs://rbf-1:19999/a/b").unwrap();
+ assert_eq!("hdfs", url.scheme());
+ assert_eq!("rbf-1", url.host().unwrap().to_string());
+ assert_eq!(19999, url.port().unwrap());
+ assert_eq!("/a/b", url.path());
+ }
#[test]
fn dir_test() -> anyhow::Result<()> {
diff --git a/rust/experimental/server/src/store/hybrid.rs
b/rust/experimental/server/src/store/hybrid.rs
index f9feaa091..cb29357fe 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -17,7 +17,7 @@
use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingOptions,
ReadingViewContext,
- ReleaseBufferContext, RequireBufferContext, WritingViewContext,
+ RegisterAppContext, ReleaseBufferContext, RequireBufferContext,
WritingViewContext,
};
use crate::await_tree::AWAIT_TREE_REGISTRY;
@@ -85,9 +85,12 @@ pub struct HybridStore {
runtime_manager: RuntimeManager,
}
+#[derive(Clone)]
struct SpillMessage {
ctx: WritingViewContext,
id: i64,
+ retry_cnt: i32,
+ previous_spilled_storage: Option<Arc<Box<dyn PersistentStore>>>,
}
unsafe impl Send for HybridStore {}
@@ -181,9 +184,12 @@ impl HybridStore {
async fn memory_spill_to_persistent_store(
&self,
- mut ctx: WritingViewContext,
- in_flight_blocks_id: i64,
+ spill_message: SpillMessage,
) -> Result<String, WorkerError> {
+ let mut ctx: WritingViewContext = spill_message.ctx;
+ let in_flight_blocks_id: i64 = spill_message.id;
+ let retry_cnt = spill_message.retry_cnt;
+
let uid = ctx.uid.clone();
let blocks = &ctx.data_blocks;
let mut spill_size = 0i64;
@@ -197,9 +203,15 @@ impl HybridStore {
.ok_or(anyhow!("empty warm store. It should not happen"))?;
let cold = self.cold_store.as_ref().unwrap_or(warm);
- let candidate_store = if warm.is_healthy().await? {
+ // we should cover the following cases:
+ // 1. the local store is unhealthy: spill to hdfs
+ // 2. an event failed to flush to the localfile and exceeded the max
retry cnt: fall back to hdfs
+ // 3. a huge partition is flushed directly to hdfs
+
+ // normal assignment
+ let mut candidate_store = if warm.is_healthy().await? {
let cold_spilled_size =
self.memory_spill_to_cold_threshold_size.unwrap_or(u64::MAX);
- if cold_spilled_size < spill_size as u64 {
+ if cold_spilled_size < spill_size as u64 ||
ctx.owned_by_huge_partition {
cold
} else {
warm
@@ -208,6 +220,11 @@ impl HybridStore {
cold
};
+ // fallback assignment. assume hdfs is always active and stable
+ if retry_cnt >= 1 {
+ candidate_store = cold;
+ }
+
match self.get_store_type(candidate_store) {
StorageType::LOCALFILE => {
TOTAL_MEMORY_SPILL_TO_LOCALFILE.inc();
@@ -290,16 +307,15 @@ impl HybridStore {
blocks: Vec<PartitionedDataBlock>,
uid: PartitionedUId,
) -> Result<()> {
- let writing_ctx = WritingViewContext {
- uid,
- data_blocks: blocks,
- };
+ let writing_ctx = WritingViewContext::from(uid, blocks);
if self
.memory_spill_send
.send(SpillMessage {
ctx: writing_ctx,
id: in_flight_uid,
+ retry_cnt: 0,
+ previous_spilled_storage: None,
})
.await
.is_err()
@@ -339,7 +355,7 @@ impl Store for HybridStore {
let concurrency_limiter =
Arc::new(Semaphore::new(store.memory_spill_max_concurrency as
usize));
self.runtime_manager.write_runtime.spawn(async move {
- while let Ok(message) = store.memory_spill_recv.recv().await {
+ while let Ok(mut message) = store.memory_spill_recv.recv().await {
let await_root = await_tree_registry
.register(format!("hot->warm flush. uid: {:#?}",
&message.ctx.uid))
.await;
@@ -364,7 +380,7 @@ impl Store for HybridStore {
size += block.length as u64;
}
match store_cloned
-
.memory_spill_to_persistent_store(message.ctx.clone(), message.id)
+ .memory_spill_to_persistent_store(message.clone())
.instrument_await("memory_spill_to_persistent_store.")
.await
{
@@ -375,9 +391,11 @@ impl Store for HybridStore {
Err(error) => {
TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
error!(
- "Errors on spill memory data to persistent
storage. error: {:#?}",
- error
- );
+ "Errors on spill memory data to persistent
storage for event_id:{:?}. The error: {:#?}",
+ message.id,
+ error);
+
+ message.retry_cnt = message.retry_cnt + 1;
// re-push to the queue to execute
let _ =
store_cloned.memory_spill_send.send(message).await;
}
@@ -495,6 +513,25 @@ impl Store for HybridStore {
.unwrap_or(false);
Ok(self.hot_store.is_healthy().await? && (warm || cold))
}
+
+ async fn register_app(&self, ctx: RegisterAppContext) -> Result<()> {
+ self.hot_store.register_app(ctx.clone()).await?;
+ if self.warm_store.is_some() {
+ self.warm_store
+ .as_ref()
+ .unwrap()
+ .register_app(ctx.clone())
+ .await?;
+ }
+ if self.cold_store.is_some() {
+ self.cold_store
+ .as_ref()
+ .unwrap()
+ .register_app(ctx.clone())
+ .await?;
+ }
+ Ok(())
+ }
}
pub async fn watermark_flush(store: Arc<HybridStore>) -> Result<()> {
@@ -628,9 +665,9 @@ mod tests {
let mut block_ids = vec![];
for i in 0..batch_size {
block_ids.push(i);
- let writing_ctx = WritingViewContext {
- uid: uid.clone(),
- data_blocks: vec![PartitionedDataBlock {
+ let writing_ctx = WritingViewContext::from(
+ uid.clone(),
+ vec![PartitionedDataBlock {
block_id: i,
length: data_len as i32,
uncompress_length: 100,
@@ -638,7 +675,7 @@ mod tests {
data: Bytes::copy_from_slice(data),
task_attempt_id: 0,
}],
- };
+ );
let _ = store.insert(writing_ctx).await;
}
diff --git a/rust/experimental/server/src/store/local/disk.rs
b/rust/experimental/server/src/store/local/disk.rs
index a0e16af74..b8cd2c14a 100644
--- a/rust/experimental/server/src/store/local/disk.rs
+++ b/rust/experimental/server/src/store/local/disk.rs
@@ -228,15 +228,15 @@ impl LocalDisk {
Ok(())
}
- fn mark_corrupted(&self) {
+ pub fn mark_corrupted(&self) {
self.is_corrupted.store(true, Ordering::SeqCst);
}
- fn mark_unhealthy(&self) {
+ pub fn mark_unhealthy(&self) {
self.is_healthy.store(false, Ordering::SeqCst);
}
- fn mark_healthy(&self) {
+ pub fn mark_healthy(&self) {
self.is_healthy.store(true, Ordering::SeqCst);
}
diff --git a/rust/experimental/server/src/store/localfile.rs
b/rust/experimental/server/src/store/localfile.rs
index cc2a881f6..bb810dc78 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -18,7 +18,7 @@
use crate::app::ReadingOptions::FILE_OFFSET_AND_LEN;
use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingViewContext,
- ReleaseBufferContext, RequireBufferContext, WritingViewContext,
+ RegisterAppContext, ReleaseBufferContext, RequireBufferContext,
WritingViewContext,
};
use crate::config::LocalfileStoreConfig;
use crate::error::WorkerError;
@@ -203,6 +203,12 @@ impl Store for LocalFileStore {
return
Err(WorkerError::PARTIAL_DATA_LOST(local_disk.root.to_string()));
}
+ if !local_disk.is_healthy()? {
+ return Err(WorkerError::LOCAL_DISK_UNHEALTHY(
+ local_disk.root.to_string(),
+ ));
+ }
+
if !parent_dir_is_created {
if let Some(path) = Path::new(&data_file_path).parent() {
local_disk
@@ -400,6 +406,10 @@ impl Store for LocalFileStore {
async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64,
WorkerError> {
todo!()
}
+
+ async fn register_app(&self, _ctx: RegisterAppContext) -> Result<()> {
+ Ok(())
+ }
}
#[cfg(test)]
@@ -410,10 +420,94 @@ mod test {
};
use crate::store::localfile::LocalFileStore;
+ use crate::error::WorkerError;
use crate::store::{PartitionedDataBlock, ResponseData, ResponseDataIndex,
Store};
use bytes::{Buf, Bytes, BytesMut};
use log::info;
+ fn create_writing_ctx() -> WritingViewContext {
+ let uid = PartitionedUId {
+ app_id: "100".to_string(),
+ shuffle_id: 0,
+ partition_id: 0,
+ };
+
+ let data = b"hello world!hello china!";
+ let size = data.len();
+ let writing_ctx = WritingViewContext::from(
+ uid.clone(),
+ vec![
+ PartitionedDataBlock {
+ block_id: 0,
+ length: size as i32,
+ uncompress_length: 200,
+ crc: 0,
+ data: Bytes::copy_from_slice(data),
+ task_attempt_id: 0,
+ },
+ PartitionedDataBlock {
+ block_id: 1,
+ length: size as i32,
+ uncompress_length: 200,
+ crc: 0,
+ data: Bytes::copy_from_slice(data),
+ task_attempt_id: 0,
+ },
+ ],
+ );
+
+ writing_ctx
+ }
+
+ #[test]
+ fn local_disk_under_exception_test() -> anyhow::Result<()> {
+ let temp_dir =
tempdir::TempDir::new("local_disk_under_exception_test").unwrap();
+ let temp_path = temp_dir.path().to_str().unwrap().to_string();
+ println!("init local file path: {}", &temp_path);
+ let local_store = LocalFileStore::new(vec![temp_path.to_string()]);
+
+ let runtime = local_store.runtime_manager.clone();
+
+ let writing_view_ctx = create_writing_ctx();
+ let insert_result = runtime.wait(local_store.insert(writing_view_ctx));
+
+ if insert_result.is_err() {
+ println!("{:?}", insert_result.err());
+ panic!()
+ }
+
+ // case1: mark the local disk unhealthy, so that the following flush
throws an exception directly.
+ let local_disk = local_store.local_disks[0].clone();
+ local_disk.mark_unhealthy();
+
+ let writing_view_ctx = create_writing_ctx();
+ let insert_result = runtime.wait(local_store.insert(writing_view_ctx));
+ match insert_result {
+ Err(WorkerError::LOCAL_DISK_UNHEALTHY(_)) => {}
+ _ => panic!(),
+ }
+
+ // case2: mark the local disk healthy, all things work!
+ local_disk.mark_healthy();
+ let writing_view_ctx = create_writing_ctx();
+ let insert_result = runtime.wait(local_store.insert(writing_view_ctx));
+ match insert_result {
+ Err(WorkerError::LOCAL_DISK_UNHEALTHY(_)) => panic!(),
+ _ => {}
+ }
+
+ // case3: mark the local disk corrupted, fail directly.
+ local_disk.mark_corrupted();
+ let writing_view_ctx = create_writing_ctx();
+ let insert_result = runtime.wait(local_store.insert(writing_view_ctx));
+ match insert_result {
+ Err(WorkerError::PARTIAL_DATA_LOST(_)) => {}
+ _ => panic!(),
+ }
+
+ Ok(())
+ }
+
#[test]
fn purge_test() -> anyhow::Result<()> {
let temp_dir = tempdir::TempDir::new("test_local_store").unwrap();
@@ -432,9 +526,9 @@ mod test {
let data = b"hello world!hello china!";
let size = data.len();
- let writing_ctx = WritingViewContext {
- uid: uid.clone(),
- data_blocks: vec![
+ let writing_ctx = WritingViewContext::from(
+ uid.clone(),
+ vec![
PartitionedDataBlock {
block_id: 0,
length: size as i32,
@@ -452,7 +546,7 @@ mod test {
task_attempt_id: 0,
},
],
- };
+ );
let insert_result = runtime.wait(local_store.insert(writing_ctx));
if insert_result.is_err() {
@@ -506,9 +600,9 @@ mod test {
let data = b"hello world!hello china!";
let size = data.len();
- let writing_ctx = WritingViewContext {
- uid: uid.clone(),
- data_blocks: vec![
+ let writing_ctx = WritingViewContext::from(
+ uid.clone(),
+ vec![
PartitionedDataBlock {
block_id: 0,
length: size as i32,
@@ -526,7 +620,7 @@ mod test {
task_attempt_id: 0,
},
],
- };
+ );
let insert_result = runtime.wait(local_store.insert(writing_ctx));
if insert_result.is_err() {
diff --git a/rust/experimental/server/src/store/memory.rs
b/rust/experimental/server/src/store/memory.rs
index cfb6ff0e0..b1be8e5ae 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -18,7 +18,7 @@
use crate::app::ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE;
use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingViewContext,
- ReleaseBufferContext, RequireBufferContext, WritingViewContext,
+ RegisterAppContext, ReleaseBufferContext, RequireBufferContext,
WritingViewContext,
};
use crate::config::MemoryStoreConfig;
use crate::error::WorkerError;
@@ -455,6 +455,10 @@ impl Store for MemoryStore {
let ticket_id = ctx.ticket_id;
self.ticket_manager.delete(ticket_id)
}
+
+ async fn register_app(&self, _ctx: RegisterAppContext) -> Result<()> {
+ Ok(())
+ }
}
/// thread safe, this will be guarded by the lock
@@ -882,7 +886,7 @@ mod test {
task_attempt_id: 0,
});
}
- WritingViewContext { uid, data_blocks }
+ WritingViewContext::from(uid, data_blocks)
}
#[test]
@@ -927,9 +931,9 @@ mod test {
.wait(store.require_buffer(RequireBufferContext::new(uid.clone(),
40)))
.expect("");
- let writing_ctx = WritingViewContext {
- uid: uid.clone(),
- data_blocks: vec![PartitionedDataBlock {
+ let writing_ctx = WritingViewContext::from(
+ uid.clone(),
+ vec![PartitionedDataBlock {
block_id: 0,
length: 10,
uncompress_length: 100,
@@ -937,7 +941,7 @@ mod test {
data: Default::default(),
task_attempt_id: 0,
}],
- };
+ );
runtime.wait(store.insert(writing_ctx)).expect("");
let reading_ctx = ReadingViewContext {
@@ -988,9 +992,9 @@ mod test {
let store = MemoryStore::new(1024 * 1024 * 1024);
let runtime = store.runtime_manager.clone();
- let writing_ctx = WritingViewContext {
- uid: Default::default(),
- data_blocks: vec![
+ let writing_ctx = WritingViewContext::from(
+ Default::default(),
+ vec![
PartitionedDataBlock {
block_id: 0,
length: 10,
@@ -1008,7 +1012,7 @@ mod test {
task_attempt_id: 1,
},
],
- };
+ );
runtime.wait(store.insert(writing_ctx)).unwrap();
let reading_ctx = ReadingViewContext {
@@ -1033,9 +1037,9 @@ mod test {
let runtime = store.runtime_manager.clone();
// 1. insert 2 block
- let writing_ctx = WritingViewContext {
- uid: Default::default(),
- data_blocks: vec![
+ let writing_ctx = WritingViewContext::from(
+ Default::default(),
+ vec![
PartitionedDataBlock {
block_id: 0,
length: 10,
@@ -1053,7 +1057,7 @@ mod test {
task_attempt_id: 1,
},
],
- };
+ );
runtime.wait(store.insert(writing_ctx)).unwrap();
// 2. block_ids_filter is empty, should return 2 blocks
diff --git a/rust/experimental/server/src/store/mod.rs
b/rust/experimental/server/src/store/mod.rs
index caf00a068..12a509d9b 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -24,8 +24,8 @@ pub mod mem;
pub mod memory;
use crate::app::{
- PurgeDataContext, ReadingIndexViewContext, ReadingViewContext,
ReleaseBufferContext,
- RequireBufferContext, WritingViewContext,
+ PurgeDataContext, ReadingIndexViewContext, ReadingViewContext,
RegisterAppContext,
+ ReleaseBufferContext, RequireBufferContext, WritingViewContext,
};
use crate::config::Config;
use crate::error::WorkerError;
@@ -176,6 +176,7 @@ pub trait Store {
ctx: RequireBufferContext,
) -> Result<RequireBufferResponse, WorkerError>;
async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64,
WorkerError>;
+ async fn register_app(&self, ctx: RegisterAppContext) -> Result<()>;
}
pub trait Persistent {}