This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4edd60fef [#1213] feat(rust): Support block filter by taskId when getting memory data (#1311)
4edd60fef is described below

commit 4edd60fef8e18f627d36d809a28a481ab825b692
Author: wenlongbrother <[email protected]>
AuthorDate: Wed Nov 22 15:12:29 2023 +0800

    [#1213] feat(rust): Support block filter by taskId when getting memory data (#1311)
    
    ### What changes were proposed in this pull request?
    
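    Support filtering blocks by taskId when getting data from the memory store.
    When a get-memory-data request carries a serialized bitmap of expected task IDs,
    only blocks whose task attempt ID is contained in that bitmap are returned.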
    
    ### Why are the changes needed?
    
    In AQE, the shuffle data size is collected after the sub-QueryStages are executed.
    Filtering blocks by task ID lets a reader fetch only the blocks written by the tasks
    it expects, which improves the performance of AQE.
    
    Fix: #1213
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    1. Unit tests (e.g. the new `test_block_id_filter_for_memory` in memory.rs)
    
    ---------
    
    Co-authored-by: 蒋文龙 <[email protected]>
    Co-authored-by: 蒋文龙 <[email protected]>
---
 rust/experimental/server/src/app.rs             |  2 +
 rust/experimental/server/src/grpc.rs            | 18 +++++
 rust/experimental/server/src/store/hybrid.rs    |  4 ++
 rust/experimental/server/src/store/localfile.rs |  1 +
 rust/experimental/server/src/store/memory.rs    | 87 ++++++++++++++++++++++++-
 5 files changed, 110 insertions(+), 2 deletions(-)

diff --git a/rust/experimental/server/src/app.rs 
b/rust/experimental/server/src/app.rs
index 43d9a32aa..d75f8f969 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -308,6 +308,7 @@ pub struct WritingViewContext {
 pub struct ReadingViewContext {
     pub uid: PartitionedUId,
     pub reading_options: ReadingOptions,
+    pub serialized_expected_task_ids_bitmap: Option<Treemap>,
 }
 
 pub struct ReadingIndexViewContext {
@@ -630,6 +631,7 @@ mod test {
             let reading_ctx = ReadingViewContext {
                 uid: Default::default(),
                 reading_options: 
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
+                serialized_expected_task_ids_bitmap: Default::default(),
             };
 
             // case2: get
diff --git a/rust/experimental/server/src/grpc.rs 
b/rust/experimental/server/src/grpc.rs
index 138e0f656..0c9730522 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -34,6 +34,8 @@ use crate::proto::uniffle::{
 use crate::store::{PartitionedData, ResponseDataIndex};
 use await_tree::InstrumentAwait;
 use bytes::{BufMut, BytesMut};
+use croaring::treemap::JvmSerializer;
+use croaring::Treemap;
 use std::collections::HashMap;
 
 use log::{debug, error, info, warn};
@@ -284,6 +286,7 @@ impl ShuffleServer for DefaultShuffleServer {
             .select(ReadingViewContext {
                 uid: partition_id.clone(),
                 reading_options: 
ReadingOptions::FILE_OFFSET_AND_LEN(req.offset, req.length as i64),
+                serialized_expected_task_ids_bitmap: Default::default(),
             })
             .instrument_await(format!(
                 "select data from localfile. uid: {:?}",
@@ -341,6 +344,20 @@ impl ShuffleServer for DefaultShuffleServer {
             shuffle_id,
             partition_id,
         };
+
+        let serialized_expected_task_ids_bitmap =
+            if !req.serialized_expected_task_ids_bitmap.is_empty() {
+                match 
Treemap::deserialize(&req.serialized_expected_task_ids_bitmap) {
+                    Ok(filter) => Some(filter),
+                    Err(e) => {
+                        error!("Failed to deserialize: {}", e);
+                        None
+                    }
+                }
+            } else {
+                None
+            };
+
         let data_fetched_result = app
             .unwrap()
             .select(ReadingViewContext {
@@ -349,6 +366,7 @@ impl ShuffleServer for DefaultShuffleServer {
                     req.last_block_id,
                     req.read_buffer_size as i64,
                 ),
+                serialized_expected_task_ids_bitmap,
             })
             .instrument_await(format!("select data from memory. uid: {:?}", 
&partition_id))
             .await;
diff --git a/rust/experimental/server/src/store/hybrid.rs 
b/rust/experimental/server/src/store/hybrid.rs
index 93dcc9016..d81314212 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -676,6 +676,7 @@ mod tests {
         let response_data = runtime.wait(store.get(ReadingViewContext {
             uid: uid.clone(),
             reading_options: MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1024 * 1024 
* 1024),
+            serialized_expected_task_ids_bitmap: Default::default(),
         }))?;
 
         let mut accepted_block_ids = vec![];
@@ -739,6 +740,7 @@ mod tests {
                 last_block_id,
                 data_len as i64,
             ),
+            serialized_expected_task_ids_bitmap: Default::default(),
         };
 
         let read_data = store.get(reading_view_ctx).await;
@@ -773,6 +775,7 @@ mod tests {
                     let reading_view_ctx = ReadingViewContext {
                         uid: uid.clone(),
                         reading_options: 
ReadingOptions::FILE_OFFSET_AND_LEN(offset, length as i64),
+                        serialized_expected_task_ids_bitmap: 
Default::default(),
                     };
                     let read_data = store.get(reading_view_ctx).await.unwrap();
                     match read_data {
@@ -829,6 +832,7 @@ mod tests {
                     last_block_id,
                     data_len as i64,
                 ),
+                serialized_expected_task_ids_bitmap: Default::default(),
             };
 
             let read_data = runtime.wait(store.get(reading_view_ctx));
diff --git a/rust/experimental/server/src/store/localfile.rs 
b/rust/experimental/server/src/store/localfile.rs
index e85745851..34bd9d97b 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -868,6 +868,7 @@ mod test {
             let reading_ctx = ReadingViewContext {
                 uid,
                 reading_options: ReadingOptions::FILE_OFFSET_AND_LEN(0, size 
as i64),
+                serialized_expected_task_ids_bitmap: Default::default(),
             };
 
             let read_result = local_store.get(reading_ctx).await;
diff --git a/rust/experimental/server/src/store/memory.rs 
b/rust/experimental/server/src/store/memory.rs
index 0d0852d0b..8894dbf2c 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -41,6 +41,7 @@ use std::str::FromStr;
 
 use crate::store::mem::InstrumentAwait;
 use crate::store::mem::MemoryBufferTicket;
+use croaring::Treemap;
 use log::error;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
@@ -209,15 +210,21 @@ impl MemoryStore {
         buffer.clone()
     }
 
-    pub(crate) fn read_partial_data_with_max_size_limit<'a>(
+    pub(crate) fn read_partial_data_with_max_size_limit_and_filter<'a>(
         &'a self,
         blocks: Vec<&'a PartitionedDataBlock>,
         fetched_size_limit: i64,
+        serialized_expected_task_ids_bitmap: Option<Treemap>,
     ) -> (Vec<&PartitionedDataBlock>, i64) {
         let mut fetched = vec![];
         let mut fetched_size = 0;
 
         for block in blocks {
+            if let Some(ref filter) = serialized_expected_task_ids_bitmap {
+                if !filter.contains(block.task_attempt_id as u64) {
+                    continue;
+                }
+            }
             if fetched_size >= fetched_size_limit {
                 break;
             }
@@ -416,7 +423,11 @@ impl Store for MemoryStore {
                     }
                 }
 
-                self.read_partial_data_with_max_size_limit(candidate_blocks, 
max_size)
+                self.read_partial_data_with_max_size_limit_and_filter(
+                    candidate_blocks,
+                    max_size,
+                    ctx.serialized_expected_task_ids_bitmap,
+                )
             }
             _ => (vec![], 0),
         };
@@ -728,6 +739,7 @@ mod test {
     use crate::config::MemoryStoreConfig;
     use crate::runtime::manager::RuntimeManager;
     use anyhow::Result;
+    use croaring::Treemap;
 
     #[test]
     fn test_ticket_timeout() -> Result<()> {
@@ -975,6 +987,7 @@ mod test {
                 last_block_id,
                 default_single_read_size,
             ),
+            serialized_expected_task_ids_bitmap: Default::default(),
         };
         if let Ok(data) = store.get(ctx).await {
             match data {
@@ -1066,6 +1079,7 @@ mod test {
         let reading_ctx = ReadingViewContext {
             uid: uid.clone(),
             reading_options: 
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
+            serialized_expected_task_ids_bitmap: Default::default(),
         };
         let data = runtime.wait(store.get(reading_ctx.clone())).expect("");
         assert_eq!(1, data.from_memory().shuffle_data_block_segments.len());
@@ -1114,6 +1128,7 @@ mod test {
         let reading_ctx = ReadingViewContext {
             uid: Default::default(),
             reading_options: 
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
+            serialized_expected_task_ids_bitmap: Default::default(),
         };
 
         match runtime.wait(store.get(reading_ctx)).unwrap() {
@@ -1125,4 +1140,72 @@ mod test {
             _ => panic!("should not"),
         }
     }
+
+    #[test]
+    fn test_block_id_filter_for_memory() {
+        let store = MemoryStore::new(1024 * 1024 * 1024);
+        let runtime = store.runtime_manager.clone();
+
+        // 1. insert 2 block
+        let writing_ctx = WritingViewContext {
+            uid: Default::default(),
+            data_blocks: vec![
+                PartitionedDataBlock {
+                    block_id: 0,
+                    length: 10,
+                    uncompress_length: 100,
+                    crc: 99,
+                    data: Default::default(),
+                    task_attempt_id: 0,
+                },
+                PartitionedDataBlock {
+                    block_id: 1,
+                    length: 20,
+                    uncompress_length: 200,
+                    crc: 99,
+                    data: Default::default(),
+                    task_attempt_id: 1,
+                },
+            ],
+        };
+        runtime.wait(store.insert(writing_ctx)).unwrap();
+
+        // 2. block_ids_filter is empty, should return 2 blocks
+        let mut reading_ctx = ReadingViewContext {
+            uid: Default::default(),
+            reading_options: 
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
+            serialized_expected_task_ids_bitmap: Default::default(),
+        };
+
+        match runtime.wait(store.get(reading_ctx)).unwrap() {
+            Mem(data) => {
+                assert_eq!(data.shuffle_data_block_segments.len(), 2);
+            }
+            _ => panic!("should not"),
+        }
+
+        // 3. set serialized_expected_task_ids_bitmap, and set last_block_id 
equals 1, should return 1 block
+        let mut bitmap = Treemap::default();
+        bitmap.add(1);
+        reading_ctx = ReadingViewContext {
+            uid: Default::default(),
+            reading_options: 
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(0, 1000000),
+            serialized_expected_task_ids_bitmap: Option::from(bitmap.clone()),
+        };
+
+        match runtime.wait(store.get(reading_ctx)).unwrap() {
+            Mem(data) => {
+                assert_eq!(data.shuffle_data_block_segments.len(), 1);
+                
assert_eq!(data.shuffle_data_block_segments.get(0).unwrap().offset, 0);
+                assert_eq!(
+                    data.shuffle_data_block_segments
+                        .get(0)
+                        .unwrap()
+                        .uncompress_length,
+                    200
+                );
+            }
+            _ => panic!("should not"),
+        }
+    }
 }

Reply via email to