yezhizi commented on code in PR #3262:
URL: https://github.com/apache/kvrocks/pull/3262#discussion_r2539006086
##########
src/types/redis_timeseries.cc:
##########
@@ -126,6 +150,71 @@ std::vector<TSSample>
AggregateSamplesByRangeOption(std::vector<TSSample> sample
}
} else if (!spans[i].empty()) {
sample.v = aggregator.AggregateSamplesValue(spans[i]);
+
+ if (is_twa_aggregator) {
+ auto bucket_right =
aggregator.CalculateAlignedBucketRight(bucket_left);
+
+ /// Computes area of polygon from start of the current bucket to the
first sample of the current span.
+ /// Total Area = Area of bottom rectangle + Area of above triangle.
+ auto front_area = [](uint64_t bucket_left, const TSSample &prev, const
TSSample &curr) {
+ auto x = static_cast<double>(bucket_left - prev.ts); // Distance
from
+ auto y = static_cast<double>(curr.ts - prev.ts);
+ auto z = curr.v - prev.v;
+ auto triangle_area = (z * (y - (x * x) / y)) / 2;
+ auto rect_area = static_cast<double>(y - x) * prev.v;
+ return triangle_area + rect_area;
+ };
+ /// Computes area of polygon from the last sample of the current span
to the end of current bucket.
+ /// Total Area = Area of bottom rectangle + Area of above triangle.
+ auto end_area = [](uint64_t bucket_right, const TSSample &curr, const
TSSample &next) {
+ auto x = static_cast<double>(bucket_right - curr.ts);
+ auto y = static_cast<double>(next.ts - curr.ts);
+ auto z = next.v - curr.v;
+ auto rect_area = x * curr.v;
+ auto triangle_area = (x * x * z) / (2 * y);
+ return triangle_area + rect_area;
+ };
+ auto non_empty_left_bucket = [&spans](size_t curr) {
+ while (--curr && spans[curr].empty());
+ return curr;
+ };
+ auto non_empty_right_bucket = [&spans](size_t curr) {
+ while (++curr < spans.size() && spans[curr].empty());
+ return curr;
+ };
+
+ // Cut left and right empty regions.
+ bucket_left = std::max(bucket_left, option.start_ts);
+ bucket_right = std::min(bucket_right, option.end_ts);
+ uint64_t l = bucket_left, r = bucket_right;
+ double area = 0.0;
+ if (spans.size() == 1) {
+ area += prev_available ? front_area(bucket_left, prev_sample,
spans[i].front()) : 0;
+ area += next_available ? end_area(bucket_right, spans[i].back(),
next_sample) : 0;
+ l = prev_available ? bucket_left : spans[i].front().ts;
+ r = next_available ? bucket_right : spans[i].back().ts;
+ // Edge case: single bucket with only one element.
+ area += (!prev_available && !next_available && spans[i].size() == 1)
? spans[i][0].v : 0;
+ } else if (i == 0) {
+ size_t p = non_empty_right_bucket(i);
+ area += spans[i].back().ts != bucket_right ? end_area(bucket_right,
spans[i].back(), spans[p].front()) : 0;
+ area += prev_available ? front_area(bucket_left, prev_sample,
spans[i].front()) : 0;
+ l = prev_available ? bucket_left : spans[i].front().ts;
+ } else if (i == (spans.size() - 1)) {
+ size_t p = non_empty_left_bucket(i);
+ area += spans[i].front().ts != bucket_left ? front_area(bucket_left,
spans[p].back(), spans[i].front()) : 0;
+ area += next_available ? end_area(bucket_right, spans[i].back(),
next_sample) : 0;
+ // Edge case: when last bucket contains one sample and its timestamp
equals bucket boundary.
+ area += (spans[i].size() == 1 && spans[i].front().ts == bucket_left
&& !next_available) ? spans[i][0].v : 0;
+ r = next_available ? bucket_right : spans[i].back().ts;
+ } else {
+ size_t x = non_empty_left_bucket(i), y = non_empty_right_bucket(i);
+ area += spans[i].front().ts != bucket_left ? front_area(bucket_left,
spans[x].back(), spans[i].front()) : 0;
+ area += spans[i].back().ts != bucket_right ? end_area(bucket_right,
spans[i].back(), spans[y].front()) : 0;
+ }
+ sample.v += area;
+ sample.v /= std::max(static_cast<double>(r - l), 1.0);
+ }
} else {
Review Comment:
This part is a bit complex to follow. Maybe we can split it into two steps:
1. Get the `prev` and `next` samples for each bucket.
2. Calculate the TWA for each bucket.
Step 1 can be done before the loop starts, so that the body of the for loop
only has to calculate the TWA.
Like this:
```
// Some container to store the prev and next of each bucket...
// Get the prev and next for each bucket...
// ...
for (size_t i = 0; i < spans.size(); i++) {
...
// Calculate the TWA for spans[i]
...
}
```
This might be a clearer way to organize the logic. :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]