GitHub user yezhizi edited a discussion: TimeSeries Proposal
This is an [OSPP 2025 project](https://summer-ospp.ac.cn/org/prodetail/259430064?list=pro). I sincerely appreciate and welcome feedback and suggestions from all community members to help me improve!

## Introduction

RedisTimeSeries is a Redis module for writing and querying time series data, giving Redis basic time series database capabilities. Since Apache Kvrocks is compatible with the Redis protocol and commands, we also want to provide time series processing capabilities that are compatible with RedisTimeSeries.

---

## Data Structure Design

Most of the design is inspired by [beihao's proposal](https://beihao-rebecca.notion.site/TimeSeries-Proposal-6c5d2f9bc0624a50a209cb8f517109a8) and [PR #2573](https://github.com/apache/kvrocks/pull/2573), but I've added many details and made significant changes to the core data storage approach.

### Key Type (1-byte enum)

| key type     | enum value |
| ------------ | ---------- |
| `CHUNK`      | 0          |
| `LABEL`      | 1          |
| `DOWNSTREAM` | 2          |

### 1. `Datameta` (Metadata CF)

```plaintext
key =>
+----------+------------+-----------+-------------------+
|  flags   |   expire   |  version  | size(chunkCount)  |
| (1byte)  |  (Ebyte)   |  (8byte)  |      (Sbyte)      |
+----------+------------+-----------+-------------------+
+----------------+----------------+----------------+
| retentionTime  |   chunkSize    |   chunkType    |
|    (8byte)     |    (8byte)     |    (1byte)     |
+----------------+----------------+----------------+
+----------------+----------------+----------------+
| duplicatePolicy| sourceKey_size |   sourceKey    |
|    (1byte)     |    (4byte)     |    (Xbyte)     |
+----------------+----------------+----------------+
```

### 2. Primary Data Storage (Time Chunks)

This section explains how timestamp and value data are stored. RedisTimeSeries supports both compressed and uncompressed storage; below is the corresponding design in Kvrocks.

#### Uncompressed Chunk Type

Data storage (default CF):

```
key|version|CHUNK_STORAGE|chunk_id =>
+--------+-------------+-------------+-------------+-------------+
| count  | timestamp1  |   value1    | timestamp2  |   value2    | ...
| (8byte)|   (8byte)   |   (8byte)   |   (8byte)   |   (8byte)   | ...
+--------+-------------+-------------+-------------+-------------+
```

Uncompressed chunks store raw timestamps and values as `uint64` and `double` types, respectively.

#### Compressed Chunk Type

Data storage (default CF):

```
key|version|CHUNK_STORAGE|chunk_id =>
+--------+-------------+-------------+-------------+-------------+
| count  | timestamp1  |   value1    | timestamp2  |   value2    | ...
| (8byte)|   (Xbyte)   |   (Ybyte)   |   (Zbyte)   |   (Kbyte)   | ...
+--------+-------------+-------------+-------------+-------------+
```

- Compression algorithm implementation (see [RedisTimeSeries/src/gorilla.c](https://github.com/RedisTimeSeries/RedisTimeSeries/blob/master/src/gorilla.c); an illustrative sketch follows the note below):
  - `timestamp`: delta-of-delta encoding.
  - `value`: [Facebook Gorilla](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf) XOR encoding.

> Compression might not be mandatory initially, but as noted by `InfluxDB`, it helps ["reduce storage space and disk IO when querying"](https://docs.influxdata.com/influxdb/v1/concepts/storage_engine/). We may start with uncompressed chunks and add compression later.
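To make the compressed chunk format more concrete, here is a minimal sketch of the two encoding steps. It is illustrative only: all type and function names are mine, and the variable-length bit packing that `gorilla.c` performs around these values is omitted.

```cpp
// Illustrative sketch, not the proposed implementation: shows the
// delta-of-delta step for timestamps and the XOR step of Gorilla value
// compression. A real chunk would pack the results into variable-length
// bit fields, as RedisTimeSeries' gorilla.c does.
#include <cstdint>
#include <cstring>

struct CompressedChunkState {
  uint64_t prev_timestamp = 0;
  int64_t prev_timestamp_delta = 0;
  uint64_t prev_value_bits = 0;  // raw bits of the previously stored double
};

// Delta-of-delta: samples usually arrive at a near-constant interval, so the
// second-order delta is typically 0 or very small and encodes in a few bits.
int64_t TimestampDeltaOfDelta(CompressedChunkState *st, uint64_t ts) {
  int64_t delta = static_cast<int64_t>(ts - st->prev_timestamp);
  int64_t dod = delta - st->prev_timestamp_delta;
  st->prev_timestamp = ts;
  st->prev_timestamp_delta = delta;
  return dod;
}

// Gorilla values: XOR against the previous value; identical values XOR to 0,
// and similar values share long runs of leading/trailing zero bits.
uint64_t ValueXor(CompressedChunkState *st, double value) {
  uint64_t bits;
  std::memcpy(&bits, &value, sizeof(bits));
  uint64_t xored = bits ^ st->prev_value_bits;
  st->prev_value_bits = bits;
  return xored;
}
```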
### 3. Secondary Indexes

#### Label Index (default CF)

```
key|version|LABEL|label_key1 =>
+------------------+
|   label_value1   |
|     (Xbyte)      |
+------------------+

key|version|LABEL|label_key2 =>
+------------------+
|   label_value2   |
|     (Xbyte)      |
+------------------+
...
```

- Used to retrieve all labels via `TS.INFO`.
- Supports modifying labels with `TS.ALTER`.

#### Reverse Index (TimeSeries CF)

Since querying multiple time series by label is a very common operation, designing an efficient reverse index is crucial. Because the key encoding of the reverse index differs significantly from the other keys, a new column family (`TimeSeries`) is introduced.

```
+-----------+--------------+----------------+------------+------------------+--------------+---------+
|    ns     |  index_type  | label_key_size | label_key  | label_value_size | label_value  |   key   | => null
| (1+Xbytes)|   (1byte)    |    (4byte)     |  (Ybyte)   |     (4byte)      |   (Zbyte)    | (Kbyte) |
+-----------+--------------+----------------+------------+------------------+--------------+---------+
```

- This is used to filter series by labels in `TS.MRANGE`.
- This may need to be placed in a new column family.

#### Downstream Key Datameta (default CF)

> In `Redis`, you can use `TS.CREATERULE` to create one or more "destination time series" that a "source time series" is compacted into (see [here](https://redis.io/commands/ts.createrule/)). Here we use "downstream" to refer to the destination time series for better semantics.

```
key|version|DOWNSTREAM_META|downstream_key =>
+-------------+----------------+-------------+
| aggregator  |bucket_duration |  alignment  |
|   (1byte)   |    (8byte)     |   (8byte)   |
+-------------+----------------+-------------+
+---------------------+---------------+
| latest_bucket_index |    auxinfo    |
|       (8byte)       |    (XByte)    |
+---------------------+---------------+
```

- Supports the `rules` field in `TS.INFO`.
- `aggregator` is an enum of the aggregation types supported by [`TS.CREATERULE`](https://redis.io/docs/latest/commands/ts.createrule/). The meanings of `aggregator`, `bucket_duration`, and `alignment` are defined by the `Redis` command.
- `latest_bucket_index` helps quickly determine whether an incoming sample belongs to the latest `bucket` when passing data downstream.
- `auxinfo` supports the various aggregation types in `Redis` and lets downstream series be updated quickly with lower CPU overhead (see [Quickly Update Latest Bucket](#Quickly-Update-Latest-Bucket)).

## Key Operation Workflows

### Writing

```
TS.ADD key timestamp value
  [RETENTION retentionPeriod]
  [ENCODING <COMPRESSED|UNCOMPRESSED>]
  [CHUNK_SIZE size]
  [DUPLICATE_POLICY policy]
  [ON_DUPLICATE policy_ovr]
  [IGNORE ignoreMaxTimediff ignoreMaxValDiff]
  [LABELS [label value ...]]
```

1. Get `datameta` and its `lastTimestamp` field.
2. Compute `chunk_id` from `timestamp`: `chunk_id = timestamp // chunk_size`.
3. Determine whether this is the latest data point (a sketch of this routing follows the list):
   1. Yes (`lastTimestamp < timestamp`): append the sample to the corresponding chunk (`chunk_id`). For compressed chunks, the compressed bits can be computed quickly from the `chunk datameta` and appended to the end of the chunk, reducing CPU usage; uncompressed chunks insert directly.
   2. No (`lastTimestamp >= timestamp`): for uncompressed chunks, use binary search to locate the insertion position; compressed chunks need to reconstruct the entire chunk. If the timestamp already exists, the duplicate policy decides how the value is resolved.
4. Pass the data to any downstream series (see the Passing Data Downstream section below).
5. Update `datameta` and `chunk datameta`.
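Below is a hedged sketch of the routing in steps 2–3. The struct and function names are placeholders for illustration rather than final Kvrocks types, and `chunk_size` is interpreted as the time span covered by one chunk, as in the `chunk_id` formula above.

```cpp
// Illustrative only: route an incoming sample to the append path, the
// out-of-order insert path, or duplicate-policy handling.
#include <cstdint>

struct SeriesMeta {
  uint64_t last_timestamp;  // lastTimestamp tracked in Datameta
  uint64_t chunk_size;      // time span covered by a single chunk
};

// chunk_id = timestamp // chunk_size: every sample maps to exactly one chunk.
inline uint64_t ChunkIdOf(uint64_t timestamp, uint64_t chunk_size) {
  return timestamp / chunk_size;
}

enum class WritePath { kAppend, kOutOfOrderInsert, kDuplicate };

// Appends are the cheap, common case; out-of-order samples need a binary
// search (uncompressed) or a full chunk rebuild (compressed); an equal
// timestamp is resolved by DUPLICATE_POLICY / ON_DUPLICATE.
inline WritePath ClassifyWrite(const SeriesMeta &meta, uint64_t timestamp) {
  if (timestamp > meta.last_timestamp) return WritePath::kAppend;
  if (timestamp == meta.last_timestamp) return WritePath::kDuplicate;
  return WritePath::kOutOfOrderInsert;
}
```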
#### Optimizations for Compressed Data Writes

There are two distinct write patterns that require different handling: for source series, new data points are typically "appended" to chunks, while downstream series mostly "update" the latest data point. Since the compression algorithms (delta-of-delta encoding and [Facebook Gorilla](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf)) are both delta-based, we can implement a tailored approach for each scenario:

1. "Append" mode: update `prev_timestamp`, `prev_timestampDelta`, and `prev_value` during writes.
2. "Update" mode: `prev_timestamp` and `prev_value` point to the second-newest data point, allowing the latest data point to be updated in place.

- The `last_offset` field in [Downstream Key Datameta](#Downstream-Key-Datameta) helps quickly locate the latest data point for "update" mode optimizations.
- Source series default to "append" mode, downstream series to "update" mode.

### Time Range Query (`TS.RANGE`)

```
TS.RANGE ts_key start_ts end_ts [AGGREGATION avg 60000] [FILTER_BY_VALUE 25 40]
```

1. Clamp `start_ts` and `end_ts` so the query range does not include expired data.
2. Compute the `chunk_id` range covered by `start_ts` to `end_ts`.
3. Iterate the chunks using the `ns_key|version|*` prefix.
4. Return filtered and, if requested, aggregated results.

### Multi-Series Query (`TS.MRANGE`)

```
TS.MRANGE start_ts end_ts FILTER sensor_type=temperature FILTER_BY_VALUE 0 100
```

1. Resolve candidate `ts_key`s via the label index (see [Reverse Index](#Reverse-Index)).
2. Execute `TS.RANGE` on each candidate in parallel.
3. Merge the results, applying the filters.

### Passing Data Downstream (`TS.CREATERULE`)

> Here we use the term "bucket" consistently with `Redis`: data in the same `bucket` is downsampled into one data point (see [here](https://redis.io/commands/ts.createrule/)). A `chunk`, by contrast, is a storage unit: data in the same `chunk` is written to the same location. The two terms have different meanings.

When a new data point (`ts`, `value`) is added to the source series:

1. For each `downstream_key`, read `aggregator`, `bucket_duration`, `alignment`, and `latest_bucket_index` from its [Downstream Key Datameta](#Downstream-Key-Datameta).
2. Calculate `bucket_index = (ts - align) // bucket_duration` and the aligned timestamp `ts' = bucket_index * bucket_duration + align`.
   1. `bucket_index` > `latest_bucket_index`: directly add a new data point (`ts'`, `value`) downstream.
   2. `bucket_index` = `latest_bucket_index`: the most frequent case, see [Quickly Update Latest Bucket](#Quickly-Update-Latest-Bucket).
   3. `bucket_index` < `latest_bucket_index`:
      1. No need to fetch from the source: e.g., when the aggregation type is `MIN` and the new value is greater than the current value.
      2. Need to fetch all of the bucket's data from the source and recalculate: e.g., when the aggregation type is `AVG`.
3. Write the result to `downstream_key` atomically, consistent with [Writing](#writing).

#### Quickly Update Latest Bucket

This is the most common case for downstream writes, corresponding to the "update" mode in [Optimizations for Compressed Data Writes](#Optimizations-for-Compressed-Data-Writes). The auxiliary information in [Downstream Key Datameta](#Downstream-Key-Datameta) lets us compute the updated value without fetching from the source series, reducing I/O and CPU usage, since `bucket_duration` may be much larger than `chunk_size`.

Specifically, we store the current bucket's statistics for each aggregation type (a sketch of the `AVG` and `VAR.P` updates follows the table):

| Aggregation type | Stored information | Update logic |
| ---------------- | ------------------ | ------------ |
| `AVG` | Data point count `count` | `update_avg = (cur_avg_value*count + new_value) / (count+1)` |
| `SUM` | none | `new_sum = cur_sum + new_value` |
| `MIN` | Current min `cur_min_value` | `new_min_value = new_value < cur_min_value ? new_value : cur_min_value` |
| `MAX` | Current max `cur_max_value` | Similar to `MIN` |
| `RANGE` | `cur_min_value` and `cur_max_value` | Update `cur_min_value` and `cur_max_value`, then `new_range = cur_max_value - cur_min_value` |
| `FIRST` | First timestamp `first_timestamp` | Update only if `new_timestamp < first_timestamp` |
| `LAST` | Latest timestamp `last_timestamp` | Similar to `FIRST` |
| `STD.P` | Bucket mean and count `mean`, `count` | See [Welford's online algorithm](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm) |
| `STD.S` | Bucket mean and count `mean`, `count` | Same as above |
| `VAR.P` | Bucket mean and count `mean`, `count` | Same as above |
| `VAR.S` | Bucket mean and count `mean`, `count` | Same as above |
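To make the table concrete, here is an illustrative sketch of the in-place updates for `AVG` and `VAR.P`/`STD.P`. All names are placeholders rather than a final API; note that Welford's algorithm also keeps a running sum of squared deviations (`m2`) alongside `mean` and `count`, which would need to live in `auxinfo` as well.

```cpp
// Illustrative only: update the latest bucket in place using the auxiliary
// statistics above, so the source series never has to be re-read for the
// common "same bucket" case.
#include <cstdint>

// AVG: cur_avg is the value currently stored downstream for this bucket;
// update_avg = (cur_avg * count + new_value) / (count + 1).
struct AvgAux {
  uint64_t count = 0;
};

inline double UpdateAvg(AvgAux *aux, double cur_avg, double new_value) {
  double updated = (cur_avg * aux->count + new_value) / (aux->count + 1);
  aux->count += 1;
  return updated;
}

// VAR.P / STD.P via Welford's online algorithm: mean, count, and the running
// sum of squared deviations (m2) are enough to update the bucket in O(1).
struct VarAux {
  double mean = 0.0;
  double m2 = 0.0;
  uint64_t count = 0;
};

inline double UpdateVarP(VarAux *aux, double new_value) {
  aux->count += 1;
  double delta = new_value - aux->mean;
  aux->mean += delta / aux->count;
  aux->m2 += delta * (new_value - aux->mean);
  return aux->m2 / aux->count;  // population variance; STD.P is its square root
}
```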
### Retention (`[RETENTION retentionPeriod]`)

Use RocksDB compaction filters to delete expired data. When all data in a `chunk` has expired (i.e., `chunk_last_timestamp + retention < lastTimestamp`), the `chunk` can be deleted: first delete the expired `chunk datameta`, then the corresponding `chunk`. Queries automatically filter out data that falls outside the retention window.

## Command Support Status

This section outlines the core commands.

| [`RedisTimeSeries`](https://github.com/RedisTimeSeries/RedisTimeSeries) Commands | Optional arguments or output fields | Supported? |
| :---: | :---: | :---: |
| `TS.ADD/TS.MADD` | `[DUPLICATE_POLICY policy]` | ✅ |
| `TS.CREATE` | `[ON_DUPLICATE policy_ovr]` | ✅ |
| | `[LABELS [label value ...]]` | ✅ |
| | `[RETENTION retentionPeriod]` | ✅ |
| | `[ENCODING]` | ✅ |
| | `[CHUNK_SIZE size]` | ❓ |
| | `[LABELS [label value ...]]` | ✅ |
| `TS.DEL` | | ✅ |
| `TS.MRANGE/TS.RANGE` | `[FILTER_BY_TS ts...]` | ✅ |
| | `[FILTER_BY_VALUE min max]` | ✅ |
| | `[COUNT count]` | ✅ |
| | `[[ALIGN align] AGGREGATION aggregator bucketDuration [BUCKETTIMESTAMP bt] [EMPTY]]` | ✅ |
| | `FILTER filterExpr...` | ✅ |
| `TS.CREATERULE` | `AGGREGATION aggregator bucketDuration` | ✅ |
| | `[alignTimestamp]` | ✅ |
| `TS.INCRBY/TS.DECRBY` | | ✅ |
| `TS.INFO` | `memoryUsage` | ❓ |
| | `firstTimestamp` | ✅ |
| | `chunkSize` | ❓ |
| | `other...` | ✅ |

❓ indicates fields that may be incompatible or inconsistent with the `Redis` implementation.

### `TS.INFO` Field Compatibility

Some fields may not align with our design:

- `chunkSize`: In `Redis`, this is a fixed byte size specified at creation, whereas this design uses `chunk_size` as a time span when computing `chunk_id`.
- `memoryUsage`: This metric might be challenging to calculate and maintain accurately. As an alternative, we could potentially report `diskUsage` instead, with support from the `chunk datameta` structure (one possible sketch follows).
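If `diskUsage` is adopted, one possible way to produce it is a prefix scan over the series' chunk entries. This is only a sketch of the idea: the function name, taking a prebuilt key prefix, reading from the default column family, and summing logical key/value bytes (i.e., before RocksDB's own block compression) are all assumptions, not a committed design.

```cpp
// Sketch only: approximate a series' on-disk footprint by summing the logical
// sizes of its chunk entries under the "key|version|CHUNK_STORAGE|" prefix.
#include <cstdint>
#include <memory>
#include <string>

#include <rocksdb/db.h>

uint64_t EstimateSeriesDiskUsage(rocksdb::DB *db, const std::string &chunk_prefix) {
  uint64_t bytes = 0;
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
  for (it->Seek(chunk_prefix); it->Valid() && it->key().starts_with(chunk_prefix); it->Next()) {
    bytes += it->key().size() + it->value().size();
  }
  return bytes;
}
```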
