This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 132ab5fe95290fb1a1f1b63cf0b615f84b5c0ba4
Author: Lei Rui <[email protected]>
AuthorDate: Sat Jun 17 16:01:59 2023 +0800

    add udf time window TW
---
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   1 -
 .../db/query/udf/builtin/BuiltinFunction.java      |   1 +
 .../apache/iotdb/db/query/udf/builtin/UDTFM4.java  | 315 +++++++++++++++++++++
 .../iotdb/db/query/udf/builtin/UDTFM4MAC.java      |   3 +
 .../apache/iotdb/db/integration/m4/MyTest5.java    | 281 ++++++++++++++++++
 .../iotdb/tsfile/read/common/IOMonitor2.java       |   3 +-
 6 files changed, 602 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java 
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index ffff5ac268d..b5fde29063a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -526,7 +526,6 @@ public class PlanExecutor implements IPlanExecutor {
         // no time series are selected, return EmptyDataSet
         return new EmptyDataSet();
       } else if (queryPlan instanceof UDTFPlan) {
-        IOMonitor2.dataSetType = DataSetType.UDTFAlignByTimeDataSet;
         UDTFPlan udtfPlan = (UDTFPlan) queryPlan;
         queryDataSet = queryRouter.udtfQuery(udtfPlan, context);
       } else if (queryPlan instanceof GroupByTimeFillPlan) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
 
b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
index fc8da6b3a0f..216d912d393 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
@@ -48,6 +48,7 @@ public enum BuiltinFunction {
   TOP_K("TOP_K", UDTFTopK.class),
   BOTTOM_K("BOTTOM_K", UDTFBottomK.class),
   M4("M4", UDTFM4MAC.class),
+  M4_TW("M4_TW", UDTFM4.class);
   ;
 
   private final String functionName;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4.java 
b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4.java
new file mode 100644
index 00000000000..d4f97ac7679
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import 
org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import 
org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import 
org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import 
org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import 
org.apache.iotdb.db.query.udf.api.exception.UDFParameterNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.IOMonitor2;
+import org.apache.iotdb.tsfile.read.common.IOMonitor2.DataSetType;
+
+import java.io.IOException;
+
+/**
+ * For each sliding window, M4 returns the first, last, bottom, top points. 
The window can be
+ * controlled by either point size or time interval length. The aggregated 
points in the output
+ * series has been sorted and deduplicated.
+ *
+ * <p>SlidingSizeWindow usage Example: "select 
M4(s1,'windowSize'='10','slidingStep'='10') from
+ * root.vehicle.d1" (windowSize is required, slidingStep is optional.)
+ *
+ * <p>SlidingTimeWindow usage Example: "select
+ * 
M4(s1,'timeInterval'='25','slidingStep'='25','displayWindowBegin'='0','displayWindowEnd'='100')
+ * from root.vehicle.d1" (timeInterval is required, 
slidingStep/displayWindowBegin/displayWindowEnd
+ * are optional.)
+ */
+public class UDTFM4 implements UDTF {
+
+  enum AccessStrategy {
+    SIZE_WINDOW,
+    TIME_WINDOW
+  }
+
+  protected AccessStrategy accessStrategy;
+  protected TSDataType dataType;
+
+  public static final String WINDOW_SIZE_KEY = "windowSize";
+  public static final String TIME_INTERVAL_KEY = "timeInterval";
+  public static final String SLIDING_STEP_KEY = "slidingStep";
+  public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
+  public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws UDFException, 
MetadataException {
+    IOMonitor2.dataSetType = DataSetType.UDTFAlignByTimeDataSet_M4_TIMEWINDOW;
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, 
TSDataType.DOUBLE);
+
+    if (!validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)
+        && !validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) {
+      throw new UDFParameterNotValidException(
+          String.format(
+              "attribute \"%s\"/\"%s\" is required but was not provided.",
+              WINDOW_SIZE_KEY, TIME_INTERVAL_KEY));
+    }
+    if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)
+        && validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) {
+      throw new UDFParameterNotValidException(
+          String.format(
+              "use attribute \"%s\" or \"%s\" only one at a time.",
+              WINDOW_SIZE_KEY, TIME_INTERVAL_KEY));
+    }
+    if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)) {
+      accessStrategy = AccessStrategy.SIZE_WINDOW;
+    } else {
+      accessStrategy = AccessStrategy.TIME_WINDOW;
+    }
+
+    dataType = validator.getParameters().getDataType(0);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations 
configurations)
+      throws MetadataException {
+    // set data type
+    configurations.setOutputDataType(dataType);
+
+    // set access strategy
+    if (accessStrategy == AccessStrategy.SIZE_WINDOW) {
+      int windowSize = parameters.getInt(WINDOW_SIZE_KEY);
+      int slidingStep = parameters.getIntOrDefault(SLIDING_STEP_KEY, 
windowSize);
+      configurations.setAccessStrategy(
+          new SlidingSizeWindowAccessStrategy(windowSize, slidingStep));
+    } else {
+      long timeInterval = parameters.getLong(TIME_INTERVAL_KEY);
+      long displayWindowBegin =
+          parameters.getLongOrDefault(DISPLAY_WINDOW_BEGIN_KEY, 
Long.MIN_VALUE);
+      long displayWindowEnd = 
parameters.getLongOrDefault(DISPLAY_WINDOW_END_KEY, Long.MAX_VALUE);
+      long slidingStep = parameters.getLongOrDefault(SLIDING_STEP_KEY, 
timeInterval);
+      configurations.setAccessStrategy(
+          new SlidingTimeWindowAccessStrategy(
+              timeInterval, slidingStep, displayWindowBegin, 
displayWindowEnd));
+    }
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws UDFException, IOException {
+    switch (dataType) {
+      case INT32:
+        transformInt(rowWindow, collector);
+        break;
+      case INT64:
+        transformLong(rowWindow, collector);
+        break;
+      case FLOAT:
+        transformFloat(rowWindow, collector);
+        break;
+      case DOUBLE:
+        transformDouble(rowWindow, collector);
+        break;
+      default:
+        // This will not happen
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, 
TSDataType.DOUBLE);
+    }
+  }
+
+  public void transformInt(RowWindow rowWindow, PointCollector collector) 
throws IOException {
+    if (rowWindow.windowSize() > 0) { // else empty window do nothing
+      int firstValue = rowWindow.getRow(0).getInt(0);
+      int lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getInt(0);
+
+      int minValue = Math.min(firstValue, lastValue);
+      int maxValue = Math.max(firstValue, lastValue);
+      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+        int value = rowWindow.getRow(i).getInt(0);
+        if (value < minValue) {
+          minValue = value;
+          minIndex = i;
+        }
+        if (value > maxValue) {
+          maxValue = value;
+          maxIndex = i;
+        }
+      }
+
+      Row row = rowWindow.getRow(0);
+      collector.putInt(row.getTime(), row.getInt(0));
+
+      int smallerIndex = Math.min(minIndex, maxIndex);
+      int largerIndex = Math.max(minIndex, maxIndex);
+      if (smallerIndex > 0) {
+        row = rowWindow.getRow(smallerIndex);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      if (largerIndex > smallerIndex) {
+        row = rowWindow.getRow(largerIndex);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      if (largerIndex < rowWindow.windowSize() - 1) {
+        row = rowWindow.getRow(rowWindow.windowSize() - 1);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+    }
+  }
+
+  public void transformLong(RowWindow rowWindow, PointCollector collector) 
throws IOException {
+    if (rowWindow.windowSize() > 0) { // else empty window do nothing
+      long firstValue = rowWindow.getRow(0).getLong(0);
+      long lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getLong(0);
+
+      long minValue = Math.min(firstValue, lastValue);
+      long maxValue = Math.max(firstValue, lastValue);
+      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+        long value = rowWindow.getRow(i).getLong(0);
+        if (value < minValue) {
+          minValue = value;
+          minIndex = i;
+        }
+        if (value > maxValue) {
+          maxValue = value;
+          maxIndex = i;
+        }
+      }
+
+      Row row = rowWindow.getRow(0);
+      collector.putLong(row.getTime(), row.getLong(0));
+
+      int smallerIndex = Math.min(minIndex, maxIndex);
+      int largerIndex = Math.max(minIndex, maxIndex);
+      if (smallerIndex > 0) {
+        row = rowWindow.getRow(smallerIndex);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      if (largerIndex > smallerIndex) {
+        row = rowWindow.getRow(largerIndex);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      if (largerIndex < rowWindow.windowSize() - 1) {
+        row = rowWindow.getRow(rowWindow.windowSize() - 1);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+    }
+  }
+
+  public void transformFloat(RowWindow rowWindow, PointCollector collector) 
throws IOException {
+    if (rowWindow.windowSize() > 0) { // else empty window do nothing
+      float firstValue = rowWindow.getRow(0).getFloat(0);
+      float lastValue = rowWindow.getRow(rowWindow.windowSize() - 
1).getFloat(0);
+
+      float minValue = Math.min(firstValue, lastValue);
+      float maxValue = Math.max(firstValue, lastValue);
+      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+        float value = rowWindow.getRow(i).getFloat(0);
+        if (value < minValue) {
+          minValue = value;
+          minIndex = i;
+        }
+        if (value > maxValue) {
+          maxValue = value;
+          maxIndex = i;
+        }
+      }
+
+      Row row = rowWindow.getRow(0);
+      collector.putFloat(row.getTime(), row.getFloat(0));
+
+      int smallerIndex = Math.min(minIndex, maxIndex);
+      int largerIndex = Math.max(minIndex, maxIndex);
+      if (smallerIndex > 0) {
+        row = rowWindow.getRow(smallerIndex);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      if (largerIndex > smallerIndex) {
+        row = rowWindow.getRow(largerIndex);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      if (largerIndex < rowWindow.windowSize() - 1) {
+        row = rowWindow.getRow(rowWindow.windowSize() - 1);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+    }
+  }
+
+  public void transformDouble(RowWindow rowWindow, PointCollector collector) 
throws IOException {
+    if (rowWindow.windowSize() > 0) { // else empty window do nothing
+      double firstValue = rowWindow.getRow(0).getDouble(0);
+      double lastValue = rowWindow.getRow(rowWindow.windowSize() - 
1).getDouble(0);
+
+      double minValue = Math.min(firstValue, lastValue);
+      double maxValue = Math.max(firstValue, lastValue);
+      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+        double value = rowWindow.getRow(i).getDouble(0);
+        if (value < minValue) {
+          minValue = value;
+          minIndex = i;
+        }
+        if (value > maxValue) {
+          maxValue = value;
+          maxIndex = i;
+        }
+      }
+
+      Row row = rowWindow.getRow(0);
+      collector.putDouble(row.getTime(), row.getDouble(0));
+
+      int smallerIndex = Math.min(minIndex, maxIndex);
+      int largerIndex = Math.max(minIndex, maxIndex);
+      if (smallerIndex > 0) {
+        row = rowWindow.getRow(smallerIndex);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      if (largerIndex > smallerIndex) {
+        row = rowWindow.getRow(largerIndex);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      if (largerIndex < rowWindow.windowSize() - 1) {
+        row = rowWindow.getRow(rowWindow.windowSize() - 1);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java 
b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
index 7b2388af2d6..654221963f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
@@ -29,6 +29,8 @@ import 
org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
 import 
org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
 import org.apache.iotdb.db.query.udf.api.exception.UDFException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.IOMonitor2;
+import org.apache.iotdb.tsfile.read.common.IOMonitor2.DataSetType;
 
 import java.io.IOException;
 
@@ -100,6 +102,7 @@ public class UDTFM4MAC implements UDTF {
 
   @Override
   public void validate(UDFParameterValidator validator) throws UDFException {
+    IOMonitor2.dataSetType = DataSetType.UDTFAlignByTimeDataSet_M4_POINT;
     validator
         .validateInputSeriesNumber(1)
         .validateInputSeriesDataType(
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java 
b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java
new file mode 100644
index 00000000000..d7803eac00f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.m4;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static 
org.apache.iotdb.db.query.udf.builtin.UDTFM4.DISPLAY_WINDOW_BEGIN_KEY;
+import static 
org.apache.iotdb.db.query.udf.builtin.UDTFM4.DISPLAY_WINDOW_END_KEY;
+import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.SLIDING_STEP_KEY;
+import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.TIME_INTERVAL_KEY;
+import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.WINDOW_SIZE_KEY;
+import static org.junit.Assert.fail;
+
+public class MyTest5 {
+
+  private static final String TIMESTAMP_STR = "Time";
+
+  private static String[] creationSqls =
+      new String[] {
+        "SET STORAGE GROUP TO root.vehicle.d0",
+        "CREATE TIMESERIES root.vehicle.d0.s0 WITH 
DATATYPE=DOUBLE,ENCODING=PLAIN",
+      };
+
+  private final String d0s0 = "root.vehicle.d0.s0";
+
+  private static final String insertTemplate =
+      "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)";
+
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static boolean originalEnableCPV;
+  private static CompactionStrategy originalCompactionStrategy;
+  private static boolean originalUseChunkIndex;
+
+  @Before
+  public void setUp() throws Exception {
+    TSFileDescriptor.getInstance().getConfig().setTimeEncoder("PLAIN");
+    originalCompactionStrategy = config.getCompactionStrategy();
+    config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+    originalEnableCPV = config.isEnableCPV();
+    //    config.setEnableCPV(false); // MOC
+    config.setEnableCPV(true); // CPV
+
+    originalUseChunkIndex = 
TSFileDescriptor.getInstance().getConfig().isUseChunkIndex();
+    TSFileDescriptor.getInstance().getConfig().setUseChunkIndex(false);
+
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    config.setTimestampPrecision("ms");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    config.setCompactionStrategy(originalCompactionStrategy);
+    config.setEnableCPV(originalEnableCPV);
+    
TSFileDescriptor.getInstance().getConfig().setUseChunkIndex(originalUseChunkIndex);
+  }
+
+  @Test
+  public void testM4Function() {
+    // create timeseries
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.m4");
+      statement.execute("CREATE TIMESERIES root.m4.d1.s1 with 
datatype=double,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.m4.d1.s2 with 
datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    // insert data
+    String insertTemplate = "INSERT INTO root.m4.d1(timestamp,%s)" + " 
VALUES(%d,%d)";
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = connection.createStatement()) {
+      // "root.m4.d1.s1" data illustration:
+      // 
https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 1, 
5));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 2, 
15));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
20, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
25, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
54, 3));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
120, 8));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 5, 
10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 8, 
8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
10, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
20, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
27, 20));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
30, 40));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
35, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
40, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
33, 9));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
45, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
52, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 
54, 18));
+      statement.execute("FLUSH");
+
+      // "root.m4.d1.s2" data: constant value 1
+      for (int i = 0; i < 100; i++) {
+        statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s2", 
i, 1));
+      }
+      statement.execute("FLUSH");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    // query tests
+    test_M4_firstWindowEmpty();
+    test_M4_slidingTimeWindow();
+    test_M4_slidingSizeWindow();
+    test_M4_constantTimeSeries();
+  }
+
+  private void test_M4_firstWindowEmpty() {
+    String[] res = new String[] {"120,8.0"};
+
+    String sql =
+        String.format(
+            "select M4_TW(s1, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from 
root.m4.d1",
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            75,
+            DISPLAY_WINDOW_END_KEY,
+            150);
+
+    try (Connection conn =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void test_M4_slidingTimeWindow() {
+    String[] res =
+        new String[] {
+          "1,5.0", "10,30.0", "20,20.0", "25,8.0", "30,40.0", "45,30.0", 
"52,8.0", "54,18.0",
+          "120,8.0"
+        };
+
+    String sql =
+        String.format(
+            "select M4_TW(s1, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from 
root.m4.d1",
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            0,
+            DISPLAY_WINDOW_END_KEY,
+            150);
+
+    try (Connection conn =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void test_M4_slidingSizeWindow() {
+    String[] res = new String[] {"1,5.0", "30,40.0", "33,9.0", "35,10.0", 
"45,30.0", "120,8.0"};
+
+    String sql =
+        String.format(
+            "select M4_TW(s1,'%s'='%s','%s'='%s') from root.m4.d1",
+            WINDOW_SIZE_KEY, 10, SLIDING_STEP_KEY, 10);
+
+    try (Connection conn =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void test_M4_constantTimeSeries() {
+    /* Result: 0,1 24,1 25,1 49,1 50,1 74,1 75,1 99,1 */
+    String sql =
+        String.format(
+            "select M4_TW(s2, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from 
root.m4.d1",
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            0,
+            DISPLAY_WINDOW_END_KEY,
+            100);
+
+    try (Connection conn =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String expStr;
+        if (count % 2 == 0) {
+          expStr = 25 * (count / 2) + ",1";
+        } else {
+          expStr = 25 * (count / 2) + 24 + ",1";
+        }
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(expStr, str);
+        count++;
+      }
+      Assert.assertEquals(8, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java
index 7541741b1f4..d9b436b8c5e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java
@@ -24,7 +24,8 @@ public class IOMonitor2 {
   public enum DataSetType { // dataSet, executor, reader, file
     NONE,
     RawQueryDataSetWithoutValueFilter,
-    UDTFAlignByTimeDataSet,
+    UDTFAlignByTimeDataSet_M4_POINT,
+    UDTFAlignByTimeDataSet_M4_TIMEWINDOW,
     GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV,
 
     GroupByWithoutValueFilterDataSet_LocalGroupByExecutor

Reply via email to