This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/LTS-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9cb402f77e4b799826a426a5f13582a97054ebe3 Author: Lei Rui <[email protected]> AuthorDate: Sun Jul 14 15:09:55 2024 +0800 add --- .../resources/conf/iotdb-engine.properties | 4 +- .../groupby/GroupByWithoutValueFilterDataSet.java | 3 + .../groupby/LocalGroupByExecutorTri_FSW.java | 163 +++++++++++++++++++++ .../org/apache/iotdb/db/query/simpiece/FSW.java | 98 +++++++++++++ .../iotdb/db/query/simpiece/MySample_fsw.java | 78 ++++++++++ 5 files changed, 344 insertions(+), 2 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 18bcfd54fa5..7cc6cfcc97a 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -19,10 +19,10 @@ #################### ### enable Tri #################### -# MinMax, MinMaxLTTB, M4, LTTB, ILTS, SimPiece, SC +# MinMax, MinMaxLTTB, M4, LTTB, ILTS, SimPiece, SC, FSW enable_Tri=MinMax -# SimPiece segment error threshold +# segment error threshold for SimPiece, SC, FSW epsilon=100 #for diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 6a4f559c775..5818d4b6a04 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -441,6 +441,9 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { } else if (CONFIG.getEnableTri().equals("SC")) { return new LocalGroupByExecutorTri_SC( path, allSensors, dataType, context, timeFilter, fileFilter, ascending); + } else if (CONFIG.getEnableTri().equals("FSW")) { + return new LocalGroupByExecutorTri_FSW( + path, allSensors, dataType, context, timeFilter, fileFilter, ascending); } 
else { logger.info("No matched enable_tri!"); return new LocalGroupByExecutor( diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_FSW.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_FSW.java new file mode 100644 index 00000000000..699f88dd824 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_FSW.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesReader;
+import org.apache.iotdb.db.query.simpiece.FSW;
+import org.apache.iotdb.db.query.simpiece.Point;
+import org.apache.iotdb.db.query.simpiece.TimeSeries;
+import org.apache.iotdb.db.query.simpiece.TimeSeriesReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo;
+import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * GROUP BY executor used when {@code enable_Tri=FSW}: it down-samples the series with the FSW
+ * (feasible space window) segmentation instead of computing per-interval aggregates.
+ *
+ * <p>The series in the query range is loaded once in the constructor; {@link
+ * #calcResult(long, long, long, long, long)} segments it and returns the retained points as a
+ * string carried by the first aggregate result.
+ */
+public class LocalGroupByExecutorTri_FSW implements GroupByExecutor {
+
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  // Aggregate result buffer of this path
+  private final List<AggregateResult> results = new ArrayList<>();
+
+  // Points of the series in the GROUP BY query range, loaded once in the constructor
+  TimeSeries timeSeries;
+
+  // FSW error threshold, taken from the "epsilon" configuration property
+  double epsilon = CONFIG.getEpsilon();
+
+  /**
+   * Loads every point of {@code path} in the GROUP BY query range so that FSW can later segment
+   * the whole series at once.
+   *
+   * @throws QueryProcessException if an IOException occurs while loading chunk metadata or series
+   *     data (only the exception message is kept)
+   */
+  public LocalGroupByExecutorTri_FSW(
+      PartialPath path,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    // get all data sources
+    QueryDataSource queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+
+    SeriesReader seriesReader =
+        new SeriesReader(
+            path,
+            allSensors,
+            // fix bug: here use the aggregation type as the series data type,
+            // not using pageReader.getAllSatisfiedPageData is ok
+            dataType,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    try {
+      // NOTE: all chunk metadata is loaded up front, which may be costly for long series
+      List<ChunkSuit4Tri> futureChunkList =
+          new ArrayList<>(seriesReader.getAllChunkMetadatas4Tri());
+      // order chunks by their start time
+      futureChunkList.sort(
+          Comparator.comparingLong((ChunkSuit4Tri chunk) -> chunk.chunkMetadata.getStartTime()));
+
+      // assumes the caller always passes a GroupByFilter as the time filter
+      GroupByFilter groupByFilter = (GroupByFilter) timeFilter;
+      long startTime = groupByFilter.getStartTime();
+      long endTime = groupByFilter.getEndTime();
+
+      timeSeries = TimeSeriesReader.getTimeSeriesFromTsFiles(futureChunkList, startTime, endTime);
+    } catch (IOException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    results.add(aggrResult);
+  }
+
+  /**
+   * Runs FSW over the whole loaded series and stores the retained points, serialized as
+   * {@code value[timestamp],} entries, into the first aggregate result.
+   *
+   * <p>The interval arguments are ignored: FSW segments the entire series in one pass.
+   */
+  @Override
+  public List<AggregateResult> calcResult(
+      long curStartTime, long curEndTime, long startTime, long endTime, long interval)
+      throws IOException {
+    // clear result cache
+    for (AggregateResult result : results) {
+      result.reset();
+    }
+
+    // FSW (feasible space window) segmentation under the configured error threshold
+    List<Point> reducedPoints = FSW.reducePoints(timeSeries.data, epsilon);
+
+    StringBuilder series = new StringBuilder();
+    for (Point p : reducedPoints) {
+      series.append(p.getValue()).append("[").append(p.getTimestamp()).append("]").append(",");
+    }
+
+    // The serialized points are carried through a MinValueAggrResult; this assumes the first
+    // registered aggregate result is of that type.
+    MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0);
+    minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0));
+    return results;
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+      throws IOException {
+    throw new IOException("no implemented");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new IOException("no implemented");
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/FSW.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/FSW.java
new file mode 100644
index 00000000000..4b2e0163e1a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/FSW.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.simpiece;
+
+import java.util.ArrayList;
+import java.util.List;
+
+// Liu, Xiaoyan, Lin, Zhenjiang, Wang, Huaiqing, 2008. Novel online methods for time series
+// segmentation.
+// FSW: feasible space window
+// adapted from https://github.com/zhourongleiden/AEPLA.git
+public class FSW {
+
+  /**
+   * Segments {@code points} with the feasible space window (FSW) method and returns the points
+   * that delimit the segments.
+   *
+   * <p>A segment grows from its start point while some line through that start point stays within
+   * {@code epsilon} (vertically) of every point added so far. When that set of feasible slopes
+   * becomes empty, the segment is closed at the furthest point whose own slope was feasible, and
+   * the next segment starts from there.
+   *
+   * @param points series to segment; assumed to be ordered by strictly increasing timestamp
+   * @param epsilon maximum allowed vertical deviation; must be non-negative
+   * @return the first point, every segment break point, and the last point, in input order; an
+   *     empty list if {@code points} is empty
+   * @throws IllegalArgumentException if {@code epsilon} is negative or NaN
+   */
+  public static List<Point> reducePoints(List<Point> points, double epsilon) {
+    // a negative epsilon empties the window immediately and used to make the loop spin forever
+    if (!(epsilon >= 0)) {
+      throw new IllegalArgumentException("epsilon must be a non-negative number: " + epsilon);
+    }
+    List<Point> result = new ArrayList<>();
+    int length = points.size();
+    if (length == 0) {
+      return result;
+    }
+
+    // indices of the retained points; the first point always starts the first segment
+    List<Integer> anchors = new ArrayList<>();
+    int anchor = 0; // start index of the segment currently being grown
+    int candidate = 0; // furthest point whose slope from the anchor was feasible (csp_id)
+    anchors.add(anchor);
+    double upSlope = Double.POSITIVE_INFINITY;
+    double lowSlope = Double.NEGATIVE_INFINITY;
+
+    int i = 0;
+    while (i < length - 1) {
+      i++;
+      Point start = points.get(anchor);
+      Point current = points.get(i);
+      double dt = current.getTimestamp() - start.getTimestamp();
+      // narrow the window of slopes that keep the segment within +-epsilon of point i
+      upSlope = Math.min(upSlope, (current.getValue() + epsilon - start.getValue()) / dt);
+      lowSlope = Math.max(lowSlope, (current.getValue() - epsilon - start.getValue()) / dt);
+      if (upSlope < lowSlope) {
+        // window is empty: close the segment at the last feasible point and restart from it
+        if (candidate <= anchor) {
+          // no feasible point since the anchor (e.g. non-increasing timestamps): force progress
+          // instead of restarting at the same anchor forever
+          candidate = anchor + 1;
+        }
+        anchor = candidate;
+        anchors.add(anchor);
+        i = anchor;
+        upSlope = Double.POSITIVE_INFINITY;
+        lowSlope = Double.NEGATIVE_INFINITY;
+      } else {
+        double slope = (current.getValue() - start.getValue()) / dt;
+        if (slope >= lowSlope && slope <= upSlope) {
+          candidate = i;
+        }
+      }
+    }
+
+    // deal with the last segment: connect the last anchor to the final point.
+    // NOTE(review): the final point is appended even when its slope from the anchor lies outside
+    // the feasible window, so points of this tail segment may deviate by more than epsilon.
+    // Confirm this is intended.
+    if (anchor < length - 1) {
+      anchors.add(length - 1);
+    }
+
+    for (int index : anchors) {
+      result.add(points.get(index));
+    }
+    return result;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/MySample_fsw.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/MySample_fsw.java
new file mode 100644
index 00000000000..854962e96a6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/MySample_fsw.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git
+
+package org.apache.iotdb.db.query.simpiece;
+
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.List;
+
+/** Local driver that runs FSW on CSV datasets and writes the retained points to CSV files. */
+public class MySample_fsw {
+
+  public static void main(String[] args) {
+    String fileDir = "D:\\desktop\\NISTPV\\";
+    // use sequence numbers starting from 1 instead of the timestamp column
+    boolean series = true;
+    String[] datasetNameList =
+        new String[] {
+          // "NISTPV-Ground-2015-Qloss_Ah",
+          "NISTPV-Ground-2015-Pyra1_Wm2",
+          // "NISTPV-Ground-2015-RTD_C_3"
+        };
+
+    int start = 0; // 0-based offset of the first row to read
+    int end = 10000;
+    int N = end - start;
+    double epsilon = 0.1;
+
+    for (String datasetName : datasetNameList) {
+      boolean hasHeader = false;
+      try (FileInputStream inputStream = new FileInputStream(fileDir + datasetName + ".csv")) {
+        String delimiter = ",";
+        TimeSeries ts =
+            TimeSeriesReader.getMyTimeSeries(
+                inputStream, delimiter, false, N, start, hasHeader, series);
+
+        // apply FSW and save the retained points as "timestamp,value" rows
+        List<Point> reducedPoints = FSW.reducePoints(ts.data, epsilon);
+        // "-fsw" suffix: the previous "-sc" suffix mislabeled the output and collided with the
+        // SC sample's files
+        try (PrintWriter writer =
+            new PrintWriter(
+                new FileWriter(datasetName + "-" + N + "-" + reducedPoints.size() + "-fsw.csv"))) {
+          for (Point p : reducedPoints) {
+            writer.println(p.getTimestamp() + "," + p.getValue());
+          }
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}
