# TimeSeries Proposal


This is an [OSPP 2025 
project](https://summer-ospp.ac.cn/org/prodetail/259430064?list=pro). I 
sincerely appreciate and welcome feedback and suggestions from all community 
members to help me improve!

## Introduction

RedisTimeSeries is a Redis module for storing and querying time series data, giving Redis basic time series database capabilities.

Since Apache Kvrocks aims to be compatible with the Redis protocol and commands, we also want to provide time series processing capabilities that are compatible with RedisTimeSeries.

---

## Data Structure Design

Most of the design is inspired by [beihao's 
proposal](https://beihao-rebecca.notion.site/TimeSeries-Proposal-6c5d2f9bc0624a50a209cb8f517109a8)
 and [PR #2573](https://github.com/apache/kvrocks/pull/2573), but I've added 
many details and made significant changes to the core data storage approach.

### 1. `Datameta`

```plaintext
        +----------+----------+-----------+--------------------+---------------+------------------+
key =>  |  flags   |  expire  |  version  | size(totalSamples) |  memoryUsage  |  firstTimestamp  |
        | (1byte)  | (Ebyte)  |  (8byte)  |      (Sbyte)       |    (8byte)    |      (8byte)     |
        +----------+----------+-----------+--------------------+---------------+------------------+
        +----------------+----------------+--------------+-------------+------------+
        | lastTimestamp  | retentionTime  |  chunkCount  |  chunkSize  | chunkType  |
        |    (8byte)     |    (8byte)     |   (8byte)    |   (8byte)   |  (1byte)   |
        +----------------+----------------+--------------+-------------+------------+
        +-----------------+----------------+-------------+--------***----------+------***--------+
        | duplicatePolicy | sourceKey_size |  sourceKey  | downstream_keysize1 | downstream_key1 |...
        |     (1byte)     |    (4byte)     |   (Xbyte)   |       (4byte)       |     (Nbyte)     |...
        +-----------------+----------------+-------------+--------***----------+------***--------+
```

- Most fields in `datameta` align with [`TS.INFO`](https://redis.io/commands/ts.info/), except:

  - The `rules` field, which is stored in a separate `subkey` (see [Downstream Key Datameta](#Downstream-Key-Datameta)).

  - The `labels` field. A label is a (`label_key`, `label_value`) pair, where both `label_key` and `label_value` are strings of arbitrary length. All labels can be retrieved using `TS.INFO` and modified using `TS.ALTER`, so it might be better to store the labels in a separate subkey; see [Label Index](#Label-Index).
- `downstream_key` is used to index downstream time series (see [Downstream Key Datameta](#Downstream-Key-Datameta)). Since a series may have multiple downstream series, this section may contain multiple downstream keys.
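
To make the layout concrete, here is a rough C++ sketch of the fields above. This is illustrative only; the real implementation would go through Kvrocks' existing metadata encode/decode helpers rather than a plain struct:

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical sketch of the `datameta` fields from the diagram above.
// The variable-length tail (sourceKey, downstream keys) is stored as
// size-prefixed strings.
struct TimeSeriesMetadata {
  uint8_t flags = 0;
  uint64_t expire = 0;            // E bytes in the diagram
  uint64_t version = 0;
  uint64_t total_samples = 0;     // size(totalSamples), S bytes in the diagram
  uint64_t memory_usage = 0;
  uint64_t first_timestamp = 0;
  uint64_t last_timestamp = 0;
  uint64_t retention_time = 0;
  uint64_t chunk_count = 0;
  uint64_t chunk_size = 0;        // time range covered by one chunk, in ms
  uint8_t chunk_type = 0;         // e.g., 0 = uncompressed, 1 = compressed
  uint8_t duplicate_policy = 0;
  std::string source_key;                  // empty for a source series
  std::vector<std::string> downstream_keys;
};
```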

### 2. Primary Data Storage (Time Chunks)

This section explains how timestamp and value data are stored. RedisTimeSeries supports both compressed and uncompressed storage; below is the corresponding design in Kvrocks:

#### Uncompressed Chunk Type

`Chunk datameta`:

```
                        +-----------------+-----------------+-----------+-----------+
key|version|chunk_id => | first_timestamp |  end_timestamp  |   count   |   bytes   |
                        |     (8byte)     |     (8byte)     |  (8byte)  |  (8byte)  |
                        +-----------------+-----------------+-----------+-----------+
```

Data storage (requires a new `Column Family`, e.g., `TimeSeries`):

```
                        +-------------+-------------+-------------+-------------+
key|version|chunk_id => |  timestamp1 |    value1   |  timestamp2 |    value2   |...
                        |   (8byte)   |   (8byte)   |   (8byte)   |   (8byte)   |...
                        +-------------+-------------+-------------+-------------+
```

Uncompressed chunks store raw timestamps and values as `uint64` and `double` 
types, respectively.
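
For example, appending one sample to an uncompressed chunk could look like the following minimal sketch. A real implementation would use Kvrocks' fixed-width encoding helpers instead of raw `memcpy`:

```cpp
#include <cstdint>
#include <cstring>
#include <string>

// Append one (timestamp, value) sample to an uncompressed chunk buffer.
// The double is stored by reinterpreting its bits as a uint64.
void AppendSample(std::string *chunk, uint64_t timestamp, double value) {
  uint64_t value_bits;
  std::memcpy(&value_bits, &value, sizeof(value_bits));
  chunk->append(reinterpret_cast<const char *>(&timestamp), sizeof(timestamp));
  chunk->append(reinterpret_cast<const char *>(&value_bits), sizeof(value_bits));
}
```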

#### Compressed Chunk Type

`Chunk datameta`:

```
                        +-----------------+-----------------+-----------+-----------+
key|version|chunk_id => | first_timestamp |  end_timestamp  |   count   |   bytes   |
                        |     (8byte)     |     (8byte)     |  (8byte)  |  (8byte)  |
                        +-----------------+-----------------+-----------+-----------+
                        +------------+----------------+---------------------+------------+
                        | base_value | prev_timestamp | prev_timestampDelta | prev_value |
                        |  (8byte)   |     (8byte)    |       (8byte)       |  (8byte)   |
                        +------------+----------------+---------------------+------------+
```

Data storage (requires a new `Column Family`, e.g., `TimeSeries`):

```
                        +-------------+-------------+-------------+-------------+
key|version|chunk_id => |  timestamp1 |    value1   |  timestamp2 |    value2   |...
                        |   (Xbyte)   |   (Ybyte)   |   (Zbyte)   |   (Kbyte)   |...
                        +-------------+-------------+-------------+-------------+
```

- `chunk_id = timestamp // chunk_size` (e.g., `chunk_size=60000ms` for 
minute-level chunks)
- `chunk datameta` assists chunk queries and writes, and supports the `Chunks` field in `TS.INFO` (when `DEBUG` is specified).
- Compression algorithm implementation (see 
[RedisTimeSeries/src/gorilla.c](https://github.com/RedisTimeSeries/RedisTimeSeries/blob/master/src/gorilla.c)):
  - `timestamp`: Delta-of-delta encoding.
  - `value`:  [Facebook 
Gorilla](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
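
To illustrate the two algorithms, here is a simplified sketch of the per-sample computation. The state fields mirror `chunk datameta` above; the actual variable-length bit packing in `gorilla.c` is omitted:

```cpp
#include <cstdint>
#include <cstring>

// Simplified per-chunk compression state (see prev_timestamp,
// prev_timestampDelta, prev_value in `chunk datameta`).
struct CompressState {
  uint64_t prev_timestamp = 0;
  int64_t prev_timestamp_delta = 0;
  uint64_t prev_value_bits = 0;  // previous double, reinterpreted as bits
};

// Delta-of-delta for timestamps: regular sampling intervals produce a
// delta-of-delta of 0, which packs into very few bits.
int64_t TimestampDoD(CompressState *st, uint64_t ts) {
  int64_t delta = static_cast<int64_t>(ts - st->prev_timestamp);
  int64_t dod = delta - st->prev_timestamp_delta;
  st->prev_timestamp = ts;
  st->prev_timestamp_delta = delta;
  return dod;
}

// Gorilla value compression: XOR against the previous value; identical
// values XOR to 0 and cost a single bit. Leading/trailing zero counts of
// the XOR result determine the bit encoding.
uint64_t ValueXor(CompressState *st, double value) {
  uint64_t bits;
  std::memcpy(&bits, &value, sizeof(bits));
  uint64_t xored = bits ^ st->prev_value_bits;
  st->prev_value_bits = bits;
  return xored;
}
```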


> This design is similar to 
> [`RedisTimeSeries`](https://github.com/RedisTimeSeries/RedisTimeSeries), but 
> with a key difference: `RedisTimeSeries` uses fixed-size memory chunks and 
> always appends data to the head chunk. Here, we use fixed-time-range chunks, 
> which align better with `Kvrocks`'s key-value storage for efficient 
> timestamp-based indexing.
>
> Compression might not be mandatory initially, but as noted by `InfluxDB`, it
> helps ["reduce storage space and disk IO when 
> querying"](https://docs.influxdata.com/influxdb/v1/concepts/storage_engine/). 
> We may start with uncompressed chunks and add compression later.

### 3. Secondary Indexes

#### Label Index

```
               +------------------+------------+-------------------+--------------+
key|version => | label_key1_size  | label_key1 | label_value1_size | label_value1 |
               |     (4byte)      |  (Xbyte)   |      (4byte)      |   (Ybyte)    |
               +------------------+------------+-------------------+--------------+
               +------------------+------------+-------------------+--------------+
               | label_key2_size  | label_key2 | label_value2_size | label_value2 |...
               |     (4byte)      |  (Zbyte)   |      (4byte)      |   (Sbyte)    |...
               +------------------+------------+-------------------+--------------+
```

- Used to retrieve all labels via `TS.INFO`.
- Supports modifying labels with `TS.ALTER`.

#### Reverse Index

    +-----------------+------------+-------------------+--------------+-----------+
    | label_key_size  | label_key  | label_value_size  | label_value  |    key    |  => null
    |     (4byte)     |  (Xbyte)   |      (4byte)      |   (Ybyte)    |  (Kbyte)  |
    +-----------------+------------+-------------------+--------------+-----------+

- This is used to filter by labels in `TS.MRANGE`.
- This may need to be placed in a new column family.

**I'm not sure if this is a good approach for indexing by labels. More ideas 
are welcome!**

#### Downstream Key Datameta

> In `Redis`, you can use `TS.CREATERULE` to create one or more "destination 
> timeseries" for compaction from a "source timeseries" (see 
> [here](https://redis.io/commands/ts.createrule/)). Here we use "downstream" 
> to represent the destination timeseries for better semantics.

```
                              +------------+-----------------+-----------+---------------------+
key|version|downstream_key => | aggregator | bucket_duration | alignment | latest_bucket_index |
                              |  (1byte)   |     (8byte)     |  (8byte)  |       (8byte)       |
                              +------------+-----------------+-----------+---------------------+
                              +-------------+-----------+
                              | last_offset |  auxinfo  |
                              |   (8byte)   |  (Xbyte)  |
                              +-------------+-----------+
```

- Supports the `rules` field in `TS.INFO`.
- `aggregator` is an enum for the aggregation methods supported by [`TS.CREATERULE`](https://redis.io/docs/latest/commands/ts.createrule/). The meanings of `aggregator`, `bucket_duration`, and `alignment` are as defined by the `Redis` command.
- `latest_bucket_index` helps quickly determine whether an incoming sample falls into the latest `bucket` when adding data downstream.
- `last_offset` helps compressed chunk types quickly locate the last data point for efficient updates.
- `auxinfo`: Supports the various aggregation types in `Redis` and helps downstream series update quickly with lower CPU overhead (see [Quickly Update Latest Bucket](#Quickly-Update-Latest-Bucket)).


## Key Operation Workflows

### Writing

```
TS.ADD key timestamp value 
  [RETENTION retentionPeriod] 
  [ENCODING <COMPRESSED|UNCOMPRESSED>] 
  [CHUNK_SIZE size] 
  [DUPLICATE_POLICY policy] 
  [ON_DUPLICATE policy_ovr] 
  [IGNORE ignoreMaxTimediff ignoreMaxValDiff] 
  [LABELS [label value ...]]
```

1. Get `datameta` and its `lastTimestamp` field.
2. Compute `chunk_id` from `timestamp` using `chunk_id = timestamp // chunk_size`.
3. Determine whether this is the latest data:
   1. Yes (`lastTimestamp < timestamp`): Append the data to the corresponding chunk (`chunk_id`). For compressed types, the compressed values can be computed quickly from `chunk datameta` and appended to the chunk end, reducing CPU usage; uncompressed types insert directly.
   2. No (`lastTimestamp > timestamp`): For uncompressed types, use binary search to locate the insertion position; compressed types need to reconstruct the entire chunk.
4. Pass the data to the downstream series (see [Passing Data Downstream](#Passing-Data-Downstream-TSCREATERULE)).
5. Update `datameta` and `chunk datameta`.
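
A small sketch of steps 2 and 3 (illustrative names, not a final API):

```cpp
#include <cstdint>

// Illustrative sketch of the TS.ADD decision logic; storage access is
// elided. chunk_size is the per-chunk time range from `datameta`.
enum class WriteMode { kAppend, kOutOfOrder };

struct WritePlan {
  uint64_t chunk_id;
  WriteMode mode;
};

WritePlan PlanWrite(uint64_t last_timestamp, uint64_t timestamp, uint64_t chunk_size) {
  WritePlan plan;
  plan.chunk_id = timestamp / chunk_size;  // step 2: chunk_id = timestamp // chunk_size
  // Step 3: the newest data is appended to the tail of its chunk; older data
  // needs a binary search (uncompressed) or a full chunk rebuild (compressed).
  plan.mode = (timestamp > last_timestamp) ? WriteMode::kAppend : WriteMode::kOutOfOrder;
  return plan;
}
```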

#### Optimizations for Compressed Data Writes

There are two distinct write patterns for time series that call for different handling: for source series, new data points are typically "appended" to chunks, while downstream series mostly "update" the latest data point. Both compression algorithms (delta-of-delta encoding and [Facebook Gorilla](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf)) are differential, encoding each sample against the previous one, so we can tailor the implementation to these two scenarios.

1. "Append" mode: Updates `prev_timestamp`, `prev_timestampDelta`, `prev_value` 
during writes.

2. "Update" mode: `prev_timestamp` and `prev_value` point to the second-newest 
data, allowing direct updates of the latest data.

- The `last_offset` field in [Downstream Key Datameta](#Downstream-Key-Datameta) helps quickly locate the latest data point for "update" mode.
- Source series default to "append" mode, downstream series to "update" mode.
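
A hypothetical sketch of the "update" path: with `last_offset` at hand, only the tail of the chunk is rewritten, where `reencoded_tail` would be produced by the delta-of-delta/XOR scheme sketched earlier:

```cpp
#include <cstdint>
#include <string>

// Hypothetical sketch of "update" mode. `last_offset` (kept in the
// Downstream Key Datameta) is the byte offset where the newest sample's
// encoding begins; the prev_* state describes the second-newest sample,
// so only the tail has to be re-encoded, never the whole chunk.
void UpdateLatestSample(std::string *chunk, uint64_t last_offset,
                        const std::string &reencoded_tail) {
  chunk->resize(last_offset);     // drop the old encoding of the newest sample
  chunk->append(reencoded_tail);  // tail re-encoded against the prev_* state
}
```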

### Time Range Query (`TS.RANGE`)

```
TS.RANGE ts_key start_ts end_ts [AGGREGATION avg 60000] [FILTER_BY_VALUE 25 40]
```

1. Adjust `start_ts` and `end_ts` so that the query range excludes expired data.
2. Compute the `chunk_id` range covering `start_ts` to `end_ts`.
3. Iterate the chunks via `ns_key|version|*`.
4. Return filtered and aggregated (if needed) results.
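
Steps 2 and 3 boil down to a division on both endpoints followed by a key-range scan; a minimal sketch:

```cpp
#include <cstdint>
#include <utility>

// Map [start_ts, end_ts] to the inclusive range of chunk ids that may
// contain samples. Since chunk keys are key|version|chunk_id, this range
// translates directly into a RocksDB iterator seek/scan.
std::pair<uint64_t, uint64_t> ChunkIdRange(uint64_t start_ts, uint64_t end_ts,
                                           uint64_t chunk_size) {
  return {start_ts / chunk_size, end_ts / chunk_size};
}
```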

### Multi-Series Query (`TS.MRANGE`)

```
TS.MRANGE start_ts end_ts FILTER sensor_type=temperature FILTER_BY_VALUE 0 100
```

1. Resolve `ts_key` candidates via the label index (see [Reverse Index](#Reverse-Index)).
2. Execute `TS.RANGE` on each candidate in parallel.
3. Merge the results with filtering.
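
For step 1, a hypothetical helper (size fields written in host byte order for brevity) could build the reverse-index scan prefix for a single `label=value` filter:

```cpp
#include <cstdint>
#include <string>

// Build the reverse-index scan prefix for a `label=value` filter. Keys are
// laid out as label_key_size|label_key|label_value_size|label_value|key,
// so scanning this prefix yields every series key carrying that label pair.
std::string ReverseIndexPrefix(const std::string &label_key,
                               const std::string &label_value) {
  std::string prefix;
  uint32_t ksize = static_cast<uint32_t>(label_key.size());
  uint32_t vsize = static_cast<uint32_t>(label_value.size());
  prefix.append(reinterpret_cast<const char *>(&ksize), sizeof(ksize));
  prefix.append(label_key);
  prefix.append(reinterpret_cast<const char *>(&vsize), sizeof(vsize));
  prefix.append(label_value);
  return prefix;  // iterate this prefix; the suffix of each key is a ts_key
}
```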

### Passing Data Downstream (`TS.CREATERULE`)

> Here we use the term "bucket" consistently with `Redis`, where data in the 
> same `bucket` will be downsampled into one data point. See 
> [here](https://redis.io/commands/ts.createrule/). `chunk` refers to storage 
> units where data in the same `chunk` is written to the same location. These 
> terms have different meanings.

When a new data point is added to the source series (e.g., (`ts`, `value`)):

1. Check `downstream_key` and get `aggregator`, `bucket_duration`, `alignment`, and `latest_bucket_index` via [Downstream Key Datameta](#Downstream-Key-Datameta).
2. Calculate `bucket_index = (ts - alignment) // bucket_duration` and the aligned time `ts' = bucket_index * bucket_duration + alignment`:
   1. `bucket_index > latest_bucket_index`: Directly add a new data point (`ts'`, `value`) downstream.
   2. `bucket_index = latest_bucket_index`: The most frequent case; see [Quickly Update Latest Bucket](#Quickly-Update-Latest-Bucket).
   3. `bucket_index < latest_bucket_index`:
      1. No need to fetch from the source: e.g., when the aggregation type is `MIN` and the new value > the current value.
      2. Need to fetch all bucket data from the source and recalculate: e.g., when the aggregation type is `AVG`.

3. Write the result to `downstream_key` atomically, consistent with [Writing](#writing).
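
Step 2 as a trivial sketch (assuming `ts >= alignment`):

```cpp
#include <cstdint>

// Which downstream bucket a source timestamp falls into, and the aligned
// timestamp ts' that the downstream sample receives.
struct BucketPos {
  uint64_t bucket_index;
  uint64_t aligned_ts;  // ts'
};

BucketPos LocateBucket(uint64_t ts, uint64_t alignment, uint64_t bucket_duration) {
  BucketPos pos;
  pos.bucket_index = (ts - alignment) / bucket_duration;
  pos.aligned_ts = pos.bucket_index * bucket_duration + alignment;
  return pos;
}
```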

#### Quickly Update Latest Bucket

This is the most common case for downstream time series data writes, 
corresponding to the "update" mode in [Optimizations for Compressed Data 
Writes](#Optimizations-for-Compressed-Data-Writes). The auxiliary information 
in [Downstream Key Datameta](#Downstream-Key-Datameta) helps determine updated 
values without fetching from source, reducing I/O and CPU since 
`bucket_duration` may be much larger than `chunk_size`.

Specifically, we store current bucket statistics for different aggregation 
types:

| Aggregation type | Stored Information                    | Update Logic                                                 |
| ---------------- | ------------------------------------- | ------------------------------------------------------------ |
| `AVG`            | Data point count `count`              | `new_avg = (cur_avg_value*count + new_value) / (count+1)`    |
| `SUM`            | null                                  | `new_sum = cur_sum + new_value`                              |
| `MIN`            | Current min `cur_min_value`           | `new_min_value = new_value < cur_min_value ? new_value : cur_min_value` |
| `MAX`            | Current max `cur_max_value`           | Similar to `MIN`                                             |
| `RANGE`          | `cur_min_value` and `cur_max_value`   | Update `cur_min_value` and `cur_max_value`, then `new_range = cur_max_value - cur_min_value` |
| `FIRST`          | First timestamp `first_timestamp`     | Update only if `new_timestamp < first_timestamp`             |
| `LAST`           | Latest timestamp `last_timestamp`     | Similar to `FIRST`                                           |
| `std.p`          | Bucket mean and count `mean`, `count` | See [Welford's algorithm](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm) |
| `std.s`          | Bucket mean and count `mean`, `count` | Same as above                                                |
| `var.p`          | Bucket mean and count `mean`, `count` | Same as above                                                |
| `var.s`          | Bucket mean and count `mean`, `count` | Same as above                                                |
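
Below is a sketch of how the `auxinfo` updates could look for `AVG` and for the variance/stddev family. Note that Welford's algorithm also keeps the running sum of squared deviations (`M2`) in addition to `mean` and `count`, so `auxinfo` would need to store it as well:

```cpp
#include <cstdint>
#include <cmath>

// AVG: the running average can absorb a new value using only `count`.
struct AvgAux {
  double cur_avg = 0.0;
  uint64_t count = 0;
};

void UpdateAvg(AvgAux *aux, double new_value) {
  aux->cur_avg = (aux->cur_avg * aux->count + new_value) / (aux->count + 1);
  aux->count += 1;
}

// Welford's online algorithm for std.p/std.s/var.p/var.s: only mean, count,
// and M2 are stored, so the bucket never has to be re-read from the source.
struct VarAux {
  double mean = 0.0;
  double m2 = 0.0;  // running sum of squared deviations from the mean
  uint64_t count = 0;
};

void UpdateVar(VarAux *aux, double new_value) {
  aux->count += 1;
  double delta = new_value - aux->mean;
  aux->mean += delta / aux->count;
  aux->m2 += delta * (new_value - aux->mean);
}

double VarP(const VarAux &aux) { return aux.count ? aux.m2 / aux.count : 0.0; }
double VarS(const VarAux &aux) { return aux.count > 1 ? aux.m2 / (aux.count - 1) : 0.0; }
double StdP(const VarAux &aux) { return std::sqrt(VarP(aux)); }
```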

### Retention (`[RETENTION retentionPeriod]`)

Use `RocksDB` compaction filters to delete expired data. When all data in a `chunk` has expired (i.e., `chunk_last_timestamp + retention < lastTimestamp`), the `chunk` can be deleted. First delete the expired `chunk datameta`, then the corresponding `chunk`.
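
A minimal sketch of such a compaction filter. This is hypothetical: the retention and the series `lastTimestamp` are passed in via the constructor here, while a real filter would look them up from `datameta`:

```cpp
#include <cstdint>
#include <cstring>
#include <rocksdb/compaction_filter.h>

// Drop a chunk entry once all of its data has expired, i.e.
// chunk_last_timestamp + retention < series lastTimestamp.
class TimeSeriesCompactionFilter : public rocksdb::CompactionFilter {
 public:
  TimeSeriesCompactionFilter(uint64_t retention, uint64_t series_last_ts)
      : retention_(retention), series_last_ts_(series_last_ts) {}

  const char *Name() const override { return "TimeSeriesCompactionFilter"; }

  bool Filter(int /*level*/, const rocksdb::Slice & /*key*/,
              const rocksdb::Slice &value, std::string * /*new_value*/,
              bool * /*value_changed*/) const override {
    // `chunk datameta` layout: first_timestamp(8) | end_timestamp(8) | ...
    if (value.size() < 16) return false;
    uint64_t chunk_last_ts;
    std::memcpy(&chunk_last_ts, value.data() + 8, sizeof(chunk_last_ts));
    // Returning true removes the entry during compaction.
    return retention_ > 0 && chunk_last_ts + retention_ < series_last_ts_;
  }

 private:
  uint64_t retention_;
  uint64_t series_last_ts_;
};
```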

Queries automatically filter data outside the retention window.

Note: `TS.INFO` needs to show the oldest unexpired `firstTimestamp`. This field in `datameta` isn't updated on every data addition or expiration; instead, it is computed when `TS.INFO` is explicitly called.

## Command Support Status

This section outlines the core commands.

| [`RedisTimeSeries`](https://github.com/RedisTimeSeries/RedisTimeSeries) Commands | Optional arguments or output fields | Supported? |
| :----------------------------------------------------------: | :----------------------------------------------------------: | :--------: |
|                       `TS.ADD/TS.MADD`                       |                 `[DUPLICATE_POLICY policy]`                  |     ✅      |
|                         `TS.CREATE`                          |                 `[ON_DUPLICATE policy_ovr]`                  |     ✅      |
|                                                              |                 `[LABELS [label value ...]]`                 |     ✅      |
|                                                              |                `[RETENTION retentionPeriod]`                 |     ✅      |
|                                                              |                         `[ENCODING]`                         |     ✅      |
|                                                              |                     `[CHUNK_SIZE size]`                      |     ❓      |
|                           `TS.DEL`                           |                                                              |     ✅      |
|                     `TS.MRANGE/TS.RANGE`                     |                    `[FILTER_BY_TS ts...]`                    |     ✅      |
|                                                              |                 `[FILTER_BY_VALUE min max]`                  |     ✅      |
|                                                              |                       `[COUNT count]`                        |     ✅      |
|                                                              | `[[ALIGN align] AGGREGATION aggregator bucketDuration [BUCKETTIMESTAMP bt] [EMPTY]]` |     ✅      |
|                                                              |                    `FILTER filterExpr...`                    |     ✅      |
|                       `TS.CREATERULE`                        |           `AGGREGATION aggregator bucketDuration`            |     ✅      |
|                                                              |                      `[alignTimestamp]`                      |     ✅      |
|                    `TS.INCRBY/TS.DECRBY`                     |                                                              |     ✅      |
|                          `TS.INFO`                           |                        `memoryUsage`                         |     ❓      |
|                                                              |                       `firstTimestamp`                       |     ✅      |
|                                                              |                         `chunkSize`                          |     ❓      |
|                                                              |                          `other...`                          |     ✅      |

❓ indicates fields that may be incompatible or inconsistent with the `Redis` implementation.

### `TS.INFO` Field Compatibility

Some fields may not align with our design:

- `chunkSize`: In `Redis`, this is a fixed byte size specified at creation, whereas here a chunk covers a fixed time range.
- `memoryUsage`: This metric might be challenging to calculate and maintain accurately. As an alternative, could we report `diskUsage` instead, computed with the help of the `chunk datameta` structure?

