GitHub user yezhizi edited a discussion: TimeSeries Proposal

This is an [OSPP 2025 
project](https://summer-ospp.ac.cn/org/prodetail/259430064?list=pro). I 
sincerely appreciate and welcome feedback and suggestions from all community 
members to help me improve!

## Introduction

RedisTimeSeries is a redis module used to operate and query time series data, 
giving redis basic time series database capabilities.

As Apache Kvrocks is characterized by being compatible with the Redis protocol 
and commands, we also hope to provide temporal data processing capabilities 
that are compatible with RedisTimeSeries.

---

## Data Structure Design

Most of the design is inspired by [beihao's 
proposal](https://beihao-rebecca.notion.site/TimeSeries-Proposal-6c5d2f9bc0624a50a209cb8f517109a8)
 and [PR #2573](https://github.com/apache/kvrocks/pull/2573), but I've added 
many details and made significant changes to the core data storage approach.

### Key Type(1 byte emun)

| key type          | enum value |
| ----------------- | ---------- |
| `CHUNK`      | 0          |
| `LABEL`           | 1          |
| `DOWNSTREAM` | 2          |


### 1. `Datameta` (Metadata CF)

```plaintext
        +----------+------------+-----------+-------------------+
key =>  |  flags   |  expire    |  version  |size(chunkCount)   |
        | (1byte)  | (Ebyte)    |  (8byte)  |       (Sbyte)     |
        +----------+------------+-----------+-------------------+
        +----------------+----------------+----------------+
        | retentionTime  |    chunkSize   |    chunkType   |
        |    (8byte)     |    (8byte)     |    (1byte)     |
        +----------------+----------------+----------------+
        +----------------+----------------+----------------+
        |duplicatePolicy | sourceKey_size |   sourceKey    |
        |    (1byte)     |   (4byte)      |    (Xbyte)     |
        +----------------+----------------+----------------+
```

### 2. Primary Data Storage (Time Chunks)

This section explains how timestamp and value data are stored. In 
RedisTimeSeries, both compressed and uncompressed storage methods are 
supported. Below is the corresponding design in KvRocks:

#### Uncompressed Chunk Type



Data storage (default CF):

```
                              
+--------+-------------+-------------+-------------+-------------+
key|version|CHUNK|chunk_id => | count  |  timestamp1 |    value1   |  
timestamp2 |    value2   |...
                              |(8byte) |   (8byte)   |   (8byte)   |   (8byte)  
 |   (8byte)   |...
                              
+--------+-------------+-------------+-------------+-------------+
```

Uncompressed chunks store raw timestamps and values as `uint64` and `double` 
types, respectively.

#### Compressed Chunk Type


Data storage (default CF):

```
                              
+--------+-------------+-------------+-------------+-------------+
key|version|CHUNK|chunk_id => | count  |  timestamp1 |    value1   |  
timestamp2 |    value2   |...
                              |(8byte) |   (Xbyte)   |   (Ybyte)   |   (Zbyte)  
 |   (Kbyte)   |...
                              
+--------+-------------+-------------+-------------+-------------+
```

- Compression algorithm implementation (see 
[RedisTimeSeries/src/gorilla.c](https://github.com/RedisTimeSeries/RedisTimeSeries/blob/master/src/gorilla.c)):
  - `timestamp`: Delta-of-delta encoding.
  - `value`:  [Facebook 
Gorilla](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).


> Compression might not be mandatory initially, but as noted in `influxDB`, it 
> helps ["reduce storage space and disk IO when 
> querying"](https://docs.influxdata.com/influxdb/v1/concepts/storage_engine/). 
> We may start with uncompressed chunks and add compression later.

### 3. Secondary Indexes

#### Label Index (default cf)

```
                                +------------------+
key|version|LABEL|label_key1 => |  label_value1    |
                                |     (Xbyte)      |  
                                +------------------+
                                +------------------+
key|version|LABEL|label_key2 => |  label_value2    |
                                |     (Xbyte)      |  
                                +------------------+
...
```

- Used to retrieve all labels via `TS.INFO`.
- Supports modifying labels with `TS.ALTER`

#### Reverse Index (TimeSeries CF)

Since querying multiple time series using label indexing is a very common 
operation, designing an efficient reverse index is crucial. Given that the key 
encoding format of the reverse index differs significantly, it is necessary to 
introduce a new Column Family (`TimeSeries`).

```
+-----------+--------------+----------------+------------+------------------+--------------+---------+
|     ns    |  index_type  | label_key_size |  label_key | label_value_size |  
label_value |  key    | => null
| (1+Xbytes)|    (1byte)   |    (4byte)     |   (Ybyte)  |   (4byte)        |   
(Zbyte)    | (Kbyte) |
+-----------+--------------+----------------+------------+------------------+--------------+---------+
```


- This is used to filter by labels in `TS.MRANGE`
- This may need to be placed in a new column family.

#### Downstream Key Datameta (default cf)

> In `Redis`, you can use `TS.CREATERULE` to create one or more "destination 
> timeseries" for compaction from a "source timeseries" (see 
> [here](https://redis.io/commands/ts.createrule/)). Here we use "downstream" 
> to represent the destination timeseries for better semantics.

```
                                             
+-------------+----------------+-------------+---------------------+---------------+
key|version|DOWNSTREAM|downstream_key => |  aggregator |bucket_duration |  
alignment  | latest_bucket_index |  auxinfo      |
                                             |   (1byte)   |   (8byte)      |   
(8byte)   |       (8byte)       |   (XByte)     |
                                         
+-------------+----------------+-------------+---------------------+---------------+
```

- Supports the `rules` field in `TS.INFO`.
- `aggregator` is an enum for aggregation methods supported by 
[`TS.CREATERULE`](https://redis.io/docs/latest/commands/ts.createrule/). The 
meanings of `aggregator`, `bucket_duration`, and `alignment` are defined in the 
`Redis` command.
- `latest_bucket_index`helps quickly determine if it's the latest `bucket` when 
adding data downstream.
- `auxInfo`: Supports various aggregation types in `Redis` and helps downstream 
series update quickly with lower CPU overhead(see [Quickly Update Latest 
Bucket](#Quickly-Update-Latest-Bucket)).


## Key Operation Workflows

### Writing

```
TS.ADD key timestamp value 
  [RETENTION retentionPeriod] 
  [ENCODING <COMPRESSED|UNCOMPRESSED>] 
  [CHUNK_SIZE size] 
  [DUPLICATE_POLICY policy] 
  [ON_DUPLICATE policy_ovr] 
  [IGNORE ignoreMaxTimediff ignoreMaxValDiff] 
  [LABELS [label value ...]]
```

1. Get `datameta` and its `lastTimestamp` field.
2. Compute `chunk_id` from `timestamp` using: `chunk_id = timestamp // 
chunk_size`
3. Determine if it's the latest data:
   1. Yes( `lastTimestamp<timestamp`): Append data to the corresponding 
chunk(`chunk_id`) . For compressed types, this can quickly compute compressed 
values using `chunk datameta` and append to chunk end, reducing CPU usage; 
uncompressed types insert directly.
   2. No(`lastTimestamp>timestamp`): For uncompressed types, use binary search 
to locate insertion position; compressed types need to reconstruct the entire 
chunk.
4. Pass the data to the downstream series. See [below]().
5. Update `datameta`and`chunk datameta`.

#### Optimizations for Compressed Data Writes

There are two distinct patterns for time series writes that require different 
handling. For source series, new data points are typically "appended" to 
chunks, while downstream series primarily involve "updating" the latest data 
points. Compression algorithms (Delta-of-delta encoding and [Facebook 
Gorilla](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf)) are both 
differential-based algorithms. We can implement different approaches tailored 
to these two scenarios.

1. "Append" mode: Updates `prev_timestamp`, `prev_timestampDelta`, `prev_value` 
during writes.

2. "Update" mode: `prev_timestamp` and `prev_value` point to the second-newest 
data, allowing direct updates of the latest data.

- The `last_offset` field in [Downstream Key 
Datameta](#Downstream-Key-Datameta) helps quickly locate the latest data point 
for "update" mode optimizations.
- Source series defaults to "append" mode, downstream to "update" mode.

### Time Range Query (`TS.RANGE`)

```
TS.RANGE ts_key start_ts end_ts [AGGREGATION avg 60000] [FILTER_BY_VALUE 25 40]
```

1. Adjust `start_ts` to `end_ts` to ensure the query range hasn't expired.
2. Compute `chunk_id` range from `start_ts` to `end_ts`.
3. Iterate chunks using `ns_key|version|*`.
4. Return filtered and aggregated(if needed) results.

### Multi-Series Query (`TS.MRANGE`)

```
TS.MRANGE start_ts end_ts FILTER sensor_type=temperature FILTER_BY_VALUE 0 100
```

1. Resolve `ts_key` candidates via label index.  See [Reverse 
Index](#Reverse-Index)
2. Execute parallel `TS.RANGE` on each candidate.
3. Merge results with filtering.

### Passing Data Downstream (`TS.CREATERULE`)

> Here we use the term "bucket" consistently with `Redis`, where data in the 
> same `bucket` will be downsampled into one data point. See 
> [here](https://redis.io/commands/ts.createrule/). `chunk` refers to storage 
> units where data in the same `chunk` is written to the same location. These 
> terms have different meanings.

When a new data point is added to the source series (e.g., (`ts`, `value`)):

1. Check `downstream_key` and get `aggregator`,`bucket_duration`,`alignment` , 
`latest_bucket_index` via [Downstream Key Datameta](#Downstream Key Datameta).
2. Calculate `bucket_index`: `(ts-align)//bucket_duration`, and aligned time 
`ts'`: `ts' = bucket_index*bucket_duration+align`
   1. `bucket_index` > `latest_bucket_index`: Directly add a new data point 
(`ts'`,`value`) downstream.
   2. `bucket_index` = `latest_bucket_index`:  Most frequent case, see [Quickly 
Update Latest Bucket](#Quickly-Update-Latest-Bucket).
   3. `bucket_index` < `latest_bucket_index`:
      1. No need to fetch from source: e.g., when aggregation type is `MIN` and 
new value > current value.
      2. Need to fetch all bucket data from source and recalculate: e.g., when 
aggregation type is `AVG`.

3. Write result to `downstream_key` atomically, consistent with 
[Writing](#writing).

#### Quickly Update Latest Bucket

This is the most common case for downstream time series data writes, 
corresponding to the "update" mode in [Optimizations for Compressed Data 
Writes](#Optimizations-for-Compressed-Data-Writes). The auxiliary information 
in [Downstream Key Datameta](#Downstream-Key-Datameta) helps determine updated 
values without fetching from source, reducing I/O and CPU since 
`bucket_duration` may be much larger than `chunk_size`.

Specifically, we store current bucket statistics for different aggregation 
types:

| Aggregation type | Stored Information                     | Update Logic      
                                           |
| ---------------- | -------------------------------------- | 
------------------------------------------------------------ |
| `AVG`            | Data point count `count`               | 
`update_avg=(cur_avg_value*count+new_value)/(count+1)`       |
| `SUM`            | null                                   | 
`new_sum=cur_sum+new_value`                                  |
| `MIN`            | Current min `cur_min_value`            | 
`new_min_value=new_value<cur_min_value?new_value:cur_min_value` |
| `MAX`            | Current max `cur_max_value`            | Similar to `MIN`  
                                           |
| `RANGE`          | `cur_min_value`and`cur_max_value`       | Update 
`cur_min_value`和`cur_max_value`,then`new_range = cur_max_value - cur_min_value` 
|
| `FIRST`          | First timestamp `first_timestamp`      | Update only if 
`new_timestamp<first_timestamp`               |
| `LAST`           | Latest timestamp `last_timestamp`      | Similar to 
`FIRST`                                           |
| `std.p`          | Bucket mean and count `mean`, `count` | See [Welford's 
algorithm](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm)
 |
| `std.s`          | Bucket mean and count `mean`, `count` | Same as above      
                                          |
| `var.p`          | Bucket mean and count `mean`, `count` | Same as above      
                                          |
| `var.s`          | Bucket mean and count `mean`, `count` | Same as above      
                                          |

### Retention ( `[RETENTION retentionPeriod] `)

Use `RocketDB` compact filters to delete expired data. When all data in a 
`chunk` expires, the `chunk` can be deleted (when 
`chunk_last_timestamp+retention<lastTimestamp`). First delete expired `chunk 
datameta`, then the corresponding `chunk`.

Queries automatically filter data outside the retention window.



## Command Support Status

This section outlines the core commands.

| [`RedisTimeSeries`](https://github.com/RedisTimeSeries/RedisTimeSeries) 
Commands |             Optional arguments or output fields              | 
Supported? |
| :----------------------------------------------------------: | 
:----------------------------------------------------------: | :--------: |
|                       `TS.ADD/TS.MADD`                       |                
 `[DUPLICATE_POLICY policy] `                 |     ✅      |
|                         `TS.CREATE`                          |                
 `[ON_DUPLICATE policy_ovr]`                  |     ✅      |
|                                                              |                
 `[LABELS [label value ...]]`                 |     ✅      |
|                                                              |                
`[RETENTION retentionPeriod] `                |     ✅      |
|                                                              |                
        ` [ENCODING] `                        |     ✅      |
|                                                              |                
     `[CHUNK_SIZE size] `                     |     ❓      |
|                                                              |                
 `[LABELS [label value ...]]`                 |     ✅      |
|                           `TS.DEL`                           |                
                                              |     ✅      |
|                     `TS.MRANGE/TS.RANGE`                     |                
    `[FILTER_BY_TS ts...]`                    |     ✅      |
|                                                              |                
 `[FILTER_BY_VALUE min max]`                  |     ✅      |
|                                                              |                
       `[COUNT count]`                        |     ✅      |
|                                                              | `[[ALIGN 
align] AGGREGATION aggregator bucketDuration [BUCKETTIMESTAMP bt] [EMPTY]]` |   
  ✅      |
|                                                              |                
    `FILTER filterExpr...`                    |     ✅      |
|                       `TS.CREATERULE`                        |           
`AGGREGATION aggregator bucketDuration`            |     ✅      |
|                                                              |                
     ` [alignTimestamp]`                      |     ✅      |
|                    `TS.INCRBY/TS.DECRBY`                     |                
                                              |     ✅      |
|                          `TS.INFO`                           |                
        `memoryUsage`                         |     ❓      |
|                                                              |                
       `firstTimestamp`                       |     ✅      |
|                                                              |                
         `chunkSize`                          |     ❓      |
|                                                              |                
          `other...`                          |     ✅      |

❓indicates fields that may be incompatible or inconsistent with `Redis` 
implementation.

### `TS.INFO` Field Compatibility

Some fields may not align with our design:

- `chunkSize`: In `Redis`, this is a fixed byte size specified at creation.
- `memoryUsage`: This metric might be challenging to calculate and maintain 
accurately. As an alternative, we could potentially use `diskUsage` instead, 
with support from the `chunk datameta` structure ?

GitHub link: https://github.com/apache/kvrocks/discussions/3044

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to