This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4edd60fef [#1213] feat(rust): Support block filter by taskId when
getting memory data (#1311)
4edd60fef is described below
commit 4edd60fef8e18f627d36d809a28a481ab825b692
Author: wenlongbrother <[email protected]>
AuthorDate: Wed Nov 22 15:12:29 2023 +0800
[#1213] feat(rust): Support block filter by taskId when getting memory data
(#1311)
### What changes were proposed in this pull request?
Support block filter by taskId when getting memory data
### Why are the changes needed?
In AQE, after the sub-QueryStages have been executed, the shuffle data size is
collected. So if we can filter blocks by taskId,
it will improve the performance of AQE.
Fix: #1213
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
1. UTs
---------
Co-authored-by: 蒋文龙 <[email protected]>
Co-authored-by: 蒋文龙 <[email protected]>
---
rust/experimental/server/src/app.rs | 2 +
rust/experimental/server/src/grpc.rs | 18 +++++
rust/experimental/server/src/store/hybrid.rs | 4 ++
rust/experimental/server/src/store/localfile.rs | 1 +
rust/experimental/server/src/store/memory.rs | 87 ++++++++++++++++++++++++-
5 files changed, 110 insertions(+), 2 deletions(-)
diff --git a/rust/experimental/server/src/app.rs
b/rust/experimental/server/src/app.rs
index 43d9a32aa..d75f8f969 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -308,6 +308,7 @@ pub struct WritingViewContext {
pub struct ReadingViewContext {
pub uid: PartitionedUId,
pub reading_options: ReadingOptions,
+ pub serialized_expected_task_ids_bitmap: Option<Treemap>,
}
pub struct ReadingIndexViewContext {
@@ -630,6 +631,7 @@ mod test {
let reading_ctx = ReadingViewContext {
uid: Default::default(),
reading_options:
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
+ serialized_expected_task_ids_bitmap: Default::default(),
};
// case2: get
diff --git a/rust/experimental/server/src/grpc.rs
b/rust/experimental/server/src/grpc.rs
index 138e0f656..0c9730522 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -34,6 +34,8 @@ use crate::proto::uniffle::{
use crate::store::{PartitionedData, ResponseDataIndex};
use await_tree::InstrumentAwait;
use bytes::{BufMut, BytesMut};
+use croaring::treemap::JvmSerializer;
+use croaring::Treemap;
use std::collections::HashMap;
use log::{debug, error, info, warn};
@@ -284,6 +286,7 @@ impl ShuffleServer for DefaultShuffleServer {
.select(ReadingViewContext {
uid: partition_id.clone(),
reading_options:
ReadingOptions::FILE_OFFSET_AND_LEN(req.offset, req.length as i64),
+ serialized_expected_task_ids_bitmap: Default::default(),
})
.instrument_await(format!(
"select data from localfile. uid: {:?}",
@@ -341,6 +344,20 @@ impl ShuffleServer for DefaultShuffleServer {
shuffle_id,
partition_id,
};
+
+ let serialized_expected_task_ids_bitmap =
+ if !req.serialized_expected_task_ids_bitmap.is_empty() {
+ match
Treemap::deserialize(&req.serialized_expected_task_ids_bitmap) {
+ Ok(filter) => Some(filter),
+ Err(e) => {
+ error!("Failed to deserialize: {}", e);
+ None
+ }
+ }
+ } else {
+ None
+ };
+
let data_fetched_result = app
.unwrap()
.select(ReadingViewContext {
@@ -349,6 +366,7 @@ impl ShuffleServer for DefaultShuffleServer {
req.last_block_id,
req.read_buffer_size as i64,
),
+ serialized_expected_task_ids_bitmap,
})
.instrument_await(format!("select data from memory. uid: {:?}",
&partition_id))
.await;
diff --git a/rust/experimental/server/src/store/hybrid.rs
b/rust/experimental/server/src/store/hybrid.rs
index 93dcc9016..d81314212 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -676,6 +676,7 @@ mod tests {
let response_data = runtime.wait(store.get(ReadingViewContext {
uid: uid.clone(),
reading_options: MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1024 * 1024
* 1024),
+ serialized_expected_task_ids_bitmap: Default::default(),
}))?;
let mut accepted_block_ids = vec![];
@@ -739,6 +740,7 @@ mod tests {
last_block_id,
data_len as i64,
),
+ serialized_expected_task_ids_bitmap: Default::default(),
};
let read_data = store.get(reading_view_ctx).await;
@@ -773,6 +775,7 @@ mod tests {
let reading_view_ctx = ReadingViewContext {
uid: uid.clone(),
reading_options:
ReadingOptions::FILE_OFFSET_AND_LEN(offset, length as i64),
+ serialized_expected_task_ids_bitmap:
Default::default(),
};
let read_data = store.get(reading_view_ctx).await.unwrap();
match read_data {
@@ -829,6 +832,7 @@ mod tests {
last_block_id,
data_len as i64,
),
+ serialized_expected_task_ids_bitmap: Default::default(),
};
let read_data = runtime.wait(store.get(reading_view_ctx));
diff --git a/rust/experimental/server/src/store/localfile.rs
b/rust/experimental/server/src/store/localfile.rs
index e85745851..34bd9d97b 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -868,6 +868,7 @@ mod test {
let reading_ctx = ReadingViewContext {
uid,
reading_options: ReadingOptions::FILE_OFFSET_AND_LEN(0, size
as i64),
+ serialized_expected_task_ids_bitmap: Default::default(),
};
let read_result = local_store.get(reading_ctx).await;
diff --git a/rust/experimental/server/src/store/memory.rs
b/rust/experimental/server/src/store/memory.rs
index 0d0852d0b..8894dbf2c 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -41,6 +41,7 @@ use std::str::FromStr;
use crate::store::mem::InstrumentAwait;
use crate::store::mem::MemoryBufferTicket;
+use croaring::Treemap;
use log::error;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@@ -209,15 +210,21 @@ impl MemoryStore {
buffer.clone()
}
- pub(crate) fn read_partial_data_with_max_size_limit<'a>(
+ pub(crate) fn read_partial_data_with_max_size_limit_and_filter<'a>(
&'a self,
blocks: Vec<&'a PartitionedDataBlock>,
fetched_size_limit: i64,
+ serialized_expected_task_ids_bitmap: Option<Treemap>,
) -> (Vec<&PartitionedDataBlock>, i64) {
let mut fetched = vec![];
let mut fetched_size = 0;
for block in blocks {
+ if let Some(ref filter) = serialized_expected_task_ids_bitmap {
+ if !filter.contains(block.task_attempt_id as u64) {
+ continue;
+ }
+ }
if fetched_size >= fetched_size_limit {
break;
}
@@ -416,7 +423,11 @@ impl Store for MemoryStore {
}
}
- self.read_partial_data_with_max_size_limit(candidate_blocks,
max_size)
+ self.read_partial_data_with_max_size_limit_and_filter(
+ candidate_blocks,
+ max_size,
+ ctx.serialized_expected_task_ids_bitmap,
+ )
}
_ => (vec![], 0),
};
@@ -728,6 +739,7 @@ mod test {
use crate::config::MemoryStoreConfig;
use crate::runtime::manager::RuntimeManager;
use anyhow::Result;
+ use croaring::Treemap;
#[test]
fn test_ticket_timeout() -> Result<()> {
@@ -975,6 +987,7 @@ mod test {
last_block_id,
default_single_read_size,
),
+ serialized_expected_task_ids_bitmap: Default::default(),
};
if let Ok(data) = store.get(ctx).await {
match data {
@@ -1066,6 +1079,7 @@ mod test {
let reading_ctx = ReadingViewContext {
uid: uid.clone(),
reading_options:
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
+ serialized_expected_task_ids_bitmap: Default::default(),
};
let data = runtime.wait(store.get(reading_ctx.clone())).expect("");
assert_eq!(1, data.from_memory().shuffle_data_block_segments.len());
@@ -1114,6 +1128,7 @@ mod test {
let reading_ctx = ReadingViewContext {
uid: Default::default(),
reading_options:
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
+ serialized_expected_task_ids_bitmap: Default::default(),
};
match runtime.wait(store.get(reading_ctx)).unwrap() {
@@ -1125,4 +1140,72 @@ mod test {
_ => panic!("should not"),
}
}
+
+ #[test]
+ fn test_block_id_filter_for_memory() {
+ let store = MemoryStore::new(1024 * 1024 * 1024);
+ let runtime = store.runtime_manager.clone();
+
+ // 1. insert 2 block
+ let writing_ctx = WritingViewContext {
+ uid: Default::default(),
+ data_blocks: vec![
+ PartitionedDataBlock {
+ block_id: 0,
+ length: 10,
+ uncompress_length: 100,
+ crc: 99,
+ data: Default::default(),
+ task_attempt_id: 0,
+ },
+ PartitionedDataBlock {
+ block_id: 1,
+ length: 20,
+ uncompress_length: 200,
+ crc: 99,
+ data: Default::default(),
+ task_attempt_id: 1,
+ },
+ ],
+ };
+ runtime.wait(store.insert(writing_ctx)).unwrap();
+
+ // 2. block_ids_filter is empty, should return 2 blocks
+ let mut reading_ctx = ReadingViewContext {
+ uid: Default::default(),
+ reading_options:
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
+ serialized_expected_task_ids_bitmap: Default::default(),
+ };
+
+ match runtime.wait(store.get(reading_ctx)).unwrap() {
+ Mem(data) => {
+ assert_eq!(data.shuffle_data_block_segments.len(), 2);
+ }
+ _ => panic!("should not"),
+ }
+
+ // 3. set serialized_expected_task_ids_bitmap, and set last_block_id
equals 1, should return 1 block
+ let mut bitmap = Treemap::default();
+ bitmap.add(1);
+ reading_ctx = ReadingViewContext {
+ uid: Default::default(),
+ reading_options:
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(0, 1000000),
+ serialized_expected_task_ids_bitmap: Option::from(bitmap.clone()),
+ };
+
+ match runtime.wait(store.get(reading_ctx)).unwrap() {
+ Mem(data) => {
+ assert_eq!(data.shuffle_data_block_segments.len(), 1);
+
assert_eq!(data.shuffle_data_block_segments.get(0).unwrap().offset, 0);
+ assert_eq!(
+ data.shuffle_data_block_segments
+ .get(0)
+ .unwrap()
+ .uncompress_length,
+ 200
+ );
+ }
+ _ => panic!("should not"),
+ }
+ }
}