yezhizi commented on code in PR #3262:
URL: https://github.com/apache/kvrocks/pull/3262#discussion_r2570452548
##########
src/types/redis_timeseries.cc:
##########
@@ -97,7 +101,96 @@ std::vector<TSSample>
AggregateSamplesByRangeOption(std::vector<TSSample> sample
}
return 0;
};
+ // Linear interpolation.
+ auto interpolate_sample = [](const TSSample &left_nb, uint64_t ts, const
TSSample &right_nb) {
+ auto y_diff = right_nb.v - left_nb.v;
+ auto x_diff = static_cast<double>(right_nb.ts - left_nb.ts);
+ auto x_diff_prime = static_cast<double>(ts - left_nb.ts);
+ auto y_diff_prime = (x_diff_prime * y_diff) / x_diff;
+ TSSample sample;
+ sample.ts = ts;
+ sample.v = y_diff_prime + left_nb.v;
+ return sample;
+ };
+ // Computes the TWA of empty bucket from its neighbor samples.
+ auto empty_bucket_twa = [&interpolate_sample](const TSSample &left_nb,
uint64_t bucket_left, uint64_t bucket_right,
+ const TSSample &right_nb) {
+ auto left = interpolate_sample(left_nb, bucket_left, right_nb);
+ auto right = interpolate_sample(left_nb, bucket_right, right_nb);
+ return Reducer::Area(std::array<TSSample, 2>{left, right}) /
static_cast<double>(bucket_right - bucket_left);
+ };
+
+ // Retrieve prev_sample and next_sample from samples when TWA aggregation.
+ TSSample prev_sample, next_sample;
+ bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA,
prev_available = false, next_available = false;
+ if (is_twa_aggregator) {
+ const bool discard_boundaries = !option.filter_by_ts.empty() ||
option.filter_by_value.has_value();
+ next_sample = samples.back();
+ samples.pop_back();
+ prev_sample = samples.back();
+ samples.pop_back();
+ // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary
samples.
+ prev_available = discard_boundaries ? false : !samples.empty() &&
(samples.front().ts != prev_sample.ts);
+ next_available = discard_boundaries ? false : !samples.empty() &&
(samples.back().ts != next_sample.ts);
+ }
+ std::vector<TSSample> res;
+ if (is_twa_aggregator && option.is_return_empty && samples.empty()) {
+ const bool early_return = prev_sample.ts == TSSample::MAX_TIMESTAMP ||
next_sample.ts == TSSample::MAX_TIMESTAMP ||
+ prev_sample.ts == next_sample.ts; // When
filter entire range lies left or right to data.
+ if (early_return) {
+ res = std::move(samples);
+ return res;
+ }
+
+ uint64_t n_buckets_estimate = (option.end_ts - option.start_ts) /
option.aggregator.bucket_duration;
+ res.reserve(n_buckets_estimate + 1);
+ uint64_t bucket_left =
aggregator.CalculateAlignedBucketLeft(option.start_ts);
+ uint64_t bucket_right =
aggregator.CalculateAlignedBucketRight(bucket_left);
+ for (size_t i = 0; i < n_buckets_estimate; i++) {
+ bucket_left = std::max(bucket_left, option.start_ts);
+ bucket_right = std::min(bucket_right, option.end_ts);
+ TSSample sample;
+ sample.ts = bucket_left;
+ sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right,
next_sample);
+ res.push_back(sample);
+ bucket_left = bucket_right;
+ bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
+ }
+ // Process last bucket.
+ TSSample sample;
+ sample.ts = bucket_left;
+ if (bucket_left == option.end_ts) { // Calculate last sample.
+ sample.v = interpolate_sample(prev_sample, option.end_ts, next_sample).v;
+ } else {
+ sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right,
next_sample);
+ }
+ res.push_back(sample);
+ return res;
+ } else if (aggregator.type == TSAggregatorType::NONE || samples.empty()) {
+ res = std::move(samples);
+ return res;
+ }
+
+ auto spans = aggregator.SplitSamplesToBuckets(samples);
res.reserve(spans.size());
+
+ auto non_empty_left_bucket_idx = [&spans](size_t curr) {
+ while (--curr && spans[curr].empty());
+ return curr;
+ };
+ auto non_empty_right_bucket_idx = [&spans](size_t curr) {
+ while (++curr < spans.size() && spans[curr].empty());
+ return curr;
+ };
+
+ std::vector<std::pair<TSSample, TSSample>> neighbors;
+ neighbors.reserve(spans.size());
+ for (size_t i = 0; i < spans.size(); i++) {
+ TSSample prev = (i != 0) ? spans[non_empty_left_bucket_idx(i)].back() :
prev_sample;
+ TSSample next = (i != (spans.size() - 1)) ?
spans[non_empty_right_bucket_idx(i)].front() : next_sample;
+ neighbors.emplace_back(prev, next);
+ }
+
Review Comment:
The `while` loops nested inside the `for` loop make this O(N^2) in the worst
case (e.g. when there are long runs of empty buckets). It can be reduced to
O(N) with two linear passes:
1. Iterate forward from 0 to N-1 and resolve each bucket's previous neighbor
by tracking the last non-empty bucket seen so far.
2. Iterate backward from N-1 to 0 and resolve each bucket's next neighbor
the same way.
##########
src/types/redis_timeseries.cc:
##########
@@ -97,7 +101,96 @@ std::vector<TSSample>
AggregateSamplesByRangeOption(std::vector<TSSample> sample
}
return 0;
};
+ // Linear interpolation.
+ auto interpolate_sample = [](const TSSample &left_nb, uint64_t ts, const
TSSample &right_nb) {
+ auto y_diff = right_nb.v - left_nb.v;
+ auto x_diff = static_cast<double>(right_nb.ts - left_nb.ts);
+ auto x_diff_prime = static_cast<double>(ts - left_nb.ts);
+ auto y_diff_prime = (x_diff_prime * y_diff) / x_diff;
+ TSSample sample;
+ sample.ts = ts;
+ sample.v = y_diff_prime + left_nb.v;
+ return sample;
+ };
+ // Computes the TWA of empty bucket from its neighbor samples.
+ auto empty_bucket_twa = [&interpolate_sample](const TSSample &left_nb,
uint64_t bucket_left, uint64_t bucket_right,
+ const TSSample &right_nb) {
+ auto left = interpolate_sample(left_nb, bucket_left, right_nb);
+ auto right = interpolate_sample(left_nb, bucket_right, right_nb);
+ return Reducer::Area(std::array<TSSample, 2>{left, right}) /
static_cast<double>(bucket_right - bucket_left);
+ };
+
+ // Retrieve prev_sample and next_sample from samples when TWA aggregation.
+ TSSample prev_sample, next_sample;
+ bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA,
prev_available = false, next_available = false;
+ if (is_twa_aggregator) {
+ const bool discard_boundaries = !option.filter_by_ts.empty() ||
option.filter_by_value.has_value();
+ next_sample = samples.back();
+ samples.pop_back();
+ prev_sample = samples.back();
+ samples.pop_back();
+ // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary
samples.
+ prev_available = discard_boundaries ? false : !samples.empty() &&
(samples.front().ts != prev_sample.ts);
+ next_available = discard_boundaries ? false : !samples.empty() &&
(samples.back().ts != next_sample.ts);
+ }
Review Comment:
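I think we can pass `next_sample`, `next_available`, etc. in as function
parameters. This would replace computing them inside
`AggregateSamplesByRangeOption`, which currently pops them off the back of
`samples`. For example, we can add a struct:
```cpp
struct TWABounds {
  std::optional<TSSample> prev_sample;
  std::optional<TSSample> next_sample;
};
```
Using `std::optional` also removes the need for the separate
`prev_available`/`next_available` flags: an empty optional means that
neighbor is not available. Then change the function signature to:
```cpp
std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> samples, const TSRangeOption &option, const TWABounds &bounds);
```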
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]