This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 132ab5fe95290fb1a1f1b63cf0b615f84b5c0ba4 Author: Lei Rui <[email protected]> AuthorDate: Sat Jun 17 16:01:59 2023 +0800 add udf time window TW --- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 1 - .../db/query/udf/builtin/BuiltinFunction.java | 1 + .../apache/iotdb/db/query/udf/builtin/UDTFM4.java | 315 +++++++++++++++++++++ .../iotdb/db/query/udf/builtin/UDTFM4MAC.java | 3 + .../apache/iotdb/db/integration/m4/MyTest5.java | 281 ++++++++++++++++++ .../iotdb/tsfile/read/common/IOMonitor2.java | 3 +- 6 files changed, 602 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index ffff5ac268d..b5fde29063a 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -526,7 +526,6 @@ public class PlanExecutor implements IPlanExecutor { // no time series are selected, return EmptyDataSet return new EmptyDataSet(); } else if (queryPlan instanceof UDTFPlan) { - IOMonitor2.dataSetType = DataSetType.UDTFAlignByTimeDataSet; UDTFPlan udtfPlan = (UDTFPlan) queryPlan; queryDataSet = queryRouter.udtfQuery(udtfPlan, context); } else if (queryPlan instanceof GroupByTimeFillPlan) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java index fc8da6b3a0f..216d912d393 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java @@ -48,6 +48,7 @@ public enum BuiltinFunction { TOP_K("TOP_K", UDTFTopK.class), BOTTOM_K("BOTTOM_K", UDTFBottomK.class), M4("M4", UDTFM4MAC.class), + M4_TW("M4_TW", UDTFM4.class); ; private final String functionName; diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4.java new file mode 100644 index 00000000000..d4f97ac7679 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.query.udf.builtin; + +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.query.udf.api.UDTF; +import org.apache.iotdb.db.query.udf.api.access.Row; +import org.apache.iotdb.db.query.udf.api.access.RowWindow; +import org.apache.iotdb.db.query.udf.api.collector.PointCollector; +import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations; +import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator; +import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; +import org.apache.iotdb.db.query.udf.api.exception.UDFException; +import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException; +import org.apache.iotdb.db.query.udf.api.exception.UDFParameterNotValidException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; +import org.apache.iotdb.tsfile.read.common.IOMonitor2.DataSetType; + +import java.io.IOException; + +/** + * For each sliding window, M4 returns the first, last, bottom, top points. The window can be + * controlled by either point size or time interval length. The aggregated points in the output + * series have been sorted and deduplicated. + * + * <p>SlidingSizeWindow usage Example: "select M4_TW(s1,'windowSize'='10','slidingStep'='10') from + * root.vehicle.d1" (windowSize is required, slidingStep is optional.) + * + * <p>SlidingTimeWindow usage Example: "select + * M4_TW(s1,'timeInterval'='25','slidingStep'='25','displayWindowBegin'='0','displayWindowEnd'='100') + * from root.vehicle.d1" (timeInterval is required, slidingStep/displayWindowBegin/displayWindowEnd + * are optional.) 
+ */ +public class UDTFM4 implements UDTF { + + enum AccessStrategy { + SIZE_WINDOW, + TIME_WINDOW + } + + protected AccessStrategy accessStrategy; + protected TSDataType dataType; + + public static final String WINDOW_SIZE_KEY = "windowSize"; + public static final String TIME_INTERVAL_KEY = "timeInterval"; + public static final String SLIDING_STEP_KEY = "slidingStep"; + public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin"; + public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd"; + + @Override + public void validate(UDFParameterValidator validator) throws UDFException, MetadataException { + IOMonitor2.dataSetType = DataSetType.UDTFAlignByTimeDataSet_M4_TIMEWINDOW; + validator + .validateInputSeriesNumber(1) + .validateInputSeriesDataType( + 0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE); + + if (!validator.getParameters().hasAttribute(WINDOW_SIZE_KEY) + && !validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) { + throw new UDFParameterNotValidException( + String.format( + "attribute \"%s\"/\"%s\" is required but was not provided.", + WINDOW_SIZE_KEY, TIME_INTERVAL_KEY)); + } + if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY) + && validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) { + throw new UDFParameterNotValidException( + String.format( + "use attribute \"%s\" or \"%s\" only one at a time.", + WINDOW_SIZE_KEY, TIME_INTERVAL_KEY)); + } + if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)) { + accessStrategy = AccessStrategy.SIZE_WINDOW; + } else { + accessStrategy = AccessStrategy.TIME_WINDOW; + } + + dataType = validator.getParameters().getDataType(0); + } + + @Override + public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) + throws MetadataException { + // set data type + configurations.setOutputDataType(dataType); + + // set access strategy + if (accessStrategy == AccessStrategy.SIZE_WINDOW) { + int windowSize = 
parameters.getInt(WINDOW_SIZE_KEY); + int slidingStep = parameters.getIntOrDefault(SLIDING_STEP_KEY, windowSize); + configurations.setAccessStrategy( + new SlidingSizeWindowAccessStrategy(windowSize, slidingStep)); + } else { + long timeInterval = parameters.getLong(TIME_INTERVAL_KEY); + long displayWindowBegin = + parameters.getLongOrDefault(DISPLAY_WINDOW_BEGIN_KEY, Long.MIN_VALUE); + long displayWindowEnd = parameters.getLongOrDefault(DISPLAY_WINDOW_END_KEY, Long.MAX_VALUE); + long slidingStep = parameters.getLongOrDefault(SLIDING_STEP_KEY, timeInterval); + configurations.setAccessStrategy( + new SlidingTimeWindowAccessStrategy( + timeInterval, slidingStep, displayWindowBegin, displayWindowEnd)); + } + } + + @Override + public void transform(RowWindow rowWindow, PointCollector collector) + throws UDFException, IOException { + switch (dataType) { + case INT32: + transformInt(rowWindow, collector); + break; + case INT64: + transformLong(rowWindow, collector); + break; + case FLOAT: + transformFloat(rowWindow, collector); + break; + case DOUBLE: + transformDouble(rowWindow, collector); + break; + default: + // This will not happen + throw new UDFInputSeriesDataTypeNotValidException( + 0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE); + } + } + + public void transformInt(RowWindow rowWindow, PointCollector collector) throws IOException { + if (rowWindow.windowSize() > 0) { // else empty window do nothing + int firstValue = rowWindow.getRow(0).getInt(0); + int lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getInt(0); + + int minValue = Math.min(firstValue, lastValue); + int maxValue = Math.max(firstValue, lastValue); + int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1; + int maxIndex = (firstValue > lastValue) ? 
0 : rowWindow.windowSize() - 1; + + for (int i = 1; i < rowWindow.windowSize() - 1; i++) { + int value = rowWindow.getRow(i).getInt(0); + if (value < minValue) { + minValue = value; + minIndex = i; + } + if (value > maxValue) { + maxValue = value; + maxIndex = i; + } + } + + Row row = rowWindow.getRow(0); + collector.putInt(row.getTime(), row.getInt(0)); + + int smallerIndex = Math.min(minIndex, maxIndex); + int largerIndex = Math.max(minIndex, maxIndex); + if (smallerIndex > 0) { + row = rowWindow.getRow(smallerIndex); + collector.putInt(row.getTime(), row.getInt(0)); + } + if (largerIndex > smallerIndex) { + row = rowWindow.getRow(largerIndex); + collector.putInt(row.getTime(), row.getInt(0)); + } + if (largerIndex < rowWindow.windowSize() - 1) { + row = rowWindow.getRow(rowWindow.windowSize() - 1); + collector.putInt(row.getTime(), row.getInt(0)); + } + } + } + + public void transformLong(RowWindow rowWindow, PointCollector collector) throws IOException { + if (rowWindow.windowSize() > 0) { // else empty window do nothing + long firstValue = rowWindow.getRow(0).getLong(0); + long lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getLong(0); + + long minValue = Math.min(firstValue, lastValue); + long maxValue = Math.max(firstValue, lastValue); + int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1; + int maxIndex = (firstValue > lastValue) ? 
0 : rowWindow.windowSize() - 1; + + for (int i = 1; i < rowWindow.windowSize() - 1; i++) { + long value = rowWindow.getRow(i).getLong(0); + if (value < minValue) { + minValue = value; + minIndex = i; + } + if (value > maxValue) { + maxValue = value; + maxIndex = i; + } + } + + Row row = rowWindow.getRow(0); + collector.putLong(row.getTime(), row.getLong(0)); + + int smallerIndex = Math.min(minIndex, maxIndex); + int largerIndex = Math.max(minIndex, maxIndex); + if (smallerIndex > 0) { + row = rowWindow.getRow(smallerIndex); + collector.putLong(row.getTime(), row.getLong(0)); + } + if (largerIndex > smallerIndex) { + row = rowWindow.getRow(largerIndex); + collector.putLong(row.getTime(), row.getLong(0)); + } + if (largerIndex < rowWindow.windowSize() - 1) { + row = rowWindow.getRow(rowWindow.windowSize() - 1); + collector.putLong(row.getTime(), row.getLong(0)); + } + } + } + + public void transformFloat(RowWindow rowWindow, PointCollector collector) throws IOException { + if (rowWindow.windowSize() > 0) { // else empty window do nothing + float firstValue = rowWindow.getRow(0).getFloat(0); + float lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getFloat(0); + + float minValue = Math.min(firstValue, lastValue); + float maxValue = Math.max(firstValue, lastValue); + int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1; + int maxIndex = (firstValue > lastValue) ? 
0 : rowWindow.windowSize() - 1; + + for (int i = 1; i < rowWindow.windowSize() - 1; i++) { + float value = rowWindow.getRow(i).getFloat(0); + if (value < minValue) { + minValue = value; + minIndex = i; + } + if (value > maxValue) { + maxValue = value; + maxIndex = i; + } + } + + Row row = rowWindow.getRow(0); + collector.putFloat(row.getTime(), row.getFloat(0)); + + int smallerIndex = Math.min(minIndex, maxIndex); + int largerIndex = Math.max(minIndex, maxIndex); + if (smallerIndex > 0) { + row = rowWindow.getRow(smallerIndex); + collector.putFloat(row.getTime(), row.getFloat(0)); + } + if (largerIndex > smallerIndex) { + row = rowWindow.getRow(largerIndex); + collector.putFloat(row.getTime(), row.getFloat(0)); + } + if (largerIndex < rowWindow.windowSize() - 1) { + row = rowWindow.getRow(rowWindow.windowSize() - 1); + collector.putFloat(row.getTime(), row.getFloat(0)); + } + } + } + + public void transformDouble(RowWindow rowWindow, PointCollector collector) throws IOException { + if (rowWindow.windowSize() > 0) { // else empty window do nothing + double firstValue = rowWindow.getRow(0).getDouble(0); + double lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getDouble(0); + + double minValue = Math.min(firstValue, lastValue); + double maxValue = Math.max(firstValue, lastValue); + int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1; + int maxIndex = (firstValue > lastValue) ? 
0 : rowWindow.windowSize() - 1; + + for (int i = 1; i < rowWindow.windowSize() - 1; i++) { + double value = rowWindow.getRow(i).getDouble(0); + if (value < minValue) { + minValue = value; + minIndex = i; + } + if (value > maxValue) { + maxValue = value; + maxIndex = i; + } + } + + Row row = rowWindow.getRow(0); + collector.putDouble(row.getTime(), row.getDouble(0)); + + int smallerIndex = Math.min(minIndex, maxIndex); + int largerIndex = Math.max(minIndex, maxIndex); + if (smallerIndex > 0) { + row = rowWindow.getRow(smallerIndex); + collector.putDouble(row.getTime(), row.getDouble(0)); + } + if (largerIndex > smallerIndex) { + row = rowWindow.getRow(largerIndex); + collector.putDouble(row.getTime(), row.getDouble(0)); + } + if (largerIndex < rowWindow.windowSize() - 1) { + row = rowWindow.getRow(rowWindow.windowSize() - 1); + collector.putDouble(row.getTime(), row.getDouble(0)); + } + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java index 7b2388af2d6..654221963f7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java @@ -29,6 +29,8 @@ import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy; import org.apache.iotdb.db.query.udf.api.exception.UDFException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; +import org.apache.iotdb.tsfile.read.common.IOMonitor2.DataSetType; import java.io.IOException; @@ -100,6 +102,7 @@ public class UDTFM4MAC implements UDTF { @Override public void validate(UDFParameterValidator validator) throws UDFException { + IOMonitor2.dataSetType = DataSetType.UDTFAlignByTimeDataSet_M4_POINT; validator .validateInputSeriesNumber(1) 
.validateInputSeriesDataType( diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java new file mode 100644 index 00000000000..d7803eac00f --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.integration.m4; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Locale; + +import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.DISPLAY_WINDOW_BEGIN_KEY; +import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.DISPLAY_WINDOW_END_KEY; +import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.SLIDING_STEP_KEY; +import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.TIME_INTERVAL_KEY; +import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.WINDOW_SIZE_KEY; +import static org.junit.Assert.fail; + +public class MyTest5 { + + private static final String TIMESTAMP_STR = "Time"; + + private static String[] creationSqls = + new String[] { + "SET STORAGE GROUP TO root.vehicle.d0", + "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=DOUBLE,ENCODING=PLAIN", + }; + + private final String d0s0 = "root.vehicle.d0.s0"; + + private static final String insertTemplate = + "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)"; + + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static boolean originalEnableCPV; + private static CompactionStrategy originalCompactionStrategy; + private static boolean originalUseChunkIndex; + + @Before + public void setUp() throws Exception { + TSFileDescriptor.getInstance().getConfig().setTimeEncoder("PLAIN"); + originalCompactionStrategy = config.getCompactionStrategy(); + 
config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); + + originalEnableCPV = config.isEnableCPV(); + // config.setEnableCPV(false); // MOC + config.setEnableCPV(true); // CPV + + originalUseChunkIndex = TSFileDescriptor.getInstance().getConfig().isUseChunkIndex(); + TSFileDescriptor.getInstance().getConfig().setUseChunkIndex(false); + + EnvironmentUtils.envSetUp(); + Class.forName(Config.JDBC_DRIVER_NAME); + config.setTimestampPrecision("ms"); + } + + @After + public void tearDown() throws Exception { + EnvironmentUtils.cleanEnv(); + config.setCompactionStrategy(originalCompactionStrategy); + config.setEnableCPV(originalEnableCPV); + TSFileDescriptor.getInstance().getConfig().setUseChunkIndex(originalUseChunkIndex); + } + + @Test + public void testM4Function() { + // create timeseries + try (Connection connection = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + statement.execute("SET STORAGE GROUP TO root.m4"); + statement.execute("CREATE TIMESERIES root.m4.d1.s1 with datatype=double,encoding=PLAIN"); + statement.execute("CREATE TIMESERIES root.m4.d1.s2 with datatype=INT32,encoding=PLAIN"); + } catch (SQLException throwable) { + fail(throwable.getMessage()); + } + + // insert data + String insertTemplate = "INSERT INTO root.m4.d1(timestamp,%s)" + " VALUES(%d,%d)"; + try (Connection connection = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + // "root.m4.d1.s1" data illustration: + // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 1, 5)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 2, 15)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 20, 1)); + statement.execute(String.format(Locale.ENGLISH, 
insertTemplate, "s1", 25, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 54, 3)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 120, 8)); + statement.execute("FLUSH"); + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 5, 10)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 8, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 10, 30)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 20, 20)); + statement.execute("FLUSH"); + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 27, 20)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 30, 40)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 35, 10)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 40, 20)); + statement.execute("FLUSH"); + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 33, 9)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 45, 30)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 52, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 54, 18)); + statement.execute("FLUSH"); + + // "root.m4.d1.s2" data: constant value 1 + for (int i = 0; i < 100; i++) { + statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s2", i, 1)); + } + statement.execute("FLUSH"); + } catch (Exception e) { + e.printStackTrace(); + } + + // query tests + test_M4_firstWindowEmpty(); + test_M4_slidingTimeWindow(); + test_M4_slidingSizeWindow(); + test_M4_constantTimeSeries(); + } + + private void test_M4_firstWindowEmpty() { + String[] res = new String[] {"120,8.0"}; + + String sql = + String.format( + "select M4_TW(s1, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.m4.d1", + TIME_INTERVAL_KEY, + 25, + SLIDING_STEP_KEY, + 25, + DISPLAY_WINDOW_BEGIN_KEY, + 75, + 
DISPLAY_WINDOW_END_KEY, + 150); + + try (Connection conn = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = conn.createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + int count = 0; + while (resultSet.next()) { + String str = resultSet.getString(1) + "," + resultSet.getString(2); + Assert.assertEquals(res[count], str); + count++; + } + Assert.assertEquals(res.length, count); + } catch (SQLException throwable) { + fail(throwable.getMessage()); + } + } + + private void test_M4_slidingTimeWindow() { + String[] res = + new String[] { + "1,5.0", "10,30.0", "20,20.0", "25,8.0", "30,40.0", "45,30.0", "52,8.0", "54,18.0", + "120,8.0" + }; + + String sql = + String.format( + "select M4_TW(s1, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.m4.d1", + TIME_INTERVAL_KEY, + 25, + SLIDING_STEP_KEY, + 25, + DISPLAY_WINDOW_BEGIN_KEY, + 0, + DISPLAY_WINDOW_END_KEY, + 150); + + try (Connection conn = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = conn.createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + int count = 0; + while (resultSet.next()) { + String str = resultSet.getString(1) + "," + resultSet.getString(2); + Assert.assertEquals(res[count], str); + count++; + } + Assert.assertEquals(res.length, count); + } catch (SQLException throwable) { + fail(throwable.getMessage()); + } + } + + private void test_M4_slidingSizeWindow() { + String[] res = new String[] {"1,5.0", "30,40.0", "33,9.0", "35,10.0", "45,30.0", "120,8.0"}; + + String sql = + String.format( + "select M4_TW(s1,'%s'='%s','%s'='%s') from root.m4.d1", + WINDOW_SIZE_KEY, 10, SLIDING_STEP_KEY, 10); + + try (Connection conn = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = conn.createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + int count = 0; + while (resultSet.next()) { + String str = 
resultSet.getString(1) + "," + resultSet.getString(2); + Assert.assertEquals(res[count], str); + count++; + } + Assert.assertEquals(res.length, count); + } catch (SQLException throwable) { + fail(throwable.getMessage()); + } + } + + private void test_M4_constantTimeSeries() { + /* Result: 0,1 24,1 25,1 49,1 50,1 74,1 75,1 99,1 */ + String sql = + String.format( + "select M4_TW(s2, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.m4.d1", + TIME_INTERVAL_KEY, + 25, + SLIDING_STEP_KEY, + 25, + DISPLAY_WINDOW_BEGIN_KEY, + 0, + DISPLAY_WINDOW_END_KEY, + 100); + + try (Connection conn = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = conn.createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + int count = 0; + while (resultSet.next()) { + String expStr; + if (count % 2 == 0) { + expStr = 25 * (count / 2) + ",1"; + } else { + expStr = 25 * (count / 2) + 24 + ",1"; + } + String str = resultSet.getString(1) + "," + resultSet.getString(2); + Assert.assertEquals(expStr, str); + count++; + } + Assert.assertEquals(8, count); + } catch (SQLException throwable) { + fail(throwable.getMessage()); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java index 7541741b1f4..d9b436b8c5e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java @@ -24,7 +24,8 @@ public class IOMonitor2 { public enum DataSetType { // dataSet, executor, reader, file NONE, RawQueryDataSetWithoutValueFilter, - UDTFAlignByTimeDataSet, + UDTFAlignByTimeDataSet_M4_POINT, + UDTFAlignByTimeDataSet_M4_TIMEWINDOW, GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV, GroupByWithoutValueFilterDataSet_LocalGroupByExecutor
