GitHub user yezhizi edited a discussion: TimeSeries Proposal
This is an [OSPP 2025 project](https://summer-ospp.ac.cn/org/prodetail/259430064?list=pro). I sincerely appreciate and welcome feedback and suggestions from all community members to help me improve!

## Introduction

RedisTimeSeries is a Redis module for manipulating and querying time series data, giving Redis basic time series database capabilities. Since Apache Kvrocks is compatible with the Redis protocol and commands, we would also like to provide time series processing capabilities that are compatible with RedisTimeSeries.

---

## Data Structure Design

Most of the design is inspired by [beihao's proposal](https://beihao-rebecca.notion.site/TimeSeries-Proposal-6c5d2f9bc0624a50a209cb8f517109a8) and [PR #2573](https://github.com/apache/kvrocks/pull/2573), but I've added many details and made significant changes to the core data storage approach.

### 1. `Datameta`

```plaintext
       +----------+------------+-----------+-------------------+-------------------+-------------------+
key => | flags    | expire     | version   |size(totalSamples) | memoryUsage       | firstTimestamp    |
       | (1byte)  | (Ebyte)    | (8byte)   | (Sbyte)           | (8byte)           | (8byte)           |
       +----------+------------+-----------+-------------------+-------------------+-------------------+
       +----------------+----------------+----------------+----------------+----------------+
       | lastTimestamp  | retentionTime  | chunkCount     | chunkSize      | chunkType      |
       | (8byte)        | (8byte)        | (8byte)        | (8byte)        | (1byte)        |
       +----------------+----------------+----------------+----------------+----------------+
       +----------------+----------------+----------------+--------***--------+--------***--------+
       |duplicatePolicy | sourceKey_size | sourceKey      |downstream_keysize1| downstream_key1   |...
       | (1byte)        | (4byte)        | (Xbyte)        | (4byte)           | (Nbyte)           |...
       +----------------+----------------+----------------+--------***--------+--------***--------+
```

- Most fields in `datameta` align with [`TS.INFO`](https://redis.io/commands/ts.info/), except:
  - the `rules` field, which is stored in a separate subkey (see [Downstream Key Datameta](#Downstream-Key-Datameta));
  - the `label` field: a label is a (`label_key`, `label_value`) pair, where both `label_key` and `label_value` are strings of arbitrary length. All labels can be retrieved using `TS.INFO` and modified using `TS.ALTER`, so it may be better to store the labels in a separate subkey (see [Label Index](#Label-Index)).
- `downstream_key` is used to index downstream time series (see [Downstream Key Datameta](#Downstream-Key-Datameta)). Since a series may have multiple downstream series, this section may contain the keys of multiple downstream series.
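To make the layout concrete, here is a minimal C++ sketch of how the fixed-width portion of `datameta` could be serialized. The struct and helper names are hypothetical stand-ins, not existing Kvrocks APIs; they only mirror the little-endian fixed-width style Kvrocks uses for its other metadata encodings:

```cpp
#include <cstdint>
#include <cstring>
#include <string>

// Little-endian fixed-width helpers, standing in for the PutFixed*-style
// utilities Kvrocks already uses for its other metadata encodings.
static void PutFixed8(std::string *dst, uint8_t v) { dst->push_back(static_cast<char>(v)); }
static void PutFixed64(std::string *dst, uint64_t v) {
  char buf[sizeof(v)];
  std::memcpy(buf, &v, sizeof(v));
  dst->append(buf, sizeof(buf));
}

// Hypothetical in-memory form of the fixed-width fields of `datameta` that
// follow the common flags/expire/version header.
struct TimeSeriesMetadata {
  uint64_t total_samples = 0;
  uint64_t memory_usage = 0;
  uint64_t first_timestamp = 0;
  uint64_t last_timestamp = 0;
  uint64_t retention_time = 0;
  uint64_t chunk_count = 0;
  uint64_t chunk_size = 0;
  uint8_t chunk_type = 0;        // e.g., 0 = uncompressed, 1 = compressed
  uint8_t duplicate_policy = 0;  // enum matching the DUPLICATE_POLICY options

  void EncodeTo(std::string *dst) const {
    PutFixed64(dst, total_samples);
    PutFixed64(dst, memory_usage);
    PutFixed64(dst, first_timestamp);
    PutFixed64(dst, last_timestamp);
    PutFixed64(dst, retention_time);
    PutFixed64(dst, chunk_count);
    PutFixed64(dst, chunk_size);
    PutFixed8(dst, chunk_type);
    PutFixed8(dst, duplicate_policy);
    // sourceKey and the downstream keys would follow as length-prefixed strings.
  }
};
```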
### 2. Primary Data Storage (Time Chunks)

This section explains how timestamp and value data are stored. RedisTimeSeries supports both compressed and uncompressed storage; below is the corresponding design in Kvrocks.

#### Uncompressed Chunk Type

`Chunk datameta`:

```
                        +-----------------+-----------------+-----------------+-----------+
key|version|chunk_id => | first_timestamp | end_timestamp   | count           | bytes     |
                        | (8byte)         | (8byte)         | (8byte)         | (8byte)   |
                        +-----------------+-----------------+-----------------+-----------+
```

Data storage (requires a new `Column Family`, e.g., `TimeSeries`):

```
                        +-------------+-------------+-------------+-------------+
key|version|chunk_id => | timestamp1  | value1      | timestamp2  | value2      |...
                        | (8byte)     | (8byte)     | (8byte)     | (8byte)     |...
                        +-------------+-------------+-------------+-------------+
```

Uncompressed chunks store raw timestamps and values as `uint64` and `double` types, respectively.

#### Compressed Chunk Type

`Chunk datameta`:

```
                        +-----------------+-----------------+-----------------+-----------+
key|version|chunk_id => | first_timestamp | end_timestamp   | count           | bytes     |
                        | (8byte)         | (8byte)         | (8byte)         | (8byte)   |
                        +-----------------+-----------------+-----------------+-----------+
                        +-----------------+-----------------+---------------------+-----------------+
                        | base_value      | prev_timestamp  | prev_timestampDelta | prev_value      |
                        | (8byte)         | (8byte)         | (8byte)             | (8byte)         |
                        +-----------------+-----------------+---------------------+-----------------+
```

Data storage (requires a new `Column Family`, e.g., `TimeSeries`):

```
                        +-------------+-------------+-------------+-------------+
key|version|chunk_id => | timestamp1  | value1      | timestamp2  | value2      |...
                        | (Xbyte)     | (Ybyte)     | (Zbyte)     | (Kbyte)     |...
                        +-------------+-------------+-------------+-------------+
```

- `bucket_id = timestamp // bucket_size` (e.g., `bucket_size=60000ms` for minute-level buckets)
- `chunk datameta` assists in chunk queries and writes, and supports the `Chunks` field in `TS.INFO` (when `DEBUG` is specified).
- Compression algorithm implementation (see [RedisTimeSeries/src/gorilla.c](https://github.com/RedisTimeSeries/RedisTimeSeries/blob/master/src/gorilla.c)):
  - `timestamp`: delta-of-delta encoding.
  - `value`: [Facebook Gorilla](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).

> This design is similar to [`RedisTimeSeries`](https://github.com/RedisTimeSeries/RedisTimeSeries), but with a key difference: `RedisTimeSeries` uses fixed-size memory chunks and always appends data to the head chunk. Here, we use fixed-time-range chunks, which align better with `Kvrocks`'s key-value storage for efficient timestamp-based indexing.
>
> Compression might not be mandatory initially, but as noted in `InfluxDB`'s documentation, it helps ["reduce storage space and disk IO when querying"](https://docs.influxdata.com/influxdb/v1/concepts/storage_engine/). We may start with uncompressed chunks and add compression later.
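To illustrate the two compression steps, here is a small C++ sketch of the delta-of-delta computation for timestamps and the XOR step for values. This is a deliberate simplification: the real encoder (`gorilla.c`) packs these results into variable-width bit streams, which is omitted here.

```cpp
#include <cstdint>
#include <cstring>

// Delta-of-delta for timestamps: for regularly sampled data the deltas are
// constant, so the delta-of-delta stream is mostly zeros, each of which the
// bit-level encoder can store in a single bit.
struct TimestampDodEncoder {
  uint64_t prev_timestamp = 0;
  int64_t prev_delta = 0;
  bool first = true;

  // Returns the number the bit packer would encode for this sample.
  int64_t Append(uint64_t ts) {
    if (first) {  // the first timestamp is kept raw in `chunk datameta`
      first = false;
      prev_timestamp = ts;
      return 0;
    }
    int64_t delta = static_cast<int64_t>(ts - prev_timestamp);
    int64_t dod = delta - prev_delta;
    prev_delta = delta;
    prev_timestamp = ts;
    return dod;  // 0 for perfectly regular samples
  }
};

// Gorilla-style value compression XORs each double with its predecessor;
// a repeated value XORs to zero and likewise costs a single bit.
inline uint64_t XorWithPrev(double prev_value, double cur_value) {
  uint64_t a = 0, b = 0;
  std::memcpy(&a, &prev_value, sizeof(a));
  std::memcpy(&b, &cur_value, sizeof(b));
  return a ^ b;
}
```

The `prev_timestamp`, `prev_timestampDelta`, and `prev_value` fields in the compressed `chunk datameta` above are exactly the encoder state this sketch carries between samples, which is what makes cheap appends possible.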
### 3. Secondary Indexes

#### Label Index

```
               +------------------+----------------+------------------+------------------+
key|version => | label_key1_size  | label_key1     |label_value1_size | label_value1     |
               | (4byte)          | (Xbyte)        | (4byte)          | (Ybyte)          |
               +------------------+----------------+------------------+------------------+
               +------------------+----------------+------------------+------------------+
               | label_key2_size  | label_key2     |label_value2_size | label_value2     |...
               | (4byte)          | (Zbyte)        | (4byte)          | (Sbyte)          |...
               +------------------+----------------+------------------+------------------+
```

- Used to retrieve all labels via `TS.INFO`.
- Supports modifying labels with `TS.ALTER`.

#### Reverse Index

```
+------------------+----------------+------------------+------------------+------------------+
| label_key_size   | label_key      |label_value_size  | label_value      | key              | => null
| (4byte)          | (Xbyte)        | (4byte)          | (Ybyte)          | (Kbyte)          |
+------------------+----------------+------------------+------------------+------------------+
```

- This is used to filter by labels in `TS.MRANGE` (see the prefix-scan sketch below).
- This may need to be placed in a new column family.

**I'm not sure if this is a good approach for indexing by labels. More ideas are welcome!**
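As one possible reading of this index, here is a C++ sketch of the `TS.MRANGE` candidate lookup as a RocksDB prefix scan. It assumes the reverse index lives in a dedicated column family whose handle is passed in, and `PutFixed32` is a local stand-in for the usual encoding helper:

```cpp
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

#include <rocksdb/db.h>

static void PutFixed32(std::string *dst, uint32_t v) {
  char buf[sizeof(v)];
  std::memcpy(buf, &v, sizeof(v));
  dst->append(buf, sizeof(buf));
}

// Collect candidate series keys for `FILTER label_key=label_value`: every
// index entry is `label_key_size|label_key|label_value_size|label_value|key`,
// so all series carrying the label share one scannable prefix.
std::vector<std::string> ScanSeriesByLabel(rocksdb::DB *db, rocksdb::ColumnFamilyHandle *index_cf,
                                           const std::string &label_key,
                                           const std::string &label_value) {
  std::string prefix;
  PutFixed32(&prefix, static_cast<uint32_t>(label_key.size()));
  prefix.append(label_key);
  PutFixed32(&prefix, static_cast<uint32_t>(label_value.size()));
  prefix.append(label_value);

  std::vector<std::string> candidates;
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions(), index_cf));
  for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix); it->Next()) {
    // The remainder of the index key is the series key itself; the value is null.
    candidates.push_back(it->key().ToString().substr(prefix.size()));
  }
  return candidates;
}
```

Multiple `FILTER` conditions would then intersect the candidate sets returned by one scan per condition.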
#### Downstream Key Datameta

> In `Redis`, you can use `TS.CREATERULE` to create one or more "destination timeseries" for compaction from a "source timeseries" (see [here](https://redis.io/commands/ts.createrule/)). Here we use "downstream" to represent the destination timeseries for better semantics.

```
                              +-------------+----------------+-------------+---------------------+
key|version|downstream_key => | aggregator  |bucket_duration | alignment   | latest_bucket_index |
                              | (1byte)     | (8byte)        | (8byte)     | (8byte)             |
                              +-------------+----------------+-------------+---------------------+
                              +--------------+---------------+
                              | last_offset  | auxinfo       |
                              | (8byte)      | (Xbyte)       |
                              +--------------+---------------+
```

- Supports the `rules` field in `TS.INFO`.
- `aggregator` is an enum for the aggregation methods supported by [`TS.CREATERULE`](https://redis.io/docs/latest/commands/ts.createrule/). The meanings of `aggregator`, `bucket_duration`, and `alignment` are defined in the `Redis` command.
- `latest_bucket_index` helps quickly determine whether new data belongs to the latest `bucket` when adding data downstream.
- `last_offset` helps compressed chunk types quickly locate the last data point for efficient updates.
- `auxinfo`: supports the various aggregation types in `Redis` and helps downstream series update quickly with lower CPU overhead (see [Quickly Update Latest Bucket](#Quickly-Update-Latest-Bucket)).

## Key Operation Workflows

### Writing

```
TS.ADD key timestamp value
  [RETENTION retentionPeriod]
  [ENCODING <COMPRESSED|UNCOMPRESSED>]
  [CHUNK_SIZE size]
  [DUPLICATE_POLICY policy]
  [ON_DUPLICATE policy_ovr]
  [IGNORE ignoreMaxTimediff ignoreMaxValDiff]
  [LABELS [label value ...]]
```

1. Get `datameta` and its `lastTimestamp` field.
2. Compute the `bucket_id` from `timestamp` using `bucket_id = timestamp // bucket_size`.
3. Determine whether it is the latest data:
   1. Yes: append the data to the corresponding chunk (typically at the end). For compressed types, the compressed values can be computed quickly using `chunk datameta` and appended to the chunk end, reducing CPU usage; uncompressed types insert directly.
   2. No: for uncompressed types, use binary search to locate the insertion position; compressed types need to reconstruct the entire chunk.
4. Update `datameta` and `chunk datameta`.

#### Optimizations for Compressed Data Writes

There are two distinct patterns for time series writes that require different handling: for source series, new data points are typically "appended" to chunks, while downstream series primarily "update" the latest data point. The compression algorithms (delta-of-delta encoding and [Facebook Gorilla](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf)) are both differential, so we can implement different approaches tailored to these two scenarios:

1. "Append" mode: updates `prev_timestamp`, `prev_timestampDelta`, and `prev_value` during writes.
2. "Update" mode: `prev_timestamp` and `prev_value` point to the second-newest data point, allowing direct updates of the latest one.

- The `last_offset` field in [Downstream Key Datameta](#Downstream-Key-Datameta) helps quickly locate the latest data point for "update" mode optimizations.
- Source series default to "append" mode, downstream series to "update" mode.

### Time Range Query (`TS.RANGE`)

```
TS.RANGE ts_key start_ts end_ts [AGGREGATION avg 60000] [FILTER_BY_VALUE 25 40]
```

1. Adjust `start_ts` and `end_ts` to ensure the query range excludes expired data.
2. Compute the `bucket_id` range from `start_ts` and `end_ts`.
3. Iterate chunks using `ns_key|version|*`.
4. Return filtered and aggregated (if needed) results.

### Multi-Series Query (`TS.MRANGE`)

```
TS.MRANGE start_ts end_ts FILTER sensor_type=temperature FILTER_BY_VALUE 0 100
```

1. Resolve `ts_key` candidates via the label index (see [Reverse Index](#Reverse-Index)).
2. Execute parallel `TS.RANGE` on each candidate.
3. Merge results with filtering.

### Passing Data Downstream (`TS.CREATERULE`)

> Here we use the term "bucket" consistently with `Redis`, where data in the same `bucket` will be downsampled into one data point (see [here](https://redis.io/commands/ts.createrule/)). `chunk` refers to storage units, where data in the same `chunk` is written to the same location. These terms have different meanings.

When a new data point (`ts`, `value`) is added to the source series:

1. Check `downstream_key` and get `aggregator`, `bucket_duration`, `alignment`, and `latest_bucket_index` via [Downstream Key Datameta](#Downstream-Key-Datameta).
2. Calculate `bucket_index = (ts - alignment) // bucket_duration` and the aligned time `ts' = bucket_index * bucket_duration + alignment`:
   1. `bucket_index` > `latest_bucket_index`: directly add a new data point (`ts'`, `value`) downstream.
   2. `bucket_index` = `latest_bucket_index`: the most frequent case; see [Quickly Update Latest Bucket](#Quickly-Update-Latest-Bucket).
   3. `bucket_index` < `latest_bucket_index`:
      1. No need to fetch from source: e.g., when the aggregation type is `MIN` and the new value > current value.
      2. Need to fetch all bucket data from source and recalculate: e.g., when the aggregation type is `AVG`.
3. Write the result to `downstream_key` atomically, consistent with [Writing](#writing).

#### Quickly Update Latest Bucket

This is the most common case for time series writes, corresponding to the "update" mode in [Optimizations for Compressed Data Writes](#Optimizations-for-Compressed-Data-Writes). The auxiliary information in [Downstream Key Datameta](#Downstream-Key-Datameta) helps determine the updated value without fetching from the source, reducing I/O and CPU, since `bucket_duration` may be much larger than `chunk_duration`. Specifically, we store the current bucket statistics for the different aggregation types:

| Aggregation type | Stored Information | Update Logic |
| ---------------- | ------------------ | ------------ |
| `AVG` | Data point count `count` | `update_avg = (cur_avg_value*count + new_value) / (count+1)` |
| `SUM` | null | `new_sum = cur_sum + new_value` |
| `MIN` | Current min `cur_min_value` | `new_min_value = new_value < cur_min_value ? new_value : cur_min_value` |
| `MAX` | Current max `cur_max_value` | Similar to `MIN` |
| `RANGE` | `cur_min_value` and `cur_max_value` | Update `cur_min_value` and `cur_max_value`, then `new_range = cur_max_value - cur_min_value` |
| `FIRST` | First timestamp `first_timestamp` | Update only if `new_timestamp < first_timestamp` |
| `LAST` | Latest timestamp `last_timestamp` | Similar to `FIRST` |
| `std.p` | Bucket mean and count `mean`, `count` | See [Welford's algorithm](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm) |
| `std.s` | Bucket mean and count `mean`, `count` | Same as above |
| `var.p` | Bucket mean and count `mean`, `count` | Same as above |
| `var.s` | Bucket mean and count `mean`, `count` | Same as above |
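For the `var.*`/`std.*` rows, here is a C++ sketch of the Welford update. Note that it needs one accumulator (`m2`, the sum of squared deviations) on top of the `mean` and `count` listed in the table, so `auxinfo` would have to carry it as well:

```cpp
#include <cmath>
#include <cstdint>

// Running statistics for the latest downstream bucket, updated in O(1) per
// point via Welford's online algorithm.
struct BucketVariance {
  uint64_t count = 0;
  double mean = 0.0;
  double m2 = 0.0;  // sum of squared deviations from the running mean

  void Add(double value) {
    ++count;
    double delta = value - mean;
    mean += delta / static_cast<double>(count);
    m2 += delta * (value - mean);
  }

  double VarP() const { return count > 0 ? m2 / static_cast<double>(count) : 0.0; }      // var.p
  double VarS() const { return count > 1 ? m2 / static_cast<double>(count - 1) : 0.0; }  // var.s
  double StdP() const { return std::sqrt(VarP()); }                                      // std.p
  double StdS() const { return std::sqrt(VarS()); }                                      // std.s
};
```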
### Retention (`[RETENTION retentionPeriod]`)

Use `RocksDB` compaction filters to delete expired data. When all data in a `chunk` has expired (i.e., `chunk_last_timestamp + retention < lastTimestamp`), the `chunk` can be deleted: first delete the expired `chunk datameta`, then the corresponding `chunk`. Queries automatically filter out data outside the retention window.

Note: `TS.INFO` needs to show the oldest unexpired `firstTimestamp`. This field in `datameta` isn't updated during data addition/expiration; instead, it is computed when `TS.INFO` is explicitly called.

## Command Support Status

This section outlines the core commands.

| [`RedisTimeSeries`](https://github.com/RedisTimeSeries/RedisTimeSeries) Commands | Optional arguments or output fields | Supported? |
| :-----------------: | :-----------------------------------------------------------: | :--------: |
| `TS.ADD/TS.MADD`    | `[DUPLICATE_POLICY policy]`                                    | ✅ |
| `TS.CREATE`         | `[ON_DUPLICATE policy_ovr]`                                    | ✅ |
|                     | `[LABELS [label value ...]]`                                   | ✅ |
|                     | `[RETENTION retentionPeriod]`                                  | ✅ |
|                     | `[ENCODING]`                                                   | ✅ |
|                     | `[CHUNK_SIZE size]`                                            | ❓ |
|                     | `[LABELS [label value ...]]`                                   | ✅ |
| `TS.DEL`            |                                                                | ✅ |
| `TS.MRANGE/TS.RANGE`| `[FILTER_BY_TS ts...]`                                         | ✅ |
|                     | `[FILTER_BY_VALUE min max]`                                    | ✅ |
|                     | `[COUNT count]`                                                | ✅ |
|                     | `[[ALIGN align] AGGREGATION aggregator bucketDuration [BUCKETTIMESTAMP bt] [EMPTY]]` | ✅ |
|                     | `FILTER filterExpr...`                                         | ✅ |
| `TS.CREATERULE`     | `AGGREGATION aggregator bucketDuration`                        | ✅ |
|                     | `[alignTimestamp]`                                             | ✅ |
| `TS.INCRBY/TS.DECRBY` |                                                              | ✅ |
| `TS.INFO`           | `memoryUsage`                                                  | ❓ |
|                     | `firstTimestamp`                                               | ✅ |
|                     | `chunkSize`                                                    | ❓ |
|                     | `other...`                                                     | ✅ |

❓ indicates fields that may be incompatible or inconsistent with the `Redis` implementation.

### `TS.INFO` Field Compatibility

Some fields may not align with our design:

- `chunkSize`: In `Redis`, this is a fixed byte size specified at creation.
- `memoryUsage`: This metric might be challenging to calculate and maintain accurately. As an alternative, could we use `diskUsage` instead, supported by the `chunk datameta` structure? (A sketch follows below.)
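As a rough illustration of that alternative, the sketch below sums the `bytes` field over a series' `chunk datameta` entries with a single prefix scan. It assumes chunk datameta entries can be isolated by their key prefix (e.g., with a subkey type byte, since `key|version|downstream_key` entries share a similar key shape), and the offset follows the layout above (`first_timestamp`, `end_timestamp`, `count`, `bytes`):

```cpp
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>

#include <rocksdb/db.h>

static uint64_t DecodeFixed64At(const rocksdb::Slice &s, size_t offset) {
  uint64_t v = 0;
  std::memcpy(&v, s.data() + offset, sizeof(v));
  return v;
}

// Estimate a diskUsage value for TS.INFO by summing the `bytes` field of every
// `chunk datameta` entry under the given `ns_key|version|` prefix.
uint64_t EstimateDiskUsage(rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf,
                           const std::string &chunk_meta_prefix) {
  uint64_t total = 0;
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions(), meta_cf));
  for (it->Seek(chunk_meta_prefix); it->Valid() && it->key().starts_with(chunk_meta_prefix);
       it->Next()) {
    // `bytes` is the fourth fixed-width field, i.e., at offset 24.
    if (it->value().size() >= 4 * sizeof(uint64_t)) {
      total += DecodeFixed64At(it->value(), 3 * sizeof(uint64_t));
    }
  }
  return total;
}
```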
