yezhizi commented on code in PR #3262:
URL: https://github.com/apache/kvrocks/pull/3262#discussion_r2567495566


##########
src/types/redis_timeseries.cc:
##########
@@ -97,7 +101,114 @@ std::vector<TSSample> 
AggregateSamplesByRangeOption(std::vector<TSSample> sample
     }
     return 0;
   };
+  /// Computes area of polygon from start of the current bucket to the first 
sample of the current span.
+  /// Total Area = Area of bottom rectangle + Area of above triangle.
+  auto front_area = [](uint64_t bucket_left, const TSSample &prev, const 
TSSample &curr) {
+    auto x = static_cast<double>(bucket_left - prev.ts);  // Distance from
+    auto y = static_cast<double>(curr.ts - prev.ts);
+    auto z = curr.v - prev.v;
+    auto triangle_area = (z * (y - (x * x) / y)) / 2;
+    auto rect_area = static_cast<double>(y - x) * prev.v;
+    return triangle_area + rect_area;
+  };
+  /// Computes area of polygon from the last sample of the current span to the 
end of current bucket.
+  /// Total Area = Area of bottom rectangle + Area of above triangle.
+  auto end_area = [](uint64_t bucket_right, const TSSample &curr, const 
TSSample &next) {
+    auto x = static_cast<double>(bucket_right - curr.ts);
+    auto y = static_cast<double>(next.ts - curr.ts);
+    auto z = next.v - curr.v;
+    auto rect_area = x * curr.v;
+    auto triangle_area = (x * x * z) / (2 * y);
+    return triangle_area + rect_area;
+  };
+  // Computes the TWA of empty bucket from its neighbor samples.
+  auto empty_bucket_twa = [&front_area](const TSSample &left_nb, uint64_t 
bucket_left, uint64_t bucket_right,
+                                        const TSSample &right_nb) {
+    // Area of empty bucket = Area from left_nb to bucket_right - Area from 
left_nb to bucket_left
+    auto f_area = front_area(bucket_left, left_nb, right_nb);
+    auto s_area = front_area(bucket_right, left_nb, right_nb);
+    return (f_area - s_area) / static_cast<double>(bucket_right - bucket_left);
+  };
+
+  // Retrieve prev_sample and next_sample from samples when TWA aggregation.
+  TSSample prev_sample, next_sample;
+  bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, 
prev_available = false, next_available = false;
+  if (is_twa_aggregator) {
+    const bool discard_boundaries = !option.filter_by_ts.empty() || 
option.filter_by_value.has_value();
+    next_sample = samples.back();
+    samples.pop_back();
+    prev_sample = samples.back();
+    samples.pop_back();
+    // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary 
samples.
+    prev_available = discard_boundaries ? false : !samples.empty() && 
(samples.front().ts != prev_sample.ts);
+    next_available = discard_boundaries ? false : !samples.empty() && 
(samples.back().ts != next_sample.ts);
+  }
+  std::vector<TSSample> res;
+  if (is_twa_aggregator && option.is_return_empty && samples.empty()) {
+    const bool early_return = prev_sample.ts == TSSample::MAX_TIMESTAMP || 
next_sample.ts == TSSample::MAX_TIMESTAMP ||
+                              prev_sample.ts == next_sample.ts;  // When 
filter entire range lies left or right to data.
+    if (early_return) {
+      res = std::move(samples);
+      return res;
+    }
+    // Both prev and next should be available. Total range should be in 
between the prev and next samples.
+    assert(prev_sample.ts <= option.start_ts && option.end_ts <= 
next_sample.ts);
+
+    uint64_t n_buckets_estimate = (option.end_ts - option.start_ts) / 
option.aggregator.bucket_duration;
+    res.reserve(n_buckets_estimate + 1);
+    uint64_t bucket_left = 
aggregator.CalculateAlignedBucketLeft(option.start_ts);
+    uint64_t bucket_right = 
aggregator.CalculateAlignedBucketRight(bucket_left);
+    for (size_t i = 0; i < n_buckets_estimate; i++) {
+      bucket_left = std::max(bucket_left, option.start_ts);
+      bucket_right = std::min(bucket_right, option.end_ts);
+      TSSample sample;
+      sample.ts = bucket_left;
+      sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, 
next_sample);
+      res.push_back(sample);
+      bucket_left = bucket_right;
+      bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
+    }
+    // Process last bucket.
+    TSSample sample;
+    sample.ts = bucket_left;
+    if (bucket_left == option.end_ts) {  // Calculate last sample.
+      double y_diff = next_sample.v - prev_sample.v;
+      auto x_diff = static_cast<double>(next_sample.ts - prev_sample.ts);
+      auto x_prime_diff = static_cast<double>(option.end_ts - prev_sample.ts);
+      double y_prime_diff = (x_prime_diff * y_diff) / x_diff;
+      sample.v = y_prime_diff + prev_sample.v;
+    } else {
+      sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, 
next_sample);
+    }
+    res.push_back(sample);
+    return res;
+  } else if (aggregator.type == TSAggregatorType::NONE || samples.empty()) {
+    res = std::move(samples);
+    return res;
+  }
+
+  auto spans = aggregator.SplitSamplesToBuckets(samples);
   res.reserve(spans.size());
+
+  auto non_empty_left_bucket_idx = [&spans](size_t curr) {
+    while (--curr && spans[curr].empty());
+    return curr;
+  };
+  auto non_empty_right_bucket_idx = [&spans](size_t curr) {
+    while (++curr < spans.size() && spans[curr].empty());
+    return curr;
+  };
+
+  std::vector<std::pair<TSSample, TSSample>> neighbors;
+  neighbors.reserve(spans.size());
+  for (size_t i = 0; i < spans.size(); i++) {
+    TSSample prev = (i != 0) ? spans[non_empty_left_bucket_idx(i)].back() : 
prev_sample;
+    TSSample next = (i != (spans.size() - 1)) ? 
spans[non_empty_right_bucket_idx(i)].front() : next_sample;
+    neighbors.emplace_back(prev, next);
+    assert(spans[i].empty() ||
+           (neighbors[i].first.ts <= spans[i].front().ts && spans[i].back().ts 
<= neighbors[i].second.ts));
+  }  // Should follow: neighbors[i].first <= span[i].front() <= span[i].back() 
<= neighbors[i].second;

Review Comment:
   Why is there an `assert` here? Under what case would this assertion fail?



##########
src/types/redis_timeseries.cc:
##########
@@ -97,7 +101,114 @@ std::vector<TSSample> 
AggregateSamplesByRangeOption(std::vector<TSSample> sample
     }
     return 0;
   };
+  /// Computes area of polygon from start of the current bucket to the first 
sample of the current span.
+  /// Total Area = Area of bottom rectangle + Area of above triangle.
+  auto front_area = [](uint64_t bucket_left, const TSSample &prev, const 
TSSample &curr) {
+    auto x = static_cast<double>(bucket_left - prev.ts);  // Distance from
+    auto y = static_cast<double>(curr.ts - prev.ts);
+    auto z = curr.v - prev.v;
+    auto triangle_area = (z * (y - (x * x) / y)) / 2;
+    auto rect_area = static_cast<double>(y - x) * prev.v;
+    return triangle_area + rect_area;
+  };
+  /// Computes area of polygon from the last sample of the current span to the 
end of current bucket.
+  /// Total Area = Area of bottom rectangle + Area of above triangle.
+  auto end_area = [](uint64_t bucket_right, const TSSample &curr, const 
TSSample &next) {
+    auto x = static_cast<double>(bucket_right - curr.ts);
+    auto y = static_cast<double>(next.ts - curr.ts);
+    auto z = next.v - curr.v;
+    auto rect_area = x * curr.v;
+    auto triangle_area = (x * x * z) / (2 * y);
+    return triangle_area + rect_area;
+  };
+  // Computes the TWA of empty bucket from its neighbor samples.
+  auto empty_bucket_twa = [&front_area](const TSSample &left_nb, uint64_t 
bucket_left, uint64_t bucket_right,
+                                        const TSSample &right_nb) {
+    // Area of empty bucket = Area from left_nb to bucket_right - Area from 
left_nb to bucket_left
+    auto f_area = front_area(bucket_left, left_nb, right_nb);
+    auto s_area = front_area(bucket_right, left_nb, right_nb);
+    return (f_area - s_area) / static_cast<double>(bucket_right - bucket_left);
+  };
+
+  // Retrieve prev_sample and next_sample from samples when TWA aggregation.
+  TSSample prev_sample, next_sample;
+  bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, 
prev_available = false, next_available = false;
+  if (is_twa_aggregator) {
+    const bool discard_boundaries = !option.filter_by_ts.empty() || 
option.filter_by_value.has_value();
+    next_sample = samples.back();
+    samples.pop_back();
+    prev_sample = samples.back();
+    samples.pop_back();
+    // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary 
samples.
+    prev_available = discard_boundaries ? false : !samples.empty() && 
(samples.front().ts != prev_sample.ts);
+    next_available = discard_boundaries ? false : !samples.empty() && 
(samples.back().ts != next_sample.ts);
+  }
+  std::vector<TSSample> res;
+  if (is_twa_aggregator && option.is_return_empty && samples.empty()) {
+    const bool early_return = prev_sample.ts == TSSample::MAX_TIMESTAMP || 
next_sample.ts == TSSample::MAX_TIMESTAMP ||
+                              prev_sample.ts == next_sample.ts;  // When 
filter entire range lies left or right to data.
+    if (early_return) {
+      res = std::move(samples);
+      return res;
+    }
+    // Both prev and next should be available. Total range should be in 
between the prev and next samples.
+    assert(prev_sample.ts <= option.start_ts && option.end_ts <= 
next_sample.ts);
+

Review Comment:
   Remove this `assert`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to