yezhizi commented on code in PR #3262:
URL: https://github.com/apache/kvrocks/pull/3262#discussion_r2538252455


##########
src/types/redis_timeseries.cc:
##########
@@ -72,10 +72,33 @@ struct Reducer {
                                           [](const TSSample &a, const TSSample &b) { return a.v < b.v; });
     return max->v - min->v;
   }
+  static inline double Twa(nonstd::span<const TSSample> samples) {

Review Comment:
   The name `Twa` might not be appropriate here, since the function calculates 
an area, not an average. Maybe `Area` would be more intuitive.
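
To illustrate the naming point: the body of `Twa` is not visible in this hunk, so the following is only a sketch of what an `Area` reducer could look like, assuming it integrates the piecewise-linear curve through the samples with the trapezoidal rule. The simplified `TSSample` and the use of `std::vector` instead of `nonstd::span` are assumptions made to keep the example self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Simplified stand-in for the PR's TSSample (assumption: only ts and v matter here).
struct TSSample {
  uint64_t ts;
  double v;
};

// Area under the piecewise-linear curve through `samples` (trapezoidal rule).
// Precondition: samples are sorted by ascending timestamp.
// Returns 0 for fewer than two samples, since there is no interval to integrate.
inline double Area(const std::vector<TSSample> &samples) {
  double area = 0.0;
  for (size_t i = 1; i < samples.size(); i++) {
    assert(samples[i].ts >= samples[i - 1].ts);
    auto dt = static_cast<double>(samples[i].ts - samples[i - 1].ts);
    area += dt * (samples[i].v + samples[i - 1].v) / 2.0;
  }
  return area;
}
```

The result only becomes a time-weighted average once the caller divides it by the covered time range, which is why a name like `Area` describes the reducer more accurately.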



##########
src/types/redis_timeseries.cc:
##########
@@ -126,6 +150,71 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
       }
     } else if (!spans[i].empty()) {
       sample.v = aggregator.AggregateSamplesValue(spans[i]);
+
+      if (is_twa_aggregator) {
+        auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
+
+        /// Computes area of polygon from start of the current bucket to the first sample of the current span.
+        /// Total Area = Area of bottom rectangle + Area of above triangle.
+        auto front_area = [](uint64_t bucket_left, const TSSample &prev, const TSSample &curr) {
+          auto x = static_cast<double>(bucket_left - prev.ts);  // Distance from
+          auto y = static_cast<double>(curr.ts - prev.ts);
+          auto z = curr.v - prev.v;
+          auto triangle_area = (z * (y - (x * x) / y)) / 2;
+          auto rect_area = static_cast<double>(y - x) * prev.v;
+          return triangle_area + rect_area;
+        };
+        /// Computes area of polygon from the last sample of the current span to the end of current bucket.
+        /// Total Area = Area of bottom rectangle + Area of above triangle.
+        auto end_area = [](uint64_t bucket_right, const TSSample &curr, const TSSample &next) {
+          auto x = static_cast<double>(bucket_right - curr.ts);
+          auto y = static_cast<double>(next.ts - curr.ts);
+          auto z = next.v - curr.v;
+          auto rect_area = x * curr.v;
+          auto triangle_area = (x * x * z) / (2 * y);
+          return triangle_area + rect_area;
+        };
+        auto non_empty_left_bucket = [&spans](size_t curr) {
+          while (--curr && spans[curr].empty());
+          return curr;
+        };
+        auto non_empty_right_bucket = [&spans](size_t curr) {
+          while (++curr < spans.size() && spans[curr].empty());
+          return curr;
+        };
+
+        // Cut left and right empty regions.
+        bucket_left = std::max(bucket_left, option.start_ts);
+        bucket_right = std::min(bucket_right, option.end_ts);
+        uint64_t l = bucket_left, r = bucket_right;
+        double area = 0.0;
+        if (spans.size() == 1) {
+          area += prev_available ? front_area(bucket_left, prev_sample, spans[i].front()) : 0;
+          area += next_available ? end_area(bucket_right, spans[i].back(), next_sample) : 0;
+          l = prev_available ? bucket_left : spans[i].front().ts;
+          r = next_available ? bucket_right : spans[i].back().ts;
+          // Edge case: single bucket with only one element.
+          area += (!prev_available && !next_available && spans[i].size() == 1) ? spans[i][0].v : 0;
+        } else if (i == 0) {
+          size_t p = non_empty_right_bucket(i);
+          area += spans[i].back().ts != bucket_right ? end_area(bucket_right, spans[i].back(), spans[p].front()) : 0;
+          area += prev_available ? front_area(bucket_left, prev_sample, spans[i].front()) : 0;
+          l = prev_available ? bucket_left : spans[i].front().ts;
+        } else if (i == (spans.size() - 1)) {
+          size_t p = non_empty_left_bucket(i);
+          area += spans[i].front().ts != bucket_left ? front_area(bucket_left, spans[p].back(), spans[i].front()) : 0;
+          area += next_available ? end_area(bucket_right, spans[i].back(), next_sample) : 0;
+          // Edge case: when last bucket contains one sample and its timestamp equals bucket boundary.
+          area += (spans[i].size() == 1 && spans[i].front().ts == bucket_left && !next_available) ? spans[i][0].v : 0;
+          r = next_available ? bucket_right : spans[i].back().ts;
+        } else {
+          size_t x = non_empty_left_bucket(i), y = non_empty_right_bucket(i);
+          area += spans[i].front().ts != bucket_left ? front_area(bucket_left, spans[x].back(), spans[i].front()) : 0;
+          area += spans[i].back().ts != bucket_right ? end_area(bucket_right, spans[i].back(), spans[y].front()) : 0;
+        }
+        sample.v += area;
+        sample.v /= std::max(static_cast<double>(r - l), 1.0);
+      }
     } else {

Review Comment:
   This code is hard to follow. Maybe we can split it into two steps:
   1. Get the `prev` and `next` samples for each bucket.
   2. Calculate the TWA for each bucket.
   
   Step 1 can be done before the loop starts, so that the for loop only has 
to calculate the TWA.
   Like this:
   ```
   // Some container to store the prev and next of each bucket...
   // Get the prev and next for each bucket...
   // ...
   
   for (size_t i = 0; i < spans.size(); i++) {
   ...
   
   // Calculate the TWA for spans[i]
   
   ...
   }
   ```
   This might help organize the logic better. :)
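
Step 1 of the outline above can be sketched as follows. This is a minimal illustration, not the PR's code: `BucketNeighbors` and `CollectNeighbors` are hypothetical names, `TSSample` is a simplified stand-in, and `std::optional` is assumed as the way to express "no such sample". One forward pass and one backward pass record, for every bucket, the nearest sample before it and the nearest sample after it, so the main loop no longer needs to branch on the bucket's position.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Simplified stand-in for the PR's TSSample.
struct TSSample {
  uint64_t ts;
  double v;
};

// Hypothetical container: nearest samples outside a bucket, if any.
struct BucketNeighbors {
  std::optional<TSSample> prev;  // last sample before the bucket
  std::optional<TSSample> next;  // first sample after the bucket
};

// Step 1: precompute prev/next for every bucket in O(n).
// `prev_sample` / `next_sample` are the samples outside the queried range
// (std::nullopt when they do not exist).
inline std::vector<BucketNeighbors> CollectNeighbors(const std::vector<std::vector<TSSample>> &spans,
                                                     std::optional<TSSample> prev_sample,
                                                     std::optional<TSSample> next_sample) {
  std::vector<BucketNeighbors> result(spans.size());

  // Forward pass: carry the last sample seen so far.
  std::optional<TSSample> last = prev_sample;
  for (size_t i = 0; i < spans.size(); i++) {
    result[i].prev = last;
    if (!spans[i].empty()) last = spans[i].back();
  }

  // Backward pass: carry the first sample seen so far from the right.
  std::optional<TSSample> first = next_sample;
  for (size_t i = spans.size(); i-- > 0;) {
    result[i].next = first;
    if (!spans[i].empty()) first = spans[i].front();
  }
  return result;
}
```

With this in place, step 2 can treat every bucket uniformly: look up `result[i].prev` and `result[i].next`, add the front and end areas when they exist, and divide by the covered range.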
   



##########
src/types/redis_timeseries.cc:
##########
@@ -1103,6 +1206,13 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke
     }
   }
 
+  if (is_twa_aggregator) {
+    // prev_sample might not get initialized, if first element is in first bucket.
+    prev_sample = prev_sample.ts == TSSample::MAX_TIMESTAMP ? temp_results.front() : prev_sample;
+    temp_results.push_back(prev_sample);
+    temp_results.push_back(next_sample);
+  }

Review Comment:
   `next_sample` might also be uninitialized here.
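
A sketch of a symmetric guard is shown below. It assumes that `next_sample` uses the same `TSSample::MAX_TIMESTAMP` sentinel as `prev_sample` and that falling back to the last collected result is acceptable; neither is confirmed by the diff. `IsInitialized` and `AppendBoundarySamples` are hypothetical helpers, and `TSSample` is a simplified stand-in.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

// Simplified stand-in for the PR's TSSample, including the sentinel timestamp.
struct TSSample {
  static constexpr uint64_t MAX_TIMESTAMP = std::numeric_limits<uint64_t>::max();
  uint64_t ts;
  double v;
};

// Assumption: a sample whose ts equals the sentinel was never assigned.
inline bool IsInitialized(const TSSample &s) { return s.ts != TSSample::MAX_TIMESTAMP; }

// Appends the boundary samples, replacing unset ones with the nearest real
// sample. Both fallbacks are resolved before anything is pushed, so that
// back() still refers to the last real result.
inline void AppendBoundarySamples(std::vector<TSSample> &temp_results, TSSample prev_sample,
                                  TSSample next_sample) {
  assert(!temp_results.empty());  // front()/back() on an empty vector is undefined behavior
  if (!IsInitialized(prev_sample)) prev_sample = temp_results.front();
  if (!IsInitialized(next_sample)) next_sample = temp_results.back();
  temp_results.push_back(prev_sample);
  temp_results.push_back(next_sample);
}
```

The emptiness check matters as well: the original `temp_results.front()` call has the same precondition.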



##########
src/types/redis_timeseries.cc:
##########
@@ -111,6 +134,7 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
     if (option.is_return_empty && spans[i].empty()) {
       switch (aggregator.type) {
         case TSAggregatorType::SUM:
+        case TSAggregatorType::TWA:
         case TSAggregatorType::COUNT:
           sample.v = 0;
           break;

Review Comment:
   With the `EMPTY` flag, the result for an empty TWA bucket should not be 0. It should be:
   >    Average value over the bucket's timeframe based on linear interpolation 
of the last sample before the bucket's start and the first sample after the 
bucket's end. NaN when no such samples.
   
   Refer to [[EMPTY] (since RedisTimeSeries 
v1.8)](https://redis.io/docs/latest/commands/ts.range/)
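
The quoted rule can be sketched as follows. Because the interpolated curve across an empty bucket is a single straight line, its average over the bucket equals its value at the bucket's midpoint. `EmptyBucketTwa` is a hypothetical helper, `TSSample` is a simplified stand-in, and passing the neighbors as `std::optional` is an assumption made for the example.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <limits>
#include <optional>

// Simplified stand-in for the PR's TSSample.
struct TSSample {
  uint64_t ts;
  double v;
};

// TWA of an empty bucket [bucket_left, bucket_right]:
// the mean of the line through `prev` and `next` over the bucket,
// or NaN when either neighbor is missing.
inline double EmptyBucketTwa(uint64_t bucket_left, uint64_t bucket_right, const std::optional<TSSample> &prev,
                             const std::optional<TSSample> &next) {
  if (!prev || !next) return std::numeric_limits<double>::quiet_NaN();
  assert(prev->ts < next->ts);
  assert(bucket_left <= bucket_right);

  // The mean of a linear function over an interval is its value at the midpoint.
  double mid = static_cast<double>(bucket_left) + static_cast<double>(bucket_right - bucket_left) / 2.0;
  double slope = (next->v - prev->v) / static_cast<double>(next->ts - prev->ts);
  return prev->v + slope * (mid - static_cast<double>(prev->ts));
}
```

For example, with `prev = (0, 0)`, `next = (10, 10)` and a bucket spanning timestamps 2 to 4, the interpolated line is `v = ts`, so the bucket's average is the value at the midpoint, 3.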



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.