This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new acb2f3f9d fix(rust): avoid checking storage type in runtime (#1503)
acb2f3f9d is described below
commit acb2f3f9dcfd9c7362ecaf2012a6d9cc8a427322
Author: Junfan Zhang <[email protected]>
AuthorDate: Sun Feb 4 16:24:21 2024 +0800
fix(rust): avoid checking storage type in runtime (#1503)
### What changes were proposed in this pull request?
Avoid checking storage type of spill in runtime to improve performance.
### Why are the changes needed?
improve performance.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing uts
---
rust/experimental/server/src/store/hdfs.rs | 6 +++++-
rust/experimental/server/src/store/hybrid.rs | 23 ++++++++++-------------
rust/experimental/server/src/store/localfile.rs | 6 +++++-
rust/experimental/server/src/store/memory.rs | 6 +++++-
rust/experimental/server/src/store/mod.rs | 4 +++-
5 files changed, 28 insertions(+), 17 deletions(-)
diff --git a/rust/experimental/server/src/store/hdfs.rs
b/rust/experimental/server/src/store/hdfs.rs
index ce503d0e7..8abf563ab 100644
--- a/rust/experimental/server/src/store/hdfs.rs
+++ b/rust/experimental/server/src/store/hdfs.rs
@@ -19,7 +19,7 @@ use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingViewContext,
RegisterAppContext, ReleaseBufferContext, RequireBufferContext,
WritingViewContext,
};
-use crate::config::HdfsStoreConfig;
+use crate::config::{HdfsStoreConfig, StorageType};
use crate::error::WorkerError;
use std::collections::HashMap;
@@ -300,6 +300,10 @@ impl Store for HdfsStore {
.or_insert_with(|| client);
Ok(())
}
+
+ async fn name(&self) -> StorageType {
+ StorageType::HDFS
+ }
}
#[async_trait]
diff --git a/rust/experimental/server/src/store/hybrid.rs
b/rust/experimental/server/src/store/hybrid.rs
index ce996841a..74176072c 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -159,17 +159,6 @@ impl HybridStore {
self.cold_store.is_none() && self.warm_store.is_none()
}
- fn get_store_type(&self, store: &dyn Any) -> StorageType {
- if store.is::<LocalFileStore>() {
- return StorageType::LOCALFILE;
- }
- #[cfg(feature = "hdfs")]
- if store.is::<HdfsStore>() {
- return StorageType::HDFS;
- }
- return StorageType::MEMORY;
- }
-
fn is_localfile(&self, store: &dyn Any) -> bool {
store.is::<LocalFileStore>()
}
@@ -226,7 +215,7 @@ impl HybridStore {
candidate_store = cold;
}
- match self.get_store_type(candidate_store) {
+ match candidate_store.name().await {
StorageType::LOCALFILE => {
TOTAL_MEMORY_SPILL_TO_LOCALFILE.inc();
GAUGE_MEMORY_SPILL_TO_LOCALFILE.inc();
@@ -270,7 +259,7 @@ impl HybridStore {
.await?;
self.hot_store.free_used(spill_size).await?;
- match self.get_store_type(candidate_store) {
+ match candidate_store.name().await {
StorageType::LOCALFILE => {
GAUGE_MEMORY_SPILL_TO_LOCALFILE.dec();
}
@@ -538,6 +527,10 @@ impl Store for HybridStore {
}
Ok(())
}
+
+ async fn name(&self) -> StorageType {
+ unimplemented!()
+ }
}
pub async fn watermark_flush(store: Arc<HybridStore>) -> Result<()> {
@@ -610,6 +603,10 @@ mod tests {
assert_eq!(true, is_apple(&Apple {}));
assert_eq!(false, is_apple(&Banana {}));
+
+ let boxed_apple = Box::new(Apple {});
+ assert_eq!(true, is_apple(&*boxed_apple));
+ assert_eq!(false, is_apple(&boxed_apple));
}
#[test]
diff --git a/rust/experimental/server/src/store/localfile.rs
b/rust/experimental/server/src/store/localfile.rs
index 782311fcf..8afdb7bfd 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -20,7 +20,7 @@ use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingViewContext,
RegisterAppContext, ReleaseBufferContext, RequireBufferContext,
WritingViewContext,
};
-use crate::config::LocalfileStoreConfig;
+use crate::config::{LocalfileStoreConfig, StorageType};
use crate::error::WorkerError;
use crate::metric::TOTAL_LOCALFILE_USED;
use crate::store::ResponseDataIndex::Local;
@@ -415,6 +415,10 @@ impl Store for LocalFileStore {
async fn register_app(&self, _ctx: RegisterAppContext) -> Result<()> {
Ok(())
}
+
+ async fn name(&self) -> StorageType {
+ StorageType::LOCALFILE
+ }
}
#[cfg(test)]
diff --git a/rust/experimental/server/src/store/memory.rs
b/rust/experimental/server/src/store/memory.rs
index a77ae0397..e93e4ae7f 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -20,7 +20,7 @@ use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingViewContext,
RegisterAppContext, ReleaseBufferContext, RequireBufferContext,
WritingViewContext,
};
-use crate::config::MemoryStoreConfig;
+use crate::config::{MemoryStoreConfig, StorageType};
use crate::error::WorkerError;
use crate::metric::{
GAUGE_MEMORY_ALLOCATED, GAUGE_MEMORY_CAPACITY, GAUGE_MEMORY_USED,
TOTAL_MEMORY_USED,
@@ -459,6 +459,10 @@ impl Store for MemoryStore {
async fn register_app(&self, _ctx: RegisterAppContext) -> Result<()> {
Ok(())
}
+
+ async fn name(&self) -> StorageType {
+ StorageType::MEMORY
+ }
}
/// thread safe, this will be guarded by the lock
diff --git a/rust/experimental/server/src/store/mod.rs
b/rust/experimental/server/src/store/mod.rs
index 29d15cd18..99bd36e6f 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -27,7 +27,7 @@ use crate::app::{
PurgeDataContext, ReadingIndexViewContext, ReadingViewContext,
RegisterAppContext,
ReleaseBufferContext, RequireBufferContext, WritingViewContext,
};
-use crate::config::Config;
+use crate::config::{Config, StorageType};
use crate::error::WorkerError;
use crate::proto::uniffle::{ShuffleData, ShuffleDataBlockSegment};
use crate::store::hybrid::HybridStore;
@@ -177,6 +177,8 @@ pub trait Store {
) -> Result<RequireBufferResponse, WorkerError>;
async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64,
WorkerError>;
async fn register_app(&self, ctx: RegisterAppContext) -> Result<()>;
+
+ async fn name(&self) -> StorageType;
}
pub trait Persistent {}