This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 05c9ab13cf79958881537008144b8e95099a6bc7 Author: Lei Rui <[email protected]> AuthorDate: Tue Jan 24 17:18:43 2023 +0800 stepRegress write --- .../dataset/groupby/LocalGroupByExecutor4CPV.java | 64 ++-- tsfile/pom.xml | 5 + .../file/metadata/statistics/Statistics.java | 176 ++++++++-- .../file/metadata/statistics/StepRegress.java | 383 +++++++++++++++++++++ .../statistics/TimeExactOrderStatistics.java | 193 +++++++++++ 5 files changed, 753 insertions(+), 68 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java index 59feabe4e1..68ab1b055a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java @@ -19,6 +19,14 @@ package org.apache.iotdb.db.query.dataset.groupby; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -28,7 +36,6 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.filter.TsFileFilter; import org.apache.iotdb.db.query.reader.series.SeriesReader; -import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader; import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReaderPriority; import org.apache.iotdb.db.utils.FileLoaderUtils; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; @@ -51,15 +58,6 @@ import 
org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; - /** * Sql format: SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), * max_value(s0) ROM root.xx group by ([tqs,tqe),IntervalLength). Requirements: (1) Don't change the @@ -84,7 +82,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { private TSDataType tsDataType; - private PriorityMergeReader mergeReader; +// private PriorityMergeReader mergeReader; public LocalGroupByExecutor4CPV( PartialPath path, @@ -97,7 +95,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { throws StorageEngineException, QueryProcessException { this.tsDataType = dataType; - this.mergeReader = new PriorityMergeReader(); +// this.mergeReader = new PriorityMergeReader(); // get all data sources QueryDataSource queryDataSource = @@ -199,9 +197,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { /** * @param curStartTime closed - * @param curEndTime open - * @param startTime closed - * @param endTime open + * @param curEndTime open + * @param startTime closed + * @param endTime open */ @Override public List<AggregateResult> calcResult( @@ -226,7 +224,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { return results; } - /** 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 */ + /** + * 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 + */ private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) { if (chunkSuit4CPV.getBatchData() != null) { BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false); @@ -323,8 +323,8 @@ public class 
LocalGroupByExecutor4CPV implements GroupByExecutor { new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) .compareTo( new MergeReaderPriority( o1.getChunkMetadata().getVersion(), @@ -411,7 +411,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 是被overlap,则partial scan所有这些overlap的块 @@ -455,7 +455,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 找到这样的点,于是标记candidate point所在块为lazy @@ -521,8 +521,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) .compareTo( new MergeReaderPriority( o1.getChunkMetadata().getVersion(), @@ -609,7 +609,7 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor { .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 是被overlap,则partial scan所有这些overlap的块 @@ -653,7 +653,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 找到这样的点,于是标记candidate point所在块为lazy @@ -701,8 +701,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { return res; } else { return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) .compareTo( new MergeReaderPriority( o1.getChunkMetadata().getVersion(), @@ -767,11 +767,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results .get(0) .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); results .get(2) .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); return; } } @@ -798,8 +798,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { return res; } else { return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) + o2.getChunkMetadata().getVersion(), + 
o2.getChunkMetadata().getOffsetOfChunkHeader()) .compareTo( new MergeReaderPriority( o1.getChunkMetadata().getVersion(), @@ -864,11 +864,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results .get(1) .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); results .get(3) .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); return; } } diff --git a/tsfile/pom.xml b/tsfile/pom.xml index f26c930f1a..dcddf19cf9 100644 --- a/tsfile/pom.xml +++ b/tsfile/pom.xml @@ -37,6 +37,11 @@ <tsfile.ut.skip>${tsfile.test.skip}</tsfile.ut.skip> </properties> <dependencies> + <dependency> + <groupId>org.eclipse.collections</groupId> + <artifactId>eclipse-collections</artifactId> + <version>10.4.0</version> + </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 947805c660..397fed5e5c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java @@ -18,27 +18,26 @@ */ package org.apache.iotdb.tsfile.file.metadata.statistics; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Objects; import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; 
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - +import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.Objects; - /** * This class is used for recording statistic information of each measurement in a delta file. While * writing processing, the processor records the statistics information. Statistics includes - * maximum, minimum and null value count up to version 0.0.1.<br> - * Each data type extends this Statistic as super class.<br> + * maximum, minimum and null value count up to version 0.0.1.<br> Each data type extends this + * Statistic as super class.<br> * <br> * For the statistics in the Unseq file TimeSeriesMetadata, only firstValue, lastValue, startTime * and endTime can be used.</br> @@ -51,13 +50,19 @@ public abstract class Statistics<T> { */ protected boolean isEmpty = true; - /** number of time-value points */ + /** + * number of time-value points + */ private int count = 0; private long startTime = Long.MAX_VALUE; private long endTime = Long.MIN_VALUE; - /** @author Yuyuan Kang */ + private StepRegress stepRegress = new StepRegress(); + + /** + * @author Yuyuan Kang + */ final String OPERATION_NOT_SUPPORT_FORMAT = "%s statistics does not support operation: %s"; /** @@ -119,34 +124,59 @@ public abstract class Statistics<T> { byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(count, outputStream); byteLen += ReadWriteIOUtils.write(startTime, outputStream); byteLen += ReadWriteIOUtils.write(endTime, outputStream); + // TODO serialize stepRegress + byteLen += serializeStepRegress(outputStream); // value statistics of different data type byteLen += serializeStats(outputStream); return byteLen; } + int serializeStepRegress(OutputStream outputStream) throws IOException { + int byteLen = 0; + stepRegress.learn(); // TODO ensure 
excuted once and only once + byteLen += ReadWriteIOUtils.write(stepRegress.getSlope(), outputStream); // K + DoubleArrayList segmentKeys = stepRegress.getSegmentKeys(); + // t1 is startTime, tm is endTime, so no need serialize t1 and tm + byteLen += ReadWriteIOUtils.write(segmentKeys.size(), outputStream); // m + for (int i = 1; i < segmentKeys.size() - 1; i++) { // t2,t3,...,tm-1 + byteLen += ReadWriteIOUtils.write(segmentKeys.get(i), outputStream); + } + return byteLen; + } + abstract int serializeStats(OutputStream outputStream) throws IOException; - /** read data from the inputStream. */ + /** + * read data from the inputStream. + */ public abstract void deserialize(InputStream inputStream) throws IOException; public abstract void deserialize(ByteBuffer byteBuffer); // public abstract void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes); - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public abstract MinMaxInfo<T> getMinInfo(); - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public abstract MinMaxInfo<T> getMaxInfo(); public abstract T getMinValue(); public abstract T getMaxValue(); - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public abstract long getBottomTimestamp(); - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public abstract long getTopTimestamp(); public abstract T getFirstValue(); @@ -193,6 +223,9 @@ public abstract class Statistics<T> { // must be sure no overlap between two statistics this.count += stats.count; mergeStatisticsValue(stats); + // TODO M4-LSM assumes that there is always only one page in a chunk + // TODO M4-LSM if there are more than one chunk in a time series, then access each chunkMetadata anyway + this.stepRegress = stats.stepRegress; isEmpty = false; } else { String thisClass = this.getClass().toString(); @@ -212,9 +245,12 @@ public abstract class Statistics<T> { } count++; updateStats(value); + updateStepRegress(time); } - /** @author Yuyuan Kang */ + /** + * 
@author Yuyuan Kang + */ public void update(long time, int value) { if (time < this.startTime) { startTime = time; @@ -224,9 +260,12 @@ public abstract class Statistics<T> { } count++; updateStats(value, time); + updateStepRegress(time); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public void update(long time, long value) { if (time < this.startTime) { startTime = time; @@ -236,9 +275,12 @@ public abstract class Statistics<T> { } count++; updateStats(value, time); + updateStepRegress(time); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public void update(long time, float value) { if (time < this.startTime) { startTime = time; @@ -248,9 +290,12 @@ public abstract class Statistics<T> { } count++; updateStats(value, time); + updateStepRegress(time); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public void update(long time, double value) { if (time < this.startTime) { startTime = time; @@ -260,6 +305,7 @@ public abstract class Statistics<T> { } count++; updateStats(value, time); + updateStepRegress(time); } public void update(long time, Binary value) { @@ -271,6 +317,7 @@ public abstract class Statistics<T> { } count++; updateStats(value); + updateStepRegress(time); } public void update(long[] time, boolean[] values, int batchSize) { @@ -282,9 +329,12 @@ public abstract class Statistics<T> { } count += batchSize; updateStats(values, batchSize); + updateStepRegress(time, batchSize); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public void update(long[] time, int[] values, int batchSize) { if (time[0] < startTime) { startTime = time[0]; @@ -294,9 +344,12 @@ public abstract class Statistics<T> { } count += batchSize; updateStats(values, time, batchSize); + updateStepRegress(time, batchSize); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public void update(long[] time, long[] values, int batchSize) { if (time[0] < startTime) { startTime = time[0]; @@ -306,9 +359,12 @@ 
public abstract class Statistics<T> { } count += batchSize; updateStats(values, time, batchSize); + updateStepRegress(time, batchSize); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public void update(long[] time, float[] values, int batchSize) { if (time[0] < startTime) { startTime = time[0]; @@ -318,9 +374,12 @@ public abstract class Statistics<T> { } count += batchSize; updateStats(values, time, batchSize); + updateStepRegress(time, batchSize); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public void update(long[] time, double[] values, int batchSize) { if (time[0] < startTime) { startTime = time[0]; @@ -330,6 +389,7 @@ public abstract class Statistics<T> { } count += batchSize; updateStats(values, time, batchSize); + updateStepRegress(time, batchSize); } public void update(long[] time, Binary[] values, int batchSize) { @@ -341,6 +401,7 @@ public abstract class Statistics<T> { } count += batchSize; updateStats(values, batchSize); + updateStepRegress(time, batchSize); } protected abstract void mergeStatisticsValue(Statistics stats); @@ -353,32 +414,54 @@ public abstract class Statistics<T> { isEmpty = empty; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public abstract void updateMinInfo(T val, long timestamp); - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public abstract void updateMaxInfo(T val, long timestamp); void updateStats(boolean value) { throw new UnsupportedOperationException(); } - /** @author Yuyuan Kang */ + void updateStepRegress(long timestamp) { + stepRegress.insert(timestamp); + } + + void updateStepRegress(long[] timestamps, int batchSize) { + for (int i = 0; i < batchSize; i++) { + updateStepRegress(timestamps[i]); + } + } + + /** + * @author Yuyuan Kang + */ void updateStats(int value, long timestamp) { throw new UnsupportedOperationException(); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ void updateStats(long value, long timestamp) { throw 
new UnsupportedOperationException(); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ void updateStats(float value, long timestamp) { throw new UnsupportedOperationException(); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ void updateStats(double value, long timestamp) { throw new UnsupportedOperationException(); } @@ -391,22 +474,30 @@ public abstract class Statistics<T> { throw new UnsupportedOperationException(); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ void updateStats(int[] values, long[] timestamps, int batchSize) { throw new UnsupportedOperationException(); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ void updateStats(long[] values, long[] timestamps, int batchSize) { throw new UnsupportedOperationException(); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ void updateStats(float[] values, long[] timestamps, int batchSize) { throw new UnsupportedOperationException(); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ void updateStats(double[] values, long[] timestamps, int batchSize) { throw new UnsupportedOperationException(); } @@ -416,10 +507,10 @@ public abstract class Statistics<T> { } /** - * @author Yuyuan Kang This method with two parameters is only used by {@code unsequence} which - * updates/inserts/deletes timestamp. * @param min min timestamp * @param max max timestamp + * @author Yuyuan Kang This method with two parameters is only used by {@code unsequence} which + * updates/inserts/deletes timestamp. 
*/ public void updateStats(long min, long bottomTimestamp, long max, long topTimestamp) { throw new UnsupportedOperationException(); @@ -441,11 +532,24 @@ public abstract class Statistics<T> { statistics.setCount(ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); statistics.setStartTime(ReadWriteIOUtils.readLong(buffer)); statistics.setEndTime(ReadWriteIOUtils.readLong(buffer)); + statistics.deserializeStepRegress(buffer); // TODO statistics.deserialize(buffer); statistics.isEmpty = false; return statistics; } + void deserializeStepRegress(ByteBuffer byteBuffer) { + this.stepRegress.setSlope(ReadWriteIOUtils.readDouble(byteBuffer)); //K + int m = ReadWriteIOUtils.readInt(byteBuffer); // m + DoubleArrayList segmentKeys = new DoubleArrayList(); + segmentKeys.add(this.startTime); // t1 + for (int i = 0; i < m - 2; i++) { // t2,t3,...,tm-1 + segmentKeys.add(ReadWriteIOUtils.readDouble(byteBuffer)); + } + segmentKeys.add(this.endTime); + this.stepRegress.setSegmentKeys(segmentKeys); + } + public long getStartTime() { return startTime; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java new file mode 100644 index 0000000000..96d2736fb3 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.file.metadata.statistics; + +import java.io.IOException; +import java.util.Arrays; +import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList; +import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList; +import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList; + +public class StepRegress { + + private double slope; + + // when learning parameters, we first determine segmentIntercepts and then determine segmentKeys; + // when using functions, we read segmentKeys and then infer segmentIntercepts. + // fix that the first segment is always tilt, + // so for indexes starting from 0, even id is tilt, odd id is level. 
+ private DoubleArrayList segmentIntercepts = new DoubleArrayList(); // b1,b2,...,bm-1 + + // fix that the first segment [t1,t2) is always tilt, + // so t2=t1 in fact means that the first status is level + private DoubleArrayList segmentKeys = new DoubleArrayList(); // t1,t2,...,tm + // TODO deal with the last key tm + + private LongArrayList timestamps = new LongArrayList(); // Pi.t + private LongArrayList intervals = new LongArrayList(); // Pi+1.t-Pi.t + + enum IntervalType { + tilt, + level + } + + private IntArrayList intervalsType = new IntArrayList(); + private long previousTimestamp = -1; + + private double mean = 0; // mean of intervals + private double stdDev = 0; // standard deviation of intervals + private long count = 0; + private double sumX2 = 0.0; + private double sumX1 = 0.0; + + private double median = 0; // median of intervals + private double mad = 0; // median absolute deviation of intervals + TimeExactOrderStatistics statistics = new TimeExactOrderStatistics(); + + /** + * load data, record timestamps and intervals, preparing to calculate mean,std,median,mad along + * the way + */ + public void insert(long timestamp) { + timestamps.add(timestamp); // record + if (previousTimestamp > 0) { + long delta = timestamp - previousTimestamp; + intervals.add(delta); // record + // prepare for mean and stdDev + count++; + sumX1 += delta; + sumX2 += delta * delta; + // prepare for median and mad + statistics.insert(delta); + } + previousTimestamp = timestamp; + } + + private void initForLearn() { + this.mean = getMean(); + this.stdDev = getStdDev(); + this.median = getMedian(); + this.mad = getMad(); + this.slope = 1 / this.median; + this.segmentKeys.add(timestamps.get(0)); // t1 + this.segmentIntercepts.add(1 - slope * timestamps.get(0)); // b1 + } + + /** + * learn the parameters of the step regression function for the loaded data. 
+ */ + public void learn() throws IOException { + initForLearn(); + + int tiltLatestSegmentID = 0; + IntervalType previousIntervalType = IntervalType.tilt; + + for (int i = 0; i < intervals.size(); i++) { + long delta = intervals.get(i); + + // the current point (t,pos) focused, where t is the left endpoint of the current interval. + long t = timestamps.get(i); + int pos = i + 1; + // the next point (t,pos), where t is the right endpoint of the current interval. + long nextT = timestamps.get(i + 1); + int nextPos = i + 2; + + // 1) determine the type of the current interval + // level condition: big interval && the right endpoint of the interval is under the latest tilt line. + // Note the right endpoint, not the left endpoint. + // "the right endpoint of the interval is under the latest tilt line" is to ensure the + // monotonically decreasing order of tilt intercepts (the last interval running through the last point + // is handled using post-processing to avoid disorder of tilt intercepts) + boolean isLevel = isBigInterval(delta) && (nextPos < slope * nextT + segmentIntercepts.get( + tiltLatestSegmentID)); + // to avoid TLTLTLTL... 
causing trivial segments, add extra rule for tilt + if (!isLevel) { + if (previousIntervalType == IntervalType.level) { // when previous interval is level + if (i < intervals.size() - 1) { // when having next interval + long nextDelta = intervals.get(i + 1); + if (isBigInterval(nextDelta) && (nextPos + 1 + < slope * timestamps.get(i + 2) + segmentIntercepts.get( + tiltLatestSegmentID))) { // when next interval is also level + isLevel = true; // then fix type from tilt to level, LTL=>LLL + } + } + } + } + + // 2) determine if starting a new segment + if (isLevel) { + intervalsType.add(IntervalType.level.ordinal()); + if (previousIntervalType == IntervalType.tilt) { // else do nothing, still level + // [[[translate from tilt to level]]] + previousIntervalType = IntervalType.level; + // 3) to determine the intercept, let the level function run through (t,pos) + double intercept = pos; // b2i=pos + // 4) to determine the segment key, let the level function and the previous tilt function intersect + segmentKeys.add((intercept - segmentIntercepts.getLast()) / slope); // x2i=(b2i-b2i-1)/K + // then add intercept to segmentIntercepts, do not change the order of codes here + segmentIntercepts.add( + intercept); // TODO debug if the first status is actually level works + } + // deal with the last interval to make sure the last point is hit + // TODO create examples to debug this + if (i == intervals.size() - 1) { + // 3) to determine the intercept, let the level function run through (timestamps.getLast(),timestamps.size()) + double intercept = timestamps.size(); // b2i=pos + // 4) to determine the segment key, let the level function and the previous tilt function intersect + // Note that here is rewrite instead of add. + // Note taht here is not getLast + segmentKeys.set(segmentKeys.size() - 1, + (intercept - segmentIntercepts.get(segmentIntercepts.size() - 2)) + / slope); // x2i=(b2i-b2i-1)/K TODO debug here not getLast! 
+ // then add intercept to segmentIntercepts, do not change the order of codes here + // Note that here is rewrite instead of add. + segmentIntercepts.set(segmentIntercepts.size() - 1, intercept); + } + } else { + intervalsType.add(IntervalType.tilt.ordinal()); + if (previousIntervalType == IntervalType.level) { // else do nothing, still tilt + // [[[translate form level to tilt]]] + previousIntervalType = IntervalType.tilt; + // 3) to determine the intercept, let the tilt function run through (t,pos) + double intercept = pos - slope * t; // b2i+1=pos-K*t + // 4) to determine the segment key, let the level function and the previous tilt function intersect + segmentKeys.add((segmentIntercepts.getLast() - intercept) / slope); // x2i+1=(b2i-b2i+1)/K + // then add intercept to segmentIntercepts, do not change the order of codes here + segmentIntercepts.add(intercept); + // remember to update tiltLatestSegmentID + tiltLatestSegmentID += 2; + } + // deal with the last interval to make sure the last point is hit + // TODO create examples to debug this + if (i == intervals.size() - 1) { + if (segmentIntercepts.size() == 1) { // all TTTTTT, only one segment info + // remove all segment info, and directly connect the first and the last point + this.slope = + (timestamps.size() - 1.0) / (timestamps.getLast() - timestamps.getFirst()); + this.segmentKeys = new DoubleArrayList(); + this.segmentIntercepts = new DoubleArrayList(); + this.segmentKeys.add(timestamps.get(0)); // t1 + this.segmentIntercepts.add(1 - slope * timestamps.get(0)); // b1 + } else { + // 3) to determine the intercept, let the tilt function run through (timestamps.getLast(),timestamps.size()) + double intercept = timestamps.size() - slope * timestamps.getLast(); // b2i+1=pos-K*t + // 4) to determine the segment key, let the level function and the previous tilt function intersect + // Note that here is rewrite instead of add. 
+ // Note taht here is not getLast + segmentKeys.set(segmentKeys.size() - 1, + (segmentIntercepts.get(segmentIntercepts.size() - 2) - intercept) + / slope); // x2i+1=(b2i-b2i+1)/K TODO debug here not getLast! + // then add intercept to segmentIntercepts, do not change the order of codes here + // Note that here is rewrite instead of add. + segmentIntercepts.set(segmentIntercepts.size() - 1, intercept); + + // now check to remove possible disorders + // search from back to front to find the first tilt intercept that is equal to or larger than the current intercept + int start = segmentIntercepts.size() - 3; // TODO debug + // TODO consider only one T + boolean equals = false; + for (; start >= 0; start -= 2) { + // note the step is 2, only tilt intercept, no level intercept + if (segmentIntercepts.get(start) == intercept) { + equals = true; + break; + } + if (segmentIntercepts.get(start) > intercept) { + equals = false; + break; + } + } + if (start < 0) { // TODO bug consider when start<0, i.e., not found: connecting directly + // remove all segment info, and directly connect the first and the last point + this.slope = + (timestamps.size() - 1.0) / (timestamps.getLast() - timestamps.getFirst()); + this.segmentKeys = new DoubleArrayList(); + this.segmentIntercepts = new DoubleArrayList(); + this.segmentKeys.add(timestamps.get(0)); // t1 + this.segmentIntercepts.add(1 - slope * timestamps.get(0)); // b1 + } else if (start < segmentIntercepts.size() - 3) { + if (!equals) { + // remove all segment information after start+1 id, i.e., remove from start+2~end + // note that the level after start tilt is kept since equals=false. 
+ segmentIntercepts = DoubleArrayList.newListWith( + Arrays.copyOfRange(segmentIntercepts.toArray(), 0, start + 2)); + segmentKeys = DoubleArrayList.newListWith( + Arrays.copyOfRange(segmentKeys.toArray(), 0, start + 2)); + + // Add new segment info for TL&T + // 4) to determine the segment key, let the level function and the previous tilt function intersect + // Note that here is add and getLast again! + segmentKeys.add( + (segmentIntercepts.getLast() - intercept) / slope); // x2i+1=(b2i-b2i+1)/K + // then add intercept to segmentIntercepts, do not change the order of codes here + // Note that here is add and getLast again! + segmentIntercepts.add(intercept); + } else { + // remove all segment information after start id, i.e., remove from start+1~end + // note that the level after start tilt is NOT kept since equal==true + segmentIntercepts = DoubleArrayList.newListWith( + Arrays.copyOfRange(segmentIntercepts.toArray(), 0, start + 1)); + segmentKeys = DoubleArrayList.newListWith( + Arrays.copyOfRange(segmentKeys.toArray(), 0, start + 1)); + // TODO debug the first status is level, b1 + } + } + // otherwise start==segmentIntercepts.size()-3 means result is already ready, no disorder to handle + } + } + } + } + segmentKeys.add(timestamps.getLast()); // tm + + checkOrder(segmentIntercepts); + } + + /** + * For id starting from 0, since we fix that the first status is always tilt, then intercepts with + * even id should be monotonically decreasing, and intercepts with odd id should be monotonically + * increasing. 
+   */
+  private void checkOrder(DoubleArrayList segmentIntercepts) throws IOException {
+    // The sentinels must lie strictly outside every finite intercept. Double.MIN_VALUE is the
+    // smallest POSITIVE double (~4.9e-324), not the most negative one, so using it as the lower
+    // bound would wrongly report a disorder for a first level intercept <= 0.
+    double tiltIntercept = Double.POSITIVE_INFINITY;
+    double levelIntercept = Double.NEGATIVE_INFINITY;
+    for (int i = 0; i < segmentIntercepts.size(); i++) {
+      double intercept = segmentIntercepts.get(i);
+      if (i % 2 == 0) {
+        // even id: tilt intercepts must be strictly decreasing
+        if (intercept >= tiltIntercept) {
+          throw new IOException(String.format("disorder of tilt intercepts!: i=%s", i));
+        }
+        tiltIntercept = intercept;
+      } else {
+        // odd id: level intercepts must be strictly increasing
+        if (intercept <= levelIntercept) {
+          throw new IOException(String.format("disorder of level intercepts!: i=%s", i));
+        }
+        levelIntercept = intercept;
+      }
+    }
+  }
+
+  /**
+   * Returns whether {@code interval} is a "big" interval, i.e. strictly greater than
+   * {@code mean + 3 * stdDev} (using the {@code mean} and {@code stdDev} fields of this object).
+   */
+  private boolean isBigInterval(long interval) {
+    int bigIntervalParam = 3;
+    return interval > this.mean + bigIntervalParam * this.stdDev;
+  }
+
+  /** Delegates to {@code statistics.getMedian()}. */
+  public double getMedian() {
+    return statistics.getMedian();
+  }
+
+  /** Delegates to {@code statistics.getMad()}. */
+  public double getMad() {
+    return statistics.getMad();
+  }
+
+  /** Returns {@code sumX1 / count}. */
+  public double getMean() { // sample mean
+    // NOTE(review): if sumX1 and count are both integral fields this is integer division —
+    // confirm their declarations.
+    return sumX1 / count;
+  }
+
+  /**
+   * Returns the sample standard deviation: the population variance
+   * {@code sumX2 / count - (sumX1 / count)^2} rescaled by Bessel's correction
+   * {@code count / (count - 1)}, then square-rooted.
+   */
+  public double getStdDev() { // sample standard deviation
+    double populationVariance =
+        this.sumX2 / this.count - Math.pow(this.sumX1 / this.count, 2);
+    // Apply the correction directly to the variance; taking sqrt and squaring it back only adds
+    // rounding error.
+    return Math.sqrt(populationVariance * this.count / (this.count - 1));
+  }
+
+  /** Returns the live intercept list (not a copy). */
+  public DoubleArrayList getSegmentIntercepts() {
+    return segmentIntercepts;
+  }
+
+  public double getSlope() {
+    return slope;
+  }
+
+  public void setSlope(double slope) {
+    this.slope = slope;
+  }
+
+  /** Replaces the segment key list with {@code segmentKeys} (stored by reference). */
+  public void setSegmentKeys(DoubleArrayList segmentKeys) {
+    this.segmentKeys = segmentKeys;
+  }
+
+  /** Returns the live segment key list (not a copy). */
+  public DoubleArrayList getSegmentKeys() {
+    return segmentKeys;
+  }
+
+  public IntArrayList getIntervalsType() {
+    return intervalsType;
+  }
+
+  public LongArrayList getIntervals() {
+    return intervals;
+  }
+
+  public LongArrayList getTimestamps() {
+    return timestamps;
+  }
+
+  /**
+   * Infers the m-1 intercepts b1,b2,...,bm-1 given the slope K and the m segment keys
+   * t1,t2,...,tm (tm is not used).
+   *
+   * <p>b1 = 1 - K*t1. With 0-based ids, an odd id i is a level segment whose intercept is the
+   * value of the preceding tilt line at t_i (K*t_i + b_{i-1}). An even id i is a tilt segment
+   * whose line is the previous tilt line shifted down by K*(t_i - t_{i-1}).
+   *
+   * @param slope the shared slope K of all tilt segments
+   * @param segmentKeys the segment keys t1..tm
+   * @return a new list holding b1..bm-1
+   */
+  public static DoubleArrayList inferInterceptsFromSegmentKeys(
+      double slope, DoubleArrayList segmentKeys) {
+    DoubleArrayList segmentIntercepts = new DoubleArrayList();
+    segmentIntercepts.add(1 - slope * segmentKeys.get(0)); // b1=1-K*t1
+    for (int i = 1; i < segmentKeys.size() - 1; i++) { // b2,b3,...,bm-1
+      if (i % 2 == 0) { // b2i+1=b2i-1-K*(t2i+1-t2i)
+        double b = segmentIntercepts.get(segmentIntercepts.size() - 2);
+        segmentIntercepts.add(b - slope * (segmentKeys.get(i) - segmentKeys.get(i - 1)));
+      } else { // b2i=K*t2i+b2i-1
+        double b = segmentIntercepts.getLast();
+        segmentIntercepts.add(slope * segmentKeys.get(i) + b);
+      }
+    }
+    return segmentIntercepts;
+  }
+
+  /**
+   * Evaluates the step regression function at {@code t}.
+   *
+   * <p>The segment used is the first one whose right end {@code segmentKeys[seg + 1]} is
+   * {@code >= t}. Even-id segments are tilt ({@code slope * t + intercept}); odd-id segments are
+   * level (constant {@code intercept}).
+   *
+   * @param t input
+   * @return the value of the step regression function f(t)
+   * @throws IOException if {@code t} is outside {@code [segmentKeys[0], segmentKeys[last]]}
+   */
+  public double infer(double t) throws IOException {
+    if (t < segmentKeys.get(0) || t > segmentKeys.getLast()) {
+      throw new IOException(
+          String.format("t out of range. input within [%s,%s]", segmentKeys.get(0),
+              segmentKeys.getLast()));
+    }
+    int seg = 0;
+    for (; seg < segmentKeys.size() - 1; seg++) {
+      if (t <= segmentKeys.get(seg + 1)) { // t <= the right end of the segment interval
+        break;
+      }
+    }
+    // The first status is always tilt, so with 0-based ids even segments are tilt
+    // and odd segments are level.
+    if (seg % 2 == 0) { // tilt
+      return slope * t + segmentIntercepts.get(seg);
+    } else {
+      return segmentIntercepts.get(seg);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeExactOrderStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeExactOrderStatistics.java new file mode 100644 index 0000000000..58e2fa0bf6 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeExactOrderStatistics.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
+package org.apache.iotdb.tsfile.file.metadata.statistics;
+
+import java.util.NoSuchElementException;
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.FloatArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList;
+
+/**
+ * Exact order statistics over primitive lists: median, median absolute deviation (MAD) and
+ * nearest-rank percentile.
+ *
+ * <p>An instance accumulates timestamps via {@link #insert(long)}. The static helpers accept
+ * {@code FloatArrayList}, {@code DoubleArrayList}, {@code IntArrayList} or {@code LongArrayList}.
+ * Every helper sorts its input list in place and throws {@link NoSuchElementException} when the
+ * list is empty.
+ */
+public class TimeExactOrderStatistics {
+
+  /** Timestamps collected via {@link #insert(long)}; sorted in place by the getters. */
+  private LongArrayList longArrayList;
+
+  public TimeExactOrderStatistics() {
+    longArrayList = new LongArrayList();
+  }
+
+  /** Appends one timestamp. */
+  public void insert(long timestamp) {
+    longArrayList.add(timestamp);
+  }
+
+  /** Returns the median of the inserted timestamps. */
+  public double getMedian() {
+    return getMedian(longArrayList);
+  }
+
+  /** Returns the median absolute deviation of the inserted timestamps. */
+  public double getMad() {
+    return getMad(longArrayList);
+  }
+
+  /** Returns the nearest-rank {@code phi}-percentile of the inserted timestamps, as a string. */
+  public String getPercentile(double phi) {
+    return Long.toString(getPercentile(longArrayList, phi));
+  }
+
+  /**
+   * Maps a quantile level to a 0-based list index using the nearest-rank definition. The 1-based
+   * rank is {@code ceil(size * phi)}, so the index is that rank minus one. Using the rank itself
+   * as an index would be off by one and would run past the end of the list for phi close to 1.
+   * The result is clamped to {@code [0, size - 1]}: {@code phi <= 0} (or NaN) selects the minimum
+   * and {@code phi >= 1} selects the maximum.
+   */
+  private static int percentileIndex(int size, double phi) {
+    double rank = Math.ceil(size * phi);
+    if (!(rank > 1)) { // also catches NaN
+      return 0;
+    }
+    if (rank >= size) {
+      return size - 1;
+    }
+    return (int) rank - 1;
+  }
+
+  /**
+   * Returns the median (mean of the two middle values for an even size). Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMedian(FloatArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int size = nums.size();
+    if (size % 2 == 0) {
+      // widen before adding: float addition loses precision and may overflow to infinity
+      return ((double) nums.get(size / 2 - 1) + nums.get(size / 2)) / 2.0;
+    }
+    return nums.get(size / 2);
+  }
+
+  /**
+   * Returns the median absolute deviation {@code median(|x - median(nums)|)}. Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMad(FloatArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    // Presize: set(i, ..) on a freshly constructed (empty) DoubleArrayList throws.
+    DoubleArrayList dal = DoubleArrayList.newWithNValues(nums.size(), 0);
+    for (int i = 0; i < nums.size(); ++i) {
+      dal.set(i, Math.abs(nums.get(i) - median));
+    }
+    return getMedian(dal);
+  }
+
+  /**
+   * Returns the nearest-rank {@code phi}-percentile. Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static float getPercentile(FloatArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    return nums.get(percentileIndex(nums.size(), phi));
+  }
+
+  /**
+   * Returns the median (mean of the two middle values for an even size). Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMedian(DoubleArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int size = nums.size();
+    if (size % 2 == 0) {
+      return (nums.get(size / 2 - 1) + nums.get(size / 2)) / 2.0;
+    }
+    return nums.get(size / 2);
+  }
+
+  /**
+   * Returns the median absolute deviation {@code median(|x - median(nums)|)}. Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMad(DoubleArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    // Presize: set(i, ..) on a freshly constructed (empty) DoubleArrayList throws.
+    DoubleArrayList dal = DoubleArrayList.newWithNValues(nums.size(), 0);
+    for (int i = 0; i < nums.size(); ++i) {
+      dal.set(i, Math.abs(nums.get(i) - median));
+    }
+    return getMedian(dal);
+  }
+
+  /**
+   * Returns the nearest-rank {@code phi}-percentile. Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getPercentile(DoubleArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    return nums.get(percentileIndex(nums.size(), phi));
+  }
+
+  /**
+   * Returns the median (mean of the two middle values for an even size). Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMedian(IntArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int size = nums.size();
+    if (size % 2 == 0) {
+      // widen before adding: the int sum of two large values would overflow
+      return ((double) nums.get(size / 2 - 1) + nums.get(size / 2)) / 2.0;
+    }
+    return nums.get(size / 2);
+  }
+
+  /**
+   * Returns the median absolute deviation {@code median(|x - median(nums)|)}. Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMad(IntArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    // Presize: set(i, ..) on a freshly constructed (empty) DoubleArrayList throws.
+    DoubleArrayList dal = DoubleArrayList.newWithNValues(nums.size(), 0);
+    for (int i = 0; i < nums.size(); ++i) {
+      dal.set(i, Math.abs(nums.get(i) - median));
+    }
+    return getMedian(dal);
+  }
+
+  /**
+   * Returns the nearest-rank {@code phi}-percentile. Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static int getPercentile(IntArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    return nums.get(percentileIndex(nums.size(), phi));
+  }
+
+  /**
+   * Returns the median (mean of the two middle values for an even size). Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMedian(LongArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    int size = nums.size();
+    if (size % 2 == 0) {
+      long lo = nums.get(size / 2 - 1);
+      long hi = nums.get(size / 2);
+      // lo <= hi after sorting; avoid the long overflow of (lo + hi)
+      return lo + (hi - lo) / 2.0;
+    }
+    return nums.get(size / 2);
+  }
+
+  /**
+   * Returns the median absolute deviation {@code median(|x - median(nums)|)}. Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static double getMad(LongArrayList nums) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    double median = getMedian(nums);
+    DoubleArrayList dal = DoubleArrayList.newWithNValues(nums.size(), 0);
+    for (int i = 0; i < nums.size(); ++i) {
+      dal.set(i, Math.abs(nums.get(i) - median));
+    }
+    return getMedian(dal);
+  }
+
+  /**
+   * Returns the nearest-rank {@code phi}-percentile. Sorts {@code nums}.
+   *
+   * @throws NoSuchElementException if {@code nums} is empty
+   */
+  public static long getPercentile(LongArrayList nums, double phi) {
+    if (nums.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+    nums.sortThis();
+    return nums.get(percentileIndex(nums.size(), phi));
+  }
+}
