jiacai2050 commented on code in PR #1603:
URL: https://github.com/apache/horaedb/pull/1603#discussion_r1878121632


##########
horaedb/metric_engine/src/compaction/picker.rs:
##########
@@ -28,7 +37,105 @@ impl TimeWindowCompactionStrategy {
         Self { segment_duration }
     }
 
-    pub fn pick_candidate(&self, _ssts: Vec<SstFile>) -> Option<Task> {
-        todo!()
+    pub fn pick_candidate(
+        &self,
+        ssts: Vec<SstFile>,
+        expire_time: Option<Timestamp>,
+    ) -> Option<Task> {
+        let (uncompacted_files, expired_files) =
+            Self::find_uncompacted_and_expired_files(ssts, expire_time);
+        debug!(
+            "uncompacted_files: {:?}, expired_files: {:?}",
+            uncompacted_files, expired_files
+        );
+
+        let buckets = self.get_buckets(&uncompacted_files);
+        let compact_files = self.get_compact_files(buckets);
+
+        let expireds = expired_files
+            .into_iter()
+            .map(|f| f.id())
+            .collect::<Vec<_>>();
+
+        if compact_files.is_empty() && expireds.is_empty() {
+            return None;
+        }
+
+        let task = Task {
+            inputs: compact_files,
+            expireds,
+        };
+        debug!("Pick compaction task: {:?}", task);
+
+        Some(task)
+    }
+
+    ///  Group files of similar timestamp into buckets.
+    fn get_buckets(&self, files: &[SstFile]) -> HashMap<i64, Vec<FileId>> {
+        let mut buckets: HashMap<i64, Vec<FileId>> = HashMap::new();
+        for f in files {
+            let (left, _) = self.get_window_bounds(f.meta().time_range.end);
+
+            let bucket_files = buckets.entry(left).or_default();
+
+            bucket_files.push(f.id());
+        }
+
+        debug!(
+            "Group files of similar timestamp into buckets: {:?}",
+            buckets
+        );
+        buckets
+    }
+
+    fn get_window_bounds(&self, ts: Timestamp) -> (i64, i64) {
+        let ts_secs = ts.0 / 1000;
+
+        let size = self.segment_duration.as_secs() as i64;
+
+        let lower = ts_secs - (ts_secs % size);
+        let upper = lower + size - 1;
+
+        (lower * 1000, upper * 1000)
+    }
+
+    fn get_compact_files(&self, buckets: HashMap<i64, Vec<FileId>>) -> 
Vec<FileId> {
+        let all_keys: BTreeSet<_> = buckets.keys().collect();
+        let mut compact_files = Vec::new();
+
+        for key in all_keys.into_iter().rev() {
+            if let Some(bucket) = buckets.get(key) {
+                if bucket.len() >= 2 {
+                    compact_files = bucket.clone();

Review Comment:
   We need to limit the number of input files picked for compaction; we should also compact smaller files first.



##########
horaedb/metric_engine/src/compaction/picker.rs:
##########
@@ -28,7 +37,105 @@ impl TimeWindowCompactionStrategy {
         Self { segment_duration }
     }
 
-    pub fn pick_candidate(&self, _ssts: Vec<SstFile>) -> Option<Task> {
-        todo!()
+    pub fn pick_candidate(
+        &self,
+        ssts: Vec<SstFile>,
+        expire_time: Option<Timestamp>,
+    ) -> Option<Task> {
+        let (uncompacted_files, expired_files) =
+            Self::find_uncompacted_and_expired_files(ssts, expire_time);
+        debug!(
+            "uncompacted_files: {:?}, expired_files: {:?}",
+            uncompacted_files, expired_files
+        );
+
+        let buckets = self.get_buckets(&uncompacted_files);
+        let compact_files = self.get_compact_files(buckets);
+
+        let expireds = expired_files
+            .into_iter()
+            .map(|f| f.id())
+            .collect::<Vec<_>>();
+
+        if compact_files.is_empty() && expireds.is_empty() {
+            return None;
+        }
+
+        let task = Task {
+            inputs: compact_files,
+            expireds,
+        };
+        debug!("Pick compaction task: {:?}", task);
+
+        Some(task)
+    }
+
+    ///  Group files of similar timestamp into buckets.
+    fn get_buckets(&self, files: &[SstFile]) -> HashMap<i64, Vec<FileId>> {
+        let mut buckets: HashMap<i64, Vec<FileId>> = HashMap::new();
+        for f in files {
+            let (left, _) = self.get_window_bounds(f.meta().time_range.end);
+
+            let bucket_files = buckets.entry(left).or_default();
+
+            bucket_files.push(f.id());
+        }
+
+        debug!(
+            "Group files of similar timestamp into buckets: {:?}",
+            buckets
+        );
+        buckets
+    }
+
+    fn get_window_bounds(&self, ts: Timestamp) -> (i64, i64) {

Review Comment:
   Add a unit test for this function.



##########
horaedb/metric_engine/src/sst.rs:
##########
@@ -71,6 +74,18 @@ impl SstFile {
     pub fn mark_compaction(&self) {
         self.inner.in_compaction.store(true, Ordering::Relaxed);
     }
+
+    pub fn is_uncompacted(&self) -> bool {
+        !self.inner.in_compaction.load(Ordering::Relaxed)
+    }

Review Comment:
   ```suggestion
       pub fn is_compaction(&self) -> bool {
           self.inner.in_compaction.load(Ordering::Relaxed)
       }
   ```
   
   Negating this bool inside the function is not very readable.



##########
horaedb/metric_engine/src/compaction/picker.rs:
##########
@@ -28,7 +37,105 @@ impl TimeWindowCompactionStrategy {
         Self { segment_duration }
     }
 
-    pub fn pick_candidate(&self, _ssts: Vec<SstFile>) -> Option<Task> {
-        todo!()
+    pub fn pick_candidate(
+        &self,
+        ssts: Vec<SstFile>,
+        expire_time: Option<Timestamp>,
+    ) -> Option<Task> {
+        let (uncompacted_files, expired_files) =
+            Self::find_uncompacted_and_expired_files(ssts, expire_time);
+        debug!(
+            "uncompacted_files: {:?}, expired_files: {:?}",
+            uncompacted_files, expired_files
+        );
+
+        let buckets = self.get_buckets(&uncompacted_files);
+        let compact_files = self.get_compact_files(buckets);
+
+        let expireds = expired_files
+            .into_iter()
+            .map(|f| f.id())
+            .collect::<Vec<_>>();
+
+        if compact_files.is_empty() && expireds.is_empty() {
+            return None;
+        }
+
+        let task = Task {
+            inputs: compact_files,
+            expireds,
+        };
+        debug!("Pick compaction task: {:?}", task);
+
+        Some(task)
+    }
+
+    ///  Group files of similar timestamp into buckets.
+    fn get_buckets(&self, files: &[SstFile]) -> HashMap<i64, Vec<FileId>> {
+        let mut buckets: HashMap<i64, Vec<FileId>> = HashMap::new();
+        for f in files {
+            let (left, _) = self.get_window_bounds(f.meta().time_range.end);
+
+            let bucket_files = buckets.entry(left).or_default();
+
+            bucket_files.push(f.id());
+        }
+
+        debug!(
+            "Group files of similar timestamp into buckets: {:?}",
+            buckets
+        );
+        buckets
+    }
+
+    fn get_window_bounds(&self, ts: Timestamp) -> (i64, i64) {
+        let ts_secs = ts.0 / 1000;
+
+        let size = self.segment_duration.as_secs() as i64;
+
+        let lower = ts_secs - (ts_secs % size);
+        let upper = lower + size - 1;
+
+        (lower * 1000, upper * 1000)
+    }
+
+    fn get_compact_files(&self, buckets: HashMap<i64, Vec<FileId>>) -> 
Vec<FileId> {
+        let all_keys: BTreeSet<_> = buckets.keys().collect();
+        let mut compact_files = Vec::new();
+
+        for key in all_keys.into_iter().rev() {
+            if let Some(bucket) = buckets.get(key) {
+                if bucket.len() >= 2 {
+                    compact_files = bucket.clone();

Review Comment:
   When an SST is picked, we should mark it as in compaction, so it won't get compacted twice.



##########
horaedb/metric_engine/src/compaction/picker.rs:
##########
@@ -28,7 +37,105 @@ impl TimeWindowCompactionStrategy {
         Self { segment_duration }
     }
 
-    pub fn pick_candidate(&self, _ssts: Vec<SstFile>) -> Option<Task> {
-        todo!()
+    pub fn pick_candidate(
+        &self,
+        ssts: Vec<SstFile>,
+        expire_time: Option<Timestamp>,
+    ) -> Option<Task> {
+        let (uncompacted_files, expired_files) =
+            Self::find_uncompacted_and_expired_files(ssts, expire_time);
+        debug!(
+            "uncompacted_files: {:?}, expired_files: {:?}",
+            uncompacted_files, expired_files
+        );
+
+        let buckets = self.get_buckets(&uncompacted_files);
+        let compact_files = self.get_compact_files(buckets);
+
+        let expireds = expired_files
+            .into_iter()
+            .map(|f| f.id())
+            .collect::<Vec<_>>();
+
+        if compact_files.is_empty() && expireds.is_empty() {
+            return None;
+        }
+
+        let task = Task {
+            inputs: compact_files,
+            expireds,
+        };
+        debug!("Pick compaction task: {:?}", task);
+
+        Some(task)
+    }
+
+    ///  Group files of similar timestamp into buckets.
+    fn get_buckets(&self, files: &[SstFile]) -> HashMap<i64, Vec<FileId>> {
+        let mut buckets: HashMap<i64, Vec<FileId>> = HashMap::new();
+        for f in files {
+            let (left, _) = self.get_window_bounds(f.meta().time_range.end);
+
+            let bucket_files = buckets.entry(left).or_default();
+
+            bucket_files.push(f.id());

Review Comment:
   `FileId` is not suitable here: we need to mutate the SST's inner state, so a `Task` should hold `SstFile` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to