This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 76b947f56b Add built-in M4 UDF (#7755)
76b947f56b is described below

commit 76b947f56becf1e2c80fca48d4dac1d5a8e656fd
Author: Rui,Lei <[email protected]>
AuthorDate: Sun Oct 30 23:31:20 2022 +0800

    Add built-in M4 UDF (#7755)
---
 docs/UserGuide/Query-Data/Select-Expression.md     | 155 ++++++++++
 docs/UserGuide/UDF-Library/M4.md                   | 101 ++-----
 docs/zh/UserGuide/Query-Data/Select-Expression.md  | 156 +++++++++++
 docs/zh/UserGuide/UDF-Library/M4.md                | 101 ++-----
 .../BuiltinTimeSeriesGeneratingFunctionEnum.java   |   3 +-
 .../org/apache/iotdb/db/it/udf/IoTDBUDFM4IT.java   | 283 +++++++++++++++++++
 .../BuiltinTimeSeriesGeneratingFunction.java       |   3 +-
 .../apache/iotdb/commons/udf/builtin/UDTFM4.java   | 311 +++++++++++++++++++++
 8 files changed, 944 insertions(+), 169 deletions(-)

diff --git a/docs/UserGuide/Query-Data/Select-Expression.md 
b/docs/UserGuide/Query-Data/Select-Expression.md
index 927519e468..8df849d69e 100644
--- a/docs/UserGuide/Query-Data/Select-Expression.md
+++ b/docs/UserGuide/Query-Data/Select-Expression.md
@@ -842,6 +842,161 @@ Total line number = 10
 It costs 0.041s
 ```
 
+### M4 Function
+
+M4 is used to sample the `first, last, bottom, top` points for each sliding 
window:
+
+-   the first point is the point with the **m**inimal time;
+-   the last point is the point with the **m**aximal time;
+-   the bottom point is the point with the **m**inimal value (if there are 
multiple such points, M4 returns one of them);
+-   the top point is the point with the **m**aximal value (if there are 
multiple such points, M4 returns one of them).
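
The four selection rules above can be illustrated outside IoTDB with a minimal Python sketch. The function name `m4_points` and the `(time, value)` tuple representation are assumptions made for this illustration only; they are not part of the IoTDB API. On ties the sketch keeps the first point it encounters, which is one of the choices the rules permit.

```python
from typing import List, Tuple

Point = Tuple[int, float]  # (timestamp, value); hypothetical representation


def m4_points(window: List[Point]) -> List[Point]:
    """Return the first, last, bottom, and top points of one window,
    sorted by time and deduplicated."""
    if not window:
        return []
    first = min(window, key=lambda p: p[0])   # minimal time
    last = max(window, key=lambda p: p[0])    # maximal time
    bottom = min(window, key=lambda p: p[1])  # minimal value (first found on ties)
    top = max(window, key=lambda p: p[1])     # maximal value (first found on ties)
    # A set removes duplicates, e.g. when the first point is also the bottom point.
    return sorted({first, last, bottom, top})


print(m4_points([(1, 5.0), (2, 15.0), (5, 10.0), (8, 8.0)]))
```

Because one point can play several roles at once, a window contributes between one and four points to the result.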
+
+<img 
src="https://user-images.githubusercontent.com/33376433/198178733-a0919d17-0663-4672-9c4f-1efad6f463c2.png"
 alt="image" style="zoom:50%;" />
+
+| Function Name | Allowed Input Series Data Types | Attributes                 
                                  | Output Series Data Type        | 
Description                                                  |
+| ------------- | ------------------------------- | 
------------------------------------------------------------ | 
------------------------------ | 
------------------------------------------------------------ |
+| M4            | INT32 / INT64 / FLOAT / DOUBLE  | The size window and the 
time window use different attributes. The size window uses attributes 
`windowSize` and `slidingStep`. The time window uses attributes `timeInterval`, 
`slidingStep`, `displayWindowBegin`, and `displayWindowEnd`. See below for 
details. | INT32 / INT64 / FLOAT / DOUBLE | Returns the `first, last, bottom, 
top` points in each sliding window. M4 sorts and deduplicates the aggregated 
points within the window before [...]
+
+#### Attributes
+
+**(1) Attributes for the size window:**
+
++ `windowSize`: The number of points in a window. Int data type. **Required**.
++ `slidingStep`: The number of points by which the window slides. Int data 
type. Optional. If not set, it defaults to the value of `windowSize`.
+
+<img 
src="https://user-images.githubusercontent.com/33376433/198181449-00d563c8-7bce-4ecd-a031-ec120ca42c3f.png"
 alt="image" style="zoom: 50%;" />
+
+*(image source: 
https://iotdb.apache.org/UserGuide/Master/Process-Data/UDF-User-Defined-Function.html#udtf-user-defined-timeseries-generating-function)*
+
+**(2) Attributes for the time window:**
+
++ `timeInterval`: The time length of a window. Long data type. **Required**.
++ `slidingStep`: The time length by which the window slides. Long data type. 
Optional. If not set, it defaults to the value of `timeInterval`.
++ `displayWindowBegin`: The starting position of the window (included). Long 
data type. Optional. If not set, it defaults to Long.MIN_VALUE, which means 
that the time of the first data point of the input time series is used as the 
starting position of the window.
++ `displayWindowEnd`: The end time limit (excluded; it essentially plays the 
same role as `WHERE time < displayWindowEnd`). Long data type. Optional. If not 
set, it defaults to Long.MAX_VALUE, which means that there is no additional end 
time limit other than the end of the input time series itself.
+
+<img 
src="https://user-images.githubusercontent.com/33376433/198183015-93b56644-3330-4acf-ae9e-d718a02b5f4c.png"
 alt="groupBy window" style="zoom: 67%;" />
+
+*(image source: 
https://iotdb.apache.org/UserGuide/Master/Query-Data/Aggregate-Query.html#downsampling-aggregate-query)*
+
+#### Examples
+
+Input series:
+
+```sql
++-----------------------------+------------------+
+|                         Time|root.vehicle.d1.s1|
++-----------------------------+------------------+
+|1970-01-01T08:00:00.001+08:00|               5.0|
+|1970-01-01T08:00:00.002+08:00|              15.0|
+|1970-01-01T08:00:00.005+08:00|              10.0|
+|1970-01-01T08:00:00.008+08:00|               8.0|
+|1970-01-01T08:00:00.010+08:00|              30.0|
+|1970-01-01T08:00:00.020+08:00|              20.0|
+|1970-01-01T08:00:00.025+08:00|               8.0|
+|1970-01-01T08:00:00.027+08:00|              20.0|
+|1970-01-01T08:00:00.030+08:00|              40.0|
+|1970-01-01T08:00:00.033+08:00|               9.0|
+|1970-01-01T08:00:00.035+08:00|              10.0|
+|1970-01-01T08:00:00.040+08:00|              20.0|
+|1970-01-01T08:00:00.045+08:00|              30.0|
+|1970-01-01T08:00:00.052+08:00|               8.0|
+|1970-01-01T08:00:00.054+08:00|              18.0|
++-----------------------------+------------------+
+```
+
+SQL for query1:
+
+```sql
+select 
M4(s1,'timeInterval'='25','displayWindowBegin'='0','displayWindowEnd'='100') 
from root.vehicle.d1
+```
+
+Output1:
+
+```sql
++-----------------------------+-----------------------------------------------------------------------------------------------+
+|                         Time|M4(root.vehicle.d1.s1, "timeInterval"="25", 
"displayWindowBegin"="0", "displayWindowEnd"="100")|
++-----------------------------+-----------------------------------------------------------------------------------------------+
+|1970-01-01T08:00:00.001+08:00|                                                
                                            5.0|
+|1970-01-01T08:00:00.010+08:00|                                                
                                           30.0|
+|1970-01-01T08:00:00.020+08:00|                                                
                                           20.0|
+|1970-01-01T08:00:00.025+08:00|                                                
                                            8.0|
+|1970-01-01T08:00:00.030+08:00|                                                
                                           40.0|
+|1970-01-01T08:00:00.045+08:00|                                                
                                           30.0|
+|1970-01-01T08:00:00.052+08:00|                                                
                                            8.0|
+|1970-01-01T08:00:00.054+08:00|                                                
                                           18.0|
++-----------------------------+-----------------------------------------------------------------------------------------------+
+Total line number = 8
+```
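
To see where the eight rows of Output1 come from, the time-window behaviour can be approximated in plain Python. This is a minimal sketch, not the `UDTFM4` implementation: the function name `m4_time_window` is hypothetical, the sliding step is assumed to equal `timeInterval` (the documented default), and timestamps are given as integer milliseconds.

```python
from typing import List, Tuple

Point = Tuple[int, float]  # (timestamp in ms, value)

# The input series from the example above.
SERIES: List[Point] = [
    (1, 5.0), (2, 15.0), (5, 10.0), (8, 8.0), (10, 30.0),
    (20, 20.0), (25, 8.0), (27, 20.0), (30, 40.0), (33, 9.0),
    (35, 10.0), (40, 20.0), (45, 30.0), (52, 8.0), (54, 18.0),
]


def m4_time_window(points: List[Point], time_interval: int,
                   begin: int, end: int) -> List[Point]:
    """Apply M4 to consecutive windows [start, start + time_interval)
    covering [begin, end). Assumes slidingStep == timeInterval."""
    result: List[Point] = []
    start = begin
    while start < end:
        stop = min(start + time_interval, end)
        window = [p for p in points if start <= p[0] < stop]
        if window:  # empty windows contribute nothing
            picks = {
                min(window, key=lambda p: p[0]),  # first
                max(window, key=lambda p: p[0]),  # last
                min(window, key=lambda p: p[1]),  # bottom
                max(window, key=lambda p: p[1]),  # top
            }
            result.extend(sorted(picks))
        start += time_interval
    return result


for t, v in m4_time_window(SERIES, time_interval=25, begin=0, end=100):
    print(t, v)
```

The windows are `[0,25)`, `[25,50)`, `[50,75)`, and `[75,100)`. They yield 3, 3, 2, and 0 points respectively, which matches `Total line number = 8`.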
+
+SQL for query2:
+
+```sql
+select M4(s1,'windowSize'='10') from root.vehicle.d1
+```
+
+Output2:
+
+```sql
++-----------------------------+-----------------------------------------+
+|                         Time|M4(root.vehicle.d1.s1, "windowSize"="10")|
++-----------------------------+-----------------------------------------+
+|1970-01-01T08:00:00.001+08:00|                                      5.0|
+|1970-01-01T08:00:00.030+08:00|                                     40.0|
+|1970-01-01T08:00:00.033+08:00|                                      9.0|
+|1970-01-01T08:00:00.035+08:00|                                     10.0|
+|1970-01-01T08:00:00.045+08:00|                                     30.0|
+|1970-01-01T08:00:00.052+08:00|                                      8.0|
+|1970-01-01T08:00:00.054+08:00|                                     18.0|
++-----------------------------+-----------------------------------------+
+Total line number = 7
+```
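
The size-window result can be checked the same way. The sketch below is an illustration only: `m4_size_window` is a hypothetical name, the sliding step is assumed to equal `windowSize` (the documented default), and a trailing window with fewer than `windowSize` points is assumed to be processed as well, which is consistent with Output2.

```python
from typing import List, Tuple

Point = Tuple[int, float]  # (timestamp in ms, value)

# The input series from the example above.
SERIES: List[Point] = [
    (1, 5.0), (2, 15.0), (5, 10.0), (8, 8.0), (10, 30.0),
    (20, 20.0), (25, 8.0), (27, 20.0), (30, 40.0), (33, 9.0),
    (35, 10.0), (40, 20.0), (45, 30.0), (52, 8.0), (54, 18.0),
]


def m4_size_window(points: List[Point], window_size: int) -> List[Point]:
    """Apply M4 to consecutive groups of window_size points.
    Assumes slidingStep == windowSize and a partial last window."""
    result: List[Point] = []
    for i in range(0, len(points), window_size):
        window = points[i:i + window_size]
        picks = {
            min(window, key=lambda p: p[0]),  # first
            max(window, key=lambda p: p[0]),  # last
            min(window, key=lambda p: p[1]),  # bottom
            max(window, key=lambda p: p[1]),  # top
        }
        result.extend(sorted(picks))
    return result


for t, v in m4_size_window(SERIES, window_size=10):
    print(t, v)
```

The first window holds the ten points from time 1 to 33 and yields three points, because the first point is also the bottom point. The second window holds the remaining five points and yields four, giving `Total line number = 7`.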
+
+
+
+#### Suggested Use Cases
+
+**(1) Use Case: Extreme-point-preserving downsampling**
+
+As M4 aggregation selects the `first, last, bottom, top` points for each 
window, M4 usually preserves extreme points and thus patterns better than other 
downsampling methods such as Piecewise Aggregate Approximation (PAA). 
Therefore, if you want to downsample the time series while preserving extreme 
points, you may give M4 a try.
+
+**(2) Use Case: Error-free two-color line chart visualization of large-scale 
time series using reduced data**
+
+Refer to paper: ["M4: A Visualization-Oriented Time Series Data 
Aggregation"](http://www.vldb.org/pvldb/vol7/p797-jugel.pdf).
+
+Given a chart of `w*h` pixels, suppose the visualization time range of the 
time series root.vehicle.d1.s1 is `[tqs,tqe)` (in this use case, extend tqe so 
that (tqe-tqs) is divisible by w). Then the points that fall within the 
`i`-th time span `Ii=[tqs+(tqe-tqs)/w*(i-1),tqs+(tqe-tqs)/w*i)` are drawn 
on the `i`-th pixel column, i=1,2,...,w.
+
+Therefore, from a visualization-driven perspective, use the SQL `"select 
M4(s1,'timeInterval'='(tqe-tqs)/w','displayWindowBegin'='tqs','displayWindowEnd'='tqe')
 from root.vehicle.d1"` to sample the `first, last, bottom, top` points for 
each time span. The resulting series has no more than `4*w` points, a big 
reduction compared to the original large-scale time series. The line chart 
drawn from the reduced data is identical to that drawn from the original 
data (pixel-level consistency).
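
The parameter arithmetic for this use case can be sketched as a small helper that extends `tqe` and fills in the query text. The helper name `build_m4_query` and its default series and device arguments are assumptions for illustration; only the SQL shape comes from the text above.

```python
def build_m4_query(tqs: int, tqe: int, w: int,
                   series: str = "s1",
                   device: str = "root.vehicle.d1") -> str:
    """Build the M4 time-window query for a chart that is w pixels wide.

    tqe is extended, if necessary, so that (tqe - tqs) is divisible by w.
    """
    if w <= 0 or tqe <= tqs:
        raise ValueError("need w > 0 and tqe > tqs")
    interval = -(-(tqe - tqs) // w)  # ceiling division on integers
    tqe_extended = tqs + interval * w
    return (
        f"select M4({series},'timeInterval'='{interval}',"
        f"'displayWindowBegin'='{tqs}','displayWindowEnd'='{tqe_extended}') "
        f"from {device}"
    )


print(build_m4_query(tqs=0, tqe=100, w=4))
print(build_m4_query(tqs=0, tqe=99, w=4))  # tqe is extended to 100
```

With `tqs=0`, `tqe=100`, and `w=4` the helper produces the same statement as query1, so the result has at most `4*4 = 16` points.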
+
+
+
+#### Comparison with Other SQL
+
+| SQL                                                          | Supports 
M4 aggregation?                                      | Sliding window type      
                         | Example                                              
        | Docs                                                         |
+| ------------------------------------------------------------ | 
------------------------------------------------------------ | 
------------------------------------------------- | 
------------------------------------------------------------ | 
------------------------------------------------------------ |
+| 1. native built-in aggregate functions with Group By clause  | No. Lacks 
`BOTTOM_TIME` and `TOP_TIME`, which are the times of the points that have 
the minimum and maximum values, respectively. | Time Window                          
             | `select count(status), max_value(temperature) from 
root.ln.wf01.wt01 group by ([2017-11-01 00:00:00, 2017-11-07 23:00:00), 3h, 
1d)` | 
https://iotdb.apache.org/UserGuide/Master/Query-Data/Aggregate-Query.html#built-in-aggregate-functions
 <br />http [...]
+| 2. EQUAL_SIZE_BUCKET_M4_SAMPLE (built-in UDF)                | Yes*          
                                               | Size Window. `windowSize = 
4*(int)(1/proportion)` | `select equal_size_bucket_m4_sample(temperature, 
'proportion'='0.1') as M4_sample from root.ln.wf01.wt01` | 
https://iotdb.apache.org/UserGuide/Master/Query-Data/Select-Expression.html#time-series-generating-functions
 |
+| **3. M4 (built-in UDF)**                                     | Yes*          
                                               | Size Window, Time Window       
                   | (1) Size Window: `select M4(s1,'windowSize'='10') from 
root.vehicle.d1` <br />(2) Time Window: `select 
M4(s1,'timeInterval'='25','displayWindowBegin'='0','displayWindowEnd'='100') 
from root.vehicle.d1` | refer to this doc                                       
     |
+| 4. extend native built-in aggregate functions with Group By clause to 
support M4 aggregation | not implemented                                        
      | not implemented                                   | not implemented     
                                         | not implemented                      
                        |
+
+Further comparison of `EQUAL_SIZE_BUCKET_M4_SAMPLE` and `M4`:
+
+**(1) Different M4 aggregation definitions:**
+
+For each window, `EQUAL_SIZE_BUCKET_M4_SAMPLE` extracts the top and bottom 
points from points **EXCLUDING** the first and last points.
+
+In contrast, `M4` extracts the top and bottom points from points **INCLUDING** 
the first and last points, which is more consistent with the semantics of 
`max_value` and `min_value` stored in metadata.
+
+It is worth noting that both functions sort and deduplicate the aggregated 
points in a window before outputting them to the collectors.
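
The difference between the two definitions shows up on a single window. The following sketch follows only the description given here, not the source code of either function; both function names are hypothetical, and the window is assumed to be sorted by time.

```python
from typing import List, Tuple

Point = Tuple[int, float]  # (timestamp, value)


def m4_including(window: List[Point]) -> List[Point]:
    """Bottom/top are searched among ALL points (the M4 definition)."""
    picks = {
        window[0], window[-1],
        min(window, key=lambda p: p[1]),
        max(window, key=lambda p: p[1]),
    }
    return sorted(picks)


def m4_excluding(window: List[Point]) -> List[Point]:
    """Bottom/top are searched among the points EXCLUDING first and last
    (the definition described for EQUAL_SIZE_BUCKET_M4_SAMPLE)."""
    picks = {window[0], window[-1]}
    inner = window[1:-1]
    if inner:
        picks.add(min(inner, key=lambda p: p[1]))
        picks.add(max(inner, key=lambda p: p[1]))
    return sorted(picks)


window = [(1, 5.0), (2, 15.0), (5, 10.0), (8, 8.0)]
print(m4_including(window))
print(m4_excluding(window))
```

In this window the first point `(1, 5.0)` is also the overall minimum, so the including variant returns three points. The excluding variant instead picks `(5, 10.0)` as the bottom of the inner points and returns four.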
+
+**(2) Different sliding windows:** 
+
+`EQUAL_SIZE_BUCKET_M4_SAMPLE` uses SlidingSizeWindowAccessStrategy and 
**indirectly** controls sliding window size by sampling proportion. The 
conversion formula is `windowSize = 4*(int)(1/proportion)`. 
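
As a quick check of the conversion formula, the sketch below evaluates it for a few proportions. The function name is hypothetical, and the accepted range `0 < proportion <= 1` is an assumption of this illustration rather than something stated here.

```python
def window_size_from_proportion(proportion: float) -> int:
    """windowSize = 4 * (int)(1 / proportion), with truncation toward zero."""
    if not 0 < proportion <= 1:
        raise ValueError("proportion is assumed to lie in (0, 1]")
    return 4 * int(1 / proportion)


for p in (0.1, 0.3, 1.0):
    print(p, window_size_from_proportion(p))
```

For the example query with `'proportion'='0.1'`, each window therefore holds 40 points, from which up to four are kept.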
+
+`M4` supports two types of sliding window: SlidingSizeWindowAccessStrategy and 
SlidingTimeWindowAccessStrategy. `M4` **directly** controls the window point 
size or time length using corresponding parameters.
+
+
+
 ### JEXL Function
 
 Java Expression Language (JEXL) is an expression language engine. We use JEXL 
to extend UDFs, which are implemented on the command line with simple lambda 
expressions. See the link for [operators supported in jexl lambda 
expressions](https://commons.apache.org/proper/commons-jexl/apidocs/org/apache/commons/jexl3/package-summary.html#customization).
diff --git a/docs/UserGuide/UDF-Library/M4.md b/docs/UserGuide/UDF-Library/M4.md
index 420cefde3c..c2934f3d0b 100644
--- a/docs/UserGuide/UDF-Library/M4.md
+++ b/docs/UserGuide/UDF-Library/M4.md
@@ -1,92 +1,27 @@
 <!--
 
-​    Licensed to the Apache Software Foundation (ASF) under one
-​    or more contributor license agreements.  See the NOTICE file
-​    distributed with this work for additional information
-​    regarding copyright ownership.  The ASF licenses this file
-​    to you under the Apache License, Version 2.0 (the
-​    "License"); you may not use this file except in compliance
-​    with the License.  You may obtain a copy of the License at
-​    
-​        http://www.apache.org/licenses/LICENSE-2.0
-​    
-​    Unless required by applicable law or agreed to in writing,
-​    software distributed under the License is distributed on an
-​    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-​    KIND, either express or implied.  See the License for the
-​    specific language governing permissions and limitations
-​    under the License.
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
 
 -->
 
+
 # M4
 
 ## M4
 
-### Usage
-
-This function is used to execute the M4 aggregation query using the MAC 
(merging all chunks) approach.
-
-**Name:** M4
-
-**Input Series:** Only supports a single input sequence, the type is INT32 / 
INT64 / FLOAT / DOUBLE
-
-**Parameters:**
-
-+ `tqs`: The start time (included) of the query time range.
-+ `tqe`: The end time (excluded) of the query time range.
-+ `w`: The number of time spans in the M4 aggregation.
-
-**Output Series:** The first, last, bottom, and top points in each time span
-[tqs+(tqe-tqs)/w*(i-1),tqs+(tqe-tqs)/w*i), i=1,...,w.
-
-**Note:**
-+ This function is right now only supported in the branch 
`research/M4-visualization` right now.
-+ You need to make sure (tqe-tqs) is divisible by w.
-+ You need to add `time>=tqs and time<tqe` in the where clause.
-
-### Examples
-
-Input series:
-
-```
-+-----------------------------+------------------+
-|                         Time|root.vehicle.d0.s0|
-+-----------------------------+------------------+
-|1970-01-01T08:00:00.001+08:00|               5.0|
-|1970-01-01T08:00:00.002+08:00|              15.0|
-|1970-01-01T08:00:00.005+08:00|              10.0|
-|1970-01-01T08:00:00.008+08:00|               8.0|
-|1970-01-01T08:00:00.010+08:00|              20.0|
-|1970-01-01T08:00:00.020+08:00|              20.0|
-|1970-01-01T08:00:00.025+08:00|               8.0|
-|1970-01-01T08:00:00.027+08:00|              20.0|
-|1970-01-01T08:00:00.030+08:00|              40.0|
-|1970-01-01T08:00:00.033+08:00|               9.0|
-|1970-01-01T08:00:00.035+08:00|              10.0|
-|1970-01-01T08:00:00.040+08:00|              20.0|
-|1970-01-01T08:00:00.045+08:00|              30.0|
-|1970-01-01T08:00:00.052+08:00|               8.0|
-|1970-01-01T08:00:00.054+08:00|              18.0|
-|1970-01-01T08:00:00.120+08:00|               8.0|
-+-----------------------------+------------------+
-```
-
-SQL for query:
-
-```sql
-select M4(s0,'tqs'='0','tqe'='100','w'='4') from root.vehicle.d0 where time>=0 
and time<100
-```
-
-Output:
-
-```
-+-----------------------------+----------------------------------------------------------------------------------+
-|                         Time|                           
M4(root.vehicle.d0.s0, "tqs"="0", "tqe"="100", "w"="4")|
-+-----------------------------+----------------------------------------------------------------------------------+
-|1970-01-01T08:00:00.000+08:00|  FirstPoint=(1,5.0), LastPoint=(20,20.0), 
BottomPoint=(1,5.0), TopPoint=(10,20.0)|
-|1970-01-01T08:00:00.025+08:00|FirstPoint=(25,8.0), LastPoint=(45,30.0), 
BottomPoint=(25,8.0), TopPoint=(30,40.0)|
-|1970-01-01T08:00:00.050+08:00|FirstPoint=(52,8.0), LastPoint=(54,18.0), 
BottomPoint=(52,8.0), TopPoint=(54,18.0)|
-|1970-01-01T08:00:00.075+08:00|                                                
                             empty|
-+-----------------------------+----------------------------------------------------------------------------------+
-```
\ No newline at end of file
+The documentation of M4 has been moved to [Query Data->Select 
Expression->Time Series Generating 
Functions](../Query-Data/Select-Expression.md).
diff --git a/docs/zh/UserGuide/Query-Data/Select-Expression.md 
b/docs/zh/UserGuide/Query-Data/Select-Expression.md
index 4a3af92bc2..2db8fc662d 100644
--- a/docs/zh/UserGuide/Query-Data/Select-Expression.md
+++ b/docs/zh/UserGuide/Query-Data/Select-Expression.md
@@ -843,7 +843,163 @@ Total line number = 10
 It costs 0.041s
 ```
 
+### M4函数
+
+M4用于在窗口内采样第一个点(`first`)、最后一个点(`last`)、最小值点(`bottom`)、最大值点(`top`):
+
+-   第一个点是拥有这个窗口内最小时间戳的点;
+-   最后一个点是拥有这个窗口内最大时间戳的点;
+-   最小值点是拥有这个窗口内最小值的点(如果有多个这样的点,M4只返回其中一个);
+-   最大值点是拥有这个窗口内最大值的点(如果有多个这样的点,M4只返回其中一个)。
+
+<img 
src="https://user-images.githubusercontent.com/33376433/198178733-a0919d17-0663-4672-9c4f-1efad6f463c2.png"
 alt="image" style="zoom:50%;" />
+
+| 函数名 | 可接收的输入序列类型           | 属性参数                                            
         | 输出序列类型                   | 功能类型                                      
               |
+| ------ | ------------------------------ | 
------------------------------------------------------------ | 
------------------------------ | 
------------------------------------------------------------ |
+| M4     | INT32 / INT64 / FLOAT / DOUBLE | 
包含固定点数的窗口和滑动时间窗口使用不同的属性参数。包含固定点数的窗口使用属性`windowSize`和`slidingStep`。滑动时间窗口使用属性`timeInterval`、`slidingStep`、`displayWindowBegin`和`displayWindowEnd`。更多细节见下文。
 | INT32 / INT64 / FLOAT / DOUBLE | 
返回每个窗口内的第一个点(`first`)、最后一个点(`last`)、最小值点(`bottom`)、最大值点(`top`)。在一个窗口内的聚合点输出之前,M4会将它们按照时间戳递增排序并且去重。
 |
+
+#### 属性参数
+
+**(1) 包含固定点数的窗口(SlidingSizeWindowAccessStrategy)使用的属性参数:**
+
++ `windowSize`: 一个窗口内的点数。Int数据类型。必需的属性参数。
++ `slidingStep`: 按照设定的点数来滑动窗口。Int数据类型。可选的属性参数;如果没有设置,默认取值和`windowSize`一样。
+
+<img 
src="https://user-images.githubusercontent.com/33376433/198181449-00d563c8-7bce-4ecd-a031-ec120ca42c3f.png"
 alt="image" style="zoom: 50%;" />
+
+*(图片来源: 
https://iotdb.apache.org/UserGuide/Master/Process-Data/UDF-User-Defined-Function.html#udtf-user-defined-timeseries-generating-function)*
+
+**(2) 滑动时间窗口(SlidingTimeWindowAccessStrategy)使用的属性参数:**
+
++ `timeInterval`: 一个窗口的时间长度。Long数据类型。必需的属性参数。
++ `slidingStep`: 按照设定的时长来滑动窗口。Long数据类型。可选的属性参数;如果没有设置,默认取值和`timeInterval`一样。
++ `displayWindowBegin`: 
窗口滑动的起始时间戳位置(包含在内)。Long数据类型。可选的属性参数;如果没有设置,默认取值为Long.MIN_VALUE,意为使用输入的时间序列的第一个点的时间戳作为窗口滑动的起始时间戳位置。
++ `displayWindowEnd`: 结束时间限制(不包含在内;本质上和`WHERE time < 
displayWindowEnd`起的效果是一样的)。Long数据类型。可选的属性参数;如果没有设置,默认取值为Long.MAX_VALUE,意为除了输入的时间序列自身数据读取完毕之外没有增加额外的结束时间过滤条件限制。
+
+<img 
src="https://user-images.githubusercontent.com/33376433/198183015-93b56644-3330-4acf-ae9e-d718a02b5f4c.png"
 alt="groupBy window" style="zoom: 67%;" />
+
+*(图片来源: 
https://iotdb.apache.org/UserGuide/Master/Query-Data/Aggregate-Query.html#downsampling-aggregate-query)*
+
+#### 演示
+
+输入的时间序列:
+
+```sql
++-----------------------------+------------------+
+|                         Time|root.vehicle.d1.s1|
++-----------------------------+------------------+
+|1970-01-01T08:00:00.001+08:00|               5.0|
+|1970-01-01T08:00:00.002+08:00|              15.0|
+|1970-01-01T08:00:00.005+08:00|              10.0|
+|1970-01-01T08:00:00.008+08:00|               8.0|
+|1970-01-01T08:00:00.010+08:00|              30.0|
+|1970-01-01T08:00:00.020+08:00|              20.0|
+|1970-01-01T08:00:00.025+08:00|               8.0|
+|1970-01-01T08:00:00.027+08:00|              20.0|
+|1970-01-01T08:00:00.030+08:00|              40.0|
+|1970-01-01T08:00:00.033+08:00|               9.0|
+|1970-01-01T08:00:00.035+08:00|              10.0|
+|1970-01-01T08:00:00.040+08:00|              20.0|
+|1970-01-01T08:00:00.045+08:00|              30.0|
+|1970-01-01T08:00:00.052+08:00|               8.0|
+|1970-01-01T08:00:00.054+08:00|              18.0|
++-----------------------------+------------------+
+```
+
+查询语句1:
+
+```sql
+select 
M4(s1,'timeInterval'='25','displayWindowBegin'='0','displayWindowEnd'='100') 
from root.vehicle.d1
+```
+
+输出结果1:
+
+```sql
++-----------------------------+-----------------------------------------------------------------------------------------------+
+|                         Time|M4(root.vehicle.d1.s1, "timeInterval"="25", 
"displayWindowBegin"="0", "displayWindowEnd"="100")|
++-----------------------------+-----------------------------------------------------------------------------------------------+
+|1970-01-01T08:00:00.001+08:00|                                                
                                            5.0|
+|1970-01-01T08:00:00.010+08:00|                                                
                                           30.0|
+|1970-01-01T08:00:00.020+08:00|                                                
                                           20.0|
+|1970-01-01T08:00:00.025+08:00|                                                
                                            8.0|
+|1970-01-01T08:00:00.030+08:00|                                                
                                           40.0|
+|1970-01-01T08:00:00.045+08:00|                                                
                                           30.0|
+|1970-01-01T08:00:00.052+08:00|                                                
                                            8.0|
+|1970-01-01T08:00:00.054+08:00|                                                
                                           18.0|
++-----------------------------+-----------------------------------------------------------------------------------------------+
+Total line number = 8
+```
+
+查询语句2:
+
+```sql
+select M4(s1,'windowSize'='10') from root.vehicle.d1
+```
+
+输出结果2:
+
+```sql
++-----------------------------+-----------------------------------------+
+|                         Time|M4(root.vehicle.d1.s1, "windowSize"="10")|
++-----------------------------+-----------------------------------------+
+|1970-01-01T08:00:00.001+08:00|                                      5.0|
+|1970-01-01T08:00:00.030+08:00|                                     40.0|
+|1970-01-01T08:00:00.033+08:00|                                      9.0|
+|1970-01-01T08:00:00.035+08:00|                                     10.0|
+|1970-01-01T08:00:00.045+08:00|                                     30.0|
+|1970-01-01T08:00:00.052+08:00|                                      8.0|
+|1970-01-01T08:00:00.054+08:00|                                     18.0|
++-----------------------------+-----------------------------------------+
+Total line number = 7
+```
+
+
+
+#### 推荐的使用场景
+
+**(1) 使用场景:保留极端点的降采样**
+
+由于M4为每个窗口聚合其第一个点(`first`)、最后一个点(`last`)、最小值点(`bottom`)、最大值点(`top`),因此M4通常保留了极值点,因此比其他下采样方法(如分段聚合近似
 (PAA))能更好地保留模式。如果你想对时间序列进行下采样并且希望保留极值点,你可以试试 M4。
+
+**(2) 使用场景:基于数据缩约的大规模时间序列的零误差双色折线图可视化**
+
+参考论文: ["M4: A Visualization-Oriented Time Series Data 
Aggregation"](http://www.vldb.org/pvldb/vol7/p797-jugel.pdf).
+
+假设屏幕画布的像素宽乘高是`w*h`,假设时间序列root.vehicle.d1.s1要可视化的时间范围是`[tqs,tqe)`(在这个使用场景里面,需要请你自行将tqe自适应调整使得(tqe-tqs)是w的整数倍),那么落在第i个时间跨度`Ii=[tqs+(tqe-tqs)/w*(i-1),tqs+(tqe-tqs)/w*i)`
 内的点将会被画在第i个像素列中,i=1,2,...,w。
+
+于是从可视化驱动的角度出发,使用查询语句:`"select 
M4(s1,'timeInterval'='(tqe-tqs)/w','displayWindowBegin'='tqs','displayWindowEnd'='tqe')
 from 
root.vehicle.d1"`,来采集每个时间跨度内的第一个点(`first`)、最后一个点(`last`)、最小值点(`bottom`)、最大值点(`top`)。最终结果点数不会超过`4*w`个,使用这些聚合点画出来的折线图与使用原始数据画出来的图在像素级别上是完全一致的。
+
+
+
+#### 和其它SQL的功能比较
+
+| SQL                                               | 是否支持M4聚合                 
                              | 滑动窗口类型                                      | 
示例                                                         | 相关文档               
                                      |
+| ------------------------------------------------- | 
------------------------------------------------------------ | 
------------------------------------------------- | 
------------------------------------------------------------ | 
------------------------------------------------------------ |
+| 1. 带有Group By子句的内置聚合函数                 | 
不支持,缺少`BOTTOM_TIME`和`TOP_TIME`,即缺少最小值点和最大值点的时间戳。 | Time Window                  
                     | `select count(status), max_value(temperature) from 
root.ln.wf01.wt01 group by ([2017-11-01 00:00:00, 2017-11-07 23:00:00), 3h, 
1d)` | 
https://iotdb.apache.org/UserGuide/Master/Query-Data/Aggregate-Query.html#built-in-aggregate-functions
 <br 
/>https://iotdb.apache.org/UserGuide/Master/Query-Data/Aggregate-Query.html#downsampling-aggregate-query
 |
+| 2. EQUAL_SIZE_BUCKET_M4_SAMPLE (内置UDF)          | 支持*                        
                                | Size Window. `windowSize = 
4*(int)(1/proportion)` | `select equal_size_bucket_m4_sample(temperature, 
'proportion'='0.1') as M4_sample from root.ln.wf01.wt01` | 
https://iotdb.apache.org/UserGuide/Master/Query-Data/Select-Expression.html#time-series-generating-functions
 |
+| **3. M4 (内置UDF)**                               | 支持*                        
                                | Size Window, Time Window                      
    | (1) Size Window: `select M4(s1,'windowSize'='10') from root.vehicle.d1` 
<br />(2) Time Window: `select 
M4(s1,'timeInterval'='25','displayWindowBegin'='0','displayWindowEnd'='100') 
from root.vehicle.d1` | 本文档                                                     
  |
+| 4. 扩展带有Group By子句的内置聚合函数来支持M4聚合 | 未实施                                        
               | 未实施                                            | 未实施           
                                            | 未实施                               
                        |
+
+进一步比较`EQUAL_SIZE_BUCKET_M4_SAMPLE`和`M4`:
+
+**(1) 不同的M4聚合函数定义:**
+
+在每个窗口内,`EQUAL_SIZE_BUCKET_M4_SAMPLE`从排除了第一个点和最后一个点之后剩余的点中提取最小值点和最大值点。
+
+而`M4`则是从窗口内所有点中(包括第一个点和最后一个点)提取最小值点和最大值点,这个定义与元数据中保存的`max_value`和`min_value`的语义更加一致。
+
+值得注意的是,在一个窗口内的聚合点输出之前,`EQUAL_SIZE_BUCKET_M4_SAMPLE`和`M4`都会将它们按照时间戳递增排序并且去重。 
+
+**(2) 不同的滑动窗口:** 
+
+`EQUAL_SIZE_BUCKET_M4_SAMPLE`使用SlidingSizeWindowAccessStrategy,并且通过采样比例(`proportion`)来间接控制窗口点数(`windowSize`),转换公式是`windowSize
 = 4*(int)(1/proportion)`。
+
+`M4`支持两种滑动窗口:SlidingSizeWindowAccessStrategy和SlidingTimeWindowAccessStrategy,并且`M4`通过相应的参数直接控制窗口的点数或者时长。
+
+
+
 ### JEXL自定义函数
+
 Java Expression Language (JEXL) 
是一个表达式语言引擎。我们使用JEXL来扩展UDF,在命令行中,通过简易的lambda表达式来实现UDF。lambda表达式中支持的运算符详见链接 
[JEXL中lambda表达式支持的运算符](https://commons.apache.org/proper/commons-jexl/apidocs/org/apache/commons/jexl3/package-summary.html#customization)
 。
 
 
diff --git a/docs/zh/UserGuide/UDF-Library/M4.md 
b/docs/zh/UserGuide/UDF-Library/M4.md
index ffe6ad8308..67c9838034 100644
--- a/docs/zh/UserGuide/UDF-Library/M4.md
+++ b/docs/zh/UserGuide/UDF-Library/M4.md
@@ -1,21 +1,21 @@
 <!--
 
-​    Licensed to the Apache Software Foundation (ASF) under one
-​    or more contributor license agreements.  See the NOTICE file
-​    distributed with this work for additional information
-​    regarding copyright ownership.  The ASF licenses this file
-​    to you under the Apache License, Version 2.0 (the
-​    "License"); you may not use this file except in compliance
-​    with the License.  You may obtain a copy of the License at
-​    
-​        http://www.apache.org/licenses/LICENSE-2.0
-​    
-​    Unless required by applicable law or agreed to in writing,
-​    software distributed under the License is distributed on an
-​    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-​    KIND, either express or implied.  See the License for the
-​    specific language governing permissions and limitations
-​    under the License.
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
 
 -->
 
@@ -23,71 +23,4 @@
 
 ## M4
 
-### 函数简介
-
-本函数使用 MAC (merging all chunks) 方法执行M4聚合查询。
-
-**函数名:** M4
-
-**输入序列:** 仅支持单个输入序列,类型为 INT32 / INT64 / FLOAT / DOUBLE。
-
-**参数:**
-
-+ `tqs`: 查询的开始时间(含)。
-+ `tqe`: 查询的结束时间(不含)。
-+ `w`: M4聚合中的时间跨度数量。
-
-**输出序列:** 每个时间跨度的首、尾、最小和最大值。
-
-`[tqs+(tqe-tqs)/w*(i-1),tqs+(tqe-tqs)/w*i), i=1,...,w.`
-
-**说明:**
-+ 函数当前仅适用于 `research/M4-visualization` 分支。
-+ 输入参数需确保 `(tqe-tqs)` 是 `w` 的倍数。
-+ 查询时需在 where 语句后添加 `time>=tqs and time<tqe`。
-
-### 使用示例
-
-输入序列:
-
-```
-+-----------------------------+------------------+
-|                         Time|root.vehicle.d0.s0|
-+-----------------------------+------------------+
-|1970-01-01T08:00:00.001+08:00|               5.0|
-|1970-01-01T08:00:00.002+08:00|              15.0|
-|1970-01-01T08:00:00.005+08:00|              10.0|
-|1970-01-01T08:00:00.008+08:00|               8.0|
-|1970-01-01T08:00:00.010+08:00|              20.0|
-|1970-01-01T08:00:00.020+08:00|              20.0|
-|1970-01-01T08:00:00.025+08:00|               8.0|
-|1970-01-01T08:00:00.027+08:00|              20.0|
-|1970-01-01T08:00:00.030+08:00|              40.0|
-|1970-01-01T08:00:00.033+08:00|               9.0|
-|1970-01-01T08:00:00.035+08:00|              10.0|
-|1970-01-01T08:00:00.040+08:00|              20.0|
-|1970-01-01T08:00:00.045+08:00|              30.0|
-|1970-01-01T08:00:00.052+08:00|               8.0|
-|1970-01-01T08:00:00.054+08:00|              18.0|
-|1970-01-01T08:00:00.120+08:00|               8.0|
-+-----------------------------+------------------+
-```
-
-SQL for query:
-
-```sql
-select M4(s0,'tqs'='0','tqe'='100','w'='4') from root.vehicle.d0 where time>=0 and time<100
-```
-
-Output series:
-
-```
-+-----------------------------+----------------------------------------------------------------------------------+
-|                         Time|                           M4(root.vehicle.d0.s0, "tqs"="0", "tqe"="100", "w"="4")|
-+-----------------------------+----------------------------------------------------------------------------------+
-|1970-01-01T08:00:00.000+08:00|  FirstPoint=(1,5.0), LastPoint=(20,20.0), BottomPoint=(1,5.0), TopPoint=(10,20.0)|
-|1970-01-01T08:00:00.025+08:00|FirstPoint=(25,8.0), LastPoint=(45,30.0), BottomPoint=(25,8.0), TopPoint=(30,40.0)|
-|1970-01-01T08:00:00.050+08:00|FirstPoint=(52,8.0), LastPoint=(54,18.0), BottomPoint=(52,8.0), TopPoint=(54,18.0)|
-|1970-01-01T08:00:00.075+08:00|                                                                             empty|
-+-----------------------------+----------------------------------------------------------------------------------+
-```
\ No newline at end of file
+The M4 documentation has moved to [Query Data->Select Expression->Time Series Generating Functions](../Query-Data/Select-Expression.md).
\ No newline at end of file
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
index 944f42f643..fa76bfe333 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
@@ -75,7 +75,8 @@ public enum BuiltinTimeSeriesGeneratingFunctionEnum {
   EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE"),
   EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE("EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE"),
   JEXL("JEXL"),
-  MASTER_REPAIR("MASTER_REPAIR");
+  MASTER_REPAIR("MASTER_REPAIR"),
+  M4("M4");
 
   private final String functionName;
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFM4IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFM4IT.java
new file mode 100644
index 0000000000..d1dd4d0d37
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFM4IT.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBUDFM4IT {
+
+  public static final String WINDOW_SIZE_KEY = "windowSize";
+  public static final String TIME_INTERVAL_KEY = "timeInterval";
+  public static final String SLIDING_STEP_KEY = "slidingStep";
+  public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
+  public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(100)
+        .setUdfTransformerMemoryBudgetInMB(100)
+        .setUdfReaderMemoryBudgetInMB(100);
+  }
+
+  @Test
+  public void test_M4_slidingTimeWindow() {
+    String[] res =
+        new String[] {
+          "1,5.0", "10,30.0", "20,20.0", "25,8.0", "30,40.0", "45,30.0", "52,8.0", "54,18.0"
+        };
+
+    String sql =
+        String.format(
+            "select M4(s1, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.vehicle.d1",
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            0,
+            DISPLAY_WINDOW_END_KEY,
+            100);
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void test_M4_slidingSizeWindow() {
+    String[] res =
+        new String[] {"1,5.0", "30,40.0", "33,9.0", "35,10.0", "45,30.0", "52,8.0", "54,18.0"};
+
+    String sql =
+        String.format(
+            "select M4(s1,'%s'='%s','%s'='%s') from root.vehicle.d1",
+            WINDOW_SIZE_KEY, 10, SLIDING_STEP_KEY, 10);
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void test_M4_constantTimeSeries() {
+    /** Result: 0,1 24,1 25,1 49,1 50,1 74,1 75,1 99,1 */
+    String sql =
+        String.format(
+            "select M4(s2, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.vehicle.d1",
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            0,
+            DISPLAY_WINDOW_END_KEY,
+            100);
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String expStr;
+        if (count % 2 == 0) {
+          expStr = 25 * (count / 2) + ",1";
+        } else {
+          expStr = 25 * (count / 2) + 24 + ",1";
+        }
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(expStr, str);
+        count++;
+      }
+      Assert.assertEquals(8, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void test_EQUAL_SIZE_BUCKET_M4_SAMPLE() {
+    String[] res =
+        new String[] {
+          "1,5.0", "8,8.0", "10,30.0", "27,20.0", "30,40.0", "45,30.0", "52,8.0", "54,18.0"
+        };
+
+    String sql = "select EQUAL_SIZE_BUCKET_M4_SAMPLE(s1,'proportion'='0.5') from root.vehicle.d1";
+    // the window size is 4*(int)(1/proportion) = 8
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void test_EQUAL_SIZE_BUCKET_M4_SAMPLE_constantTimeSeries() {
+    String sql = "select EQUAL_SIZE_BUCKET_M4_SAMPLE(s2, 'proportion'='0.5') from root.vehicle.d1";
+    // the window size is 4*(int)(1/proportion) = 8
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String expStr;
+        if (count / 4 * 8 < 8 * 12) { // each 8-point window samples 4 different points
+          if (count % 4 == 0) {
+            expStr = 8 * (count / 4) + ",1";
+          } else if (count % 4 == 1) {
+            expStr = 8 * (count / 4) + 1 + ",1";
+          } else if (count % 4 == 2) {
+            expStr = 8 * (count / 4 + 1) - 2 + ",1";
+          } else {
+            expStr = 8 * (count / 4 + 1) - 1 + ",1";
+          }
+        } else { // the last 4 points
+          expStr = count - 48 + 96 + ",1";
+        }
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(expStr, str);
+        count++;
+      }
+      Assert.assertEquals(52, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void createTimeSeries() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.vehicle");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s1 with datatype=double,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s2 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static final String insertTemplate =
+      "INSERT INTO root.vehicle.d1(timestamp,%s)" + " VALUES(%d,%d)";
+
+  private static void generateData() {
+    // data:
+    // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png
+    // NOTE: The last point (120,8) is commented out, because bug#7738 has not been fixed
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 1, 5));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 2, 15));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 20, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 25, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 54, 3));
+      // TODO add back after fixing bug#7738:
+      // statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 120, 8));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 5, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 8, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 10, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 20, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 27, 20));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 30, 40));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 35, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 40, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 33, 9));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 45, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 52, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 54, 18));
+      statement.execute("FLUSH");
+
+      for (int i = 0; i < 100; i++) {
+        statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s2", i, 1));
+      }
+      statement.execute("FLUSH");
+
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+}
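The expected result of `test_M4_slidingTimeWindow` can be checked by hand with a small in-memory sketch. The class below is not part of the commit: `M4TimeWindowSketch` and its `m4` method are hypothetical names, the windows are assumed to be half-open (`[start, start + interval)`, which matches the `0,1 24,1 25,1 ...` result noted in `test_M4_constantTimeSeries`), and clamping the last window to `displayWindowEnd` is an assumption of the sketch. The input arrays are the final `s1` values after the overwrites in `generateData()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class M4TimeWindowSketch {

  /**
   * Returns the M4 points of every non-empty window as "time,value" strings.
   * Assumes times[] is sorted ascending and windows are half-open.
   */
  static List<String> m4(
      long[] times, double[] values, long interval, long step, long begin, long end) {
    List<String> out = new ArrayList<>();
    for (long start = begin; start < end; start += step) {
      long stop = Math.min(start + interval, end); // assumption: clamp to the display window
      List<Integer> window = new ArrayList<>();
      for (int i = 0; i < times.length; i++) {
        if (times[i] >= start && times[i] < stop) {
          window.add(i);
        }
      }
      if (window.isEmpty()) {
        continue; // an empty window contributes no points
      }
      int first = window.get(0);
      int last = window.get(window.size() - 1);
      int bottom = first;
      int top = first;
      for (int i : window) {
        if (values[i] < values[bottom]) bottom = i;
        if (values[i] > values[top]) top = i;
      }
      // A sorted set orders the four indices by time and drops duplicates.
      TreeSet<Integer> picked = new TreeSet<>(Arrays.asList(first, last, bottom, top));
      for (int i : picked) {
        out.add(times[i] + "," + values[i]);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    long[] t = {1, 2, 5, 8, 10, 20, 25, 27, 30, 33, 35, 40, 45, 52, 54};
    double[] v = {5, 15, 10, 8, 30, 20, 8, 20, 40, 9, 10, 20, 30, 8, 18};
    // timeInterval=25, slidingStep=25, displayWindowBegin=0, displayWindowEnd=100
    m4(t, v, 25, 25, 0, 100).forEach(System.out::println);
  }
}
```

On ties this sketch keeps the earliest bottom and top point, which may differ from the choice `UDTFM4` makes; the test data has no ties inside a window, so the eight printed lines match the `res` array of the test.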
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
index 437c6d2ac6..f4561ef95c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
@@ -89,7 +89,8 @@ public enum BuiltinTimeSeriesGeneratingFunction {
   EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE(
       "EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE", UDTFEqualSizeBucketOutlierSample.class),
   JEXL("JEXL", UDTFJexl.class),
-  MASTER_REPAIR("MASTER_REPAIR", UDTFMasterRepair.class);
+  MASTER_REPAIR("MASTER_REPAIR", UDTFMasterRepair.class),
+  M4("M4", UDTFM4.class);
 
   private final String functionName;
   private final Class<?> functionClass;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java
new file mode 100644
index 0000000000..6351e548b3
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+
+/**
+ * For each sliding window, M4 returns the first, last, bottom, top points. The window can be
+ * controlled by either point size or time interval length. The aggregated points in the output
+ * series have been sorted and deduplicated.
+ *
+ * <p>SlidingSizeWindow usage Example: "select M4(s1,'windowSize'='10','slidingStep'='10') from
+ * root.vehicle.d1" (windowSize is required, slidingStep is optional.)
+ *
+ * <p>SlidingTimeWindow usage Example: "select
+ * M4(s1,'timeInterval'='25','slidingStep'='25','displayWindowBegin'='0','displayWindowEnd'='100')
+ * from root.vehicle.d1" (timeInterval is required, slidingStep/displayWindowBegin/displayWindowEnd
+ * are optional.)
+ */
+public class UDTFM4 implements UDTF {
+
+  enum AccessStrategy {
+    SIZE_WINDOW,
+    TIME_WINDOW
+  }
+
+  protected AccessStrategy accessStrategy;
+  protected TSDataType dataType;
+
+  public static final String WINDOW_SIZE_KEY = "windowSize";
+  public static final String TIME_INTERVAL_KEY = "timeInterval";
+  public static final String SLIDING_STEP_KEY = "slidingStep";
+  public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
+  public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws UDFException {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE);
+
+    if (!validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)
+        && !validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) {
+      throw new UDFParameterNotValidException(
+          String.format(
+              "attribute \"%s\"/\"%s\" is required but was not provided.",
+              WINDOW_SIZE_KEY, TIME_INTERVAL_KEY));
+    }
+    if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)
+        && validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) {
+      throw new UDFParameterNotValidException(
+          String.format(
+              "use attribute \"%s\" or \"%s\" only one at a time.",
+              WINDOW_SIZE_KEY, TIME_INTERVAL_KEY));
+    }
+    if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)) {
+      accessStrategy = AccessStrategy.SIZE_WINDOW;
+    } else {
+      accessStrategy = AccessStrategy.TIME_WINDOW;
+    }
+
+    dataType =
+        UDFDataTypeTransformer.transformToTsDataType(validator.getParameters().getDataType(0));
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws MetadataException {
+    // set data type
+    configurations.setOutputDataType(UDFDataTypeTransformer.transformToUDFDataType(dataType));
+
+    // set access strategy
+    if (accessStrategy == AccessStrategy.SIZE_WINDOW) {
+      int windowSize = parameters.getInt(WINDOW_SIZE_KEY);
+      int slidingStep = parameters.getIntOrDefault(SLIDING_STEP_KEY, windowSize);
+      configurations.setAccessStrategy(
+          new SlidingSizeWindowAccessStrategy(windowSize, slidingStep));
+    } else {
+      long timeInterval = parameters.getLong(TIME_INTERVAL_KEY);
+      long displayWindowBegin =
+          parameters.getLongOrDefault(DISPLAY_WINDOW_BEGIN_KEY, Long.MIN_VALUE);
+      long displayWindowEnd = parameters.getLongOrDefault(DISPLAY_WINDOW_END_KEY, Long.MAX_VALUE);
+      long slidingStep = parameters.getLongOrDefault(SLIDING_STEP_KEY, timeInterval);
+      configurations.setAccessStrategy(
+          new SlidingTimeWindowAccessStrategy(
+              timeInterval, slidingStep, displayWindowBegin, displayWindowEnd));
+    }
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws UDFException, IOException {
+    switch (dataType) {
+      case INT32:
+        transformInt(rowWindow, collector);
+        break;
+      case INT64:
+        transformLong(rowWindow, collector);
+        break;
+      case FLOAT:
+        transformFloat(rowWindow, collector);
+        break;
+      case DOUBLE:
+        transformDouble(rowWindow, collector);
+        break;
+      default:
+        // This will not happen
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0,
+            UDFDataTypeTransformer.transformToUDFDataType(dataType),
+            Type.INT32,
+            Type.INT64,
+            Type.FLOAT,
+            Type.DOUBLE);
+    }
+  }
+
+  public void transformInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+    int firstValue = rowWindow.getRow(0).getInt(0);
+    int lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getInt(0);
+
+    int minValue = Math.min(firstValue, lastValue);
+    int maxValue = Math.max(firstValue, lastValue);
+    int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+    int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+    for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+      int value = rowWindow.getRow(i).getInt(0);
+      if (value < minValue) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (value > maxValue) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putInt(row.getTime(), row.getInt(0));
+
+    int smallerIndex = Math.min(minIndex, maxIndex);
+    int largerIndex = Math.max(minIndex, maxIndex);
+    if (smallerIndex > 0) {
+      row = rowWindow.getRow(smallerIndex);
+      collector.putInt(row.getTime(), row.getInt(0));
+    }
+    if (largerIndex > smallerIndex) {
+      row = rowWindow.getRow(largerIndex);
+      collector.putInt(row.getTime(), row.getInt(0));
+    }
+    if (largerIndex < rowWindow.windowSize() - 1) {
+      row = rowWindow.getRow(rowWindow.windowSize() - 1);
+      collector.putInt(row.getTime(), row.getInt(0));
+    }
+  }
+
+  public void transformLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+    long firstValue = rowWindow.getRow(0).getLong(0);
+    long lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getLong(0);
+
+    long minValue = Math.min(firstValue, lastValue);
+    long maxValue = Math.max(firstValue, lastValue);
+    int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+    int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+    for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+      long value = rowWindow.getRow(i).getLong(0);
+      if (value < minValue) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (value > maxValue) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putLong(row.getTime(), row.getLong(0));
+
+    int smallerIndex = Math.min(minIndex, maxIndex);
+    int largerIndex = Math.max(minIndex, maxIndex);
+    if (smallerIndex > 0) {
+      row = rowWindow.getRow(smallerIndex);
+      collector.putLong(row.getTime(), row.getLong(0));
+    }
+    if (largerIndex > smallerIndex) {
+      row = rowWindow.getRow(largerIndex);
+      collector.putLong(row.getTime(), row.getLong(0));
+    }
+    if (largerIndex < rowWindow.windowSize() - 1) {
+      row = rowWindow.getRow(rowWindow.windowSize() - 1);
+      collector.putLong(row.getTime(), row.getLong(0));
+    }
+  }
+
+  public void transformFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+    float firstValue = rowWindow.getRow(0).getFloat(0);
+    float lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getFloat(0);
+
+    float minValue = Math.min(firstValue, lastValue);
+    float maxValue = Math.max(firstValue, lastValue);
+    int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+    int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+    for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+      float value = rowWindow.getRow(i).getFloat(0);
+      if (value < minValue) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (value > maxValue) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putFloat(row.getTime(), row.getFloat(0));
+
+    int smallerIndex = Math.min(minIndex, maxIndex);
+    int largerIndex = Math.max(minIndex, maxIndex);
+    if (smallerIndex > 0) {
+      row = rowWindow.getRow(smallerIndex);
+      collector.putFloat(row.getTime(), row.getFloat(0));
+    }
+    if (largerIndex > smallerIndex) {
+      row = rowWindow.getRow(largerIndex);
+      collector.putFloat(row.getTime(), row.getFloat(0));
+    }
+    if (largerIndex < rowWindow.windowSize() - 1) {
+      row = rowWindow.getRow(rowWindow.windowSize() - 1);
+      collector.putFloat(row.getTime(), row.getFloat(0));
+    }
+  }
+
+  public void transformDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+    double firstValue = rowWindow.getRow(0).getDouble(0);
+    double lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getDouble(0);
+
+    double minValue = Math.min(firstValue, lastValue);
+    double maxValue = Math.max(firstValue, lastValue);
+    int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+    int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+    for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+      double value = rowWindow.getRow(i).getDouble(0);
+      if (value < minValue) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (value > maxValue) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putDouble(row.getTime(), row.getDouble(0));
+
+    int smallerIndex = Math.min(minIndex, maxIndex);
+    int largerIndex = Math.max(minIndex, maxIndex);
+    if (smallerIndex > 0) {
+      row = rowWindow.getRow(smallerIndex);
+      collector.putDouble(row.getTime(), row.getDouble(0));
+    }
+    if (largerIndex > smallerIndex) {
+      row = rowWindow.getRow(largerIndex);
+      collector.putDouble(row.getTime(), row.getDouble(0));
+    }
+    if (largerIndex < rowWindow.windowSize() - 1) {
+      row = rowWindow.getRow(rowWindow.windowSize() - 1);
+      collector.putDouble(row.getTime(), row.getDouble(0));
+    }
+  }
+}
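The four `transform*` methods above share one selection routine: emit the first row, then the bottom and top rows in index order if they are distinct interior positions, then the last row. A minimal standalone sketch of that routine on a plain `double[]` follows. `M4SelectSketch` and `select` are hypothetical names used only for illustration, and rejecting an empty window with an exception is an assumption of the sketch; the diff does not show how `UDTFM4` is invoked for empty windows.

```java
import java.util.ArrayList;
import java.util.List;

public class M4SelectSketch {

  /** Returns the indices of the M4 points of one window, ascending and deduplicated. */
  static List<Integer> select(double[] w) {
    if (w.length == 0) {
      throw new IllegalArgumentException("window must not be empty"); // assumption of this sketch
    }
    int last = w.length - 1;
    // Seed min/max from the two endpoints, as the transform* methods do.
    double minValue = Math.min(w[0], w[last]);
    double maxValue = Math.max(w[0], w[last]);
    int minIndex = (w[0] < w[last]) ? 0 : last;
    int maxIndex = (w[0] > w[last]) ? 0 : last;
    // Scan only the interior points; strict comparisons keep the seeded endpoint on ties.
    for (int i = 1; i < last; i++) {
      if (w[i] < minValue) {
        minValue = w[i];
        minIndex = i;
      }
      if (w[i] > maxValue) {
        maxValue = w[i];
        maxIndex = i;
      }
    }
    int smaller = Math.min(minIndex, maxIndex);
    int larger = Math.max(minIndex, maxIndex);
    List<Integer> out = new ArrayList<>();
    out.add(0); // first point
    if (smaller > 0) out.add(smaller); // skipped when it coincides with the first point
    if (larger > smaller) out.add(larger); // skipped when bottom and top coincide
    if (larger < last) out.add(last); // skipped when the last point was already emitted
    return out;
  }

  public static void main(String[] args) {
    // First size-10 window of s1 in test_M4_slidingSizeWindow: times 1..33
    System.out.println(select(new double[] {5, 15, 10, 8, 30, 20, 8, 20, 40, 9})); // [0, 8, 9]
    // Remaining five points: times 35, 40, 45, 52, 54
    System.out.println(select(new double[] {10, 20, 30, 8, 18})); // [0, 2, 3, 4]
  }
}
```

Because the indices are emitted in increasing order and each guard rules out a repeat, the output needs no separate sort or deduplication pass, which is consistent with the class comment on `UDTFM4`.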
