This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit be822bc3a670c353ff9392e2e6f570a8554ac611 Author: Lei Rui <[email protected]> AuthorDate: Tue Jul 4 17:35:54 2023 +0800 add --- .../encoding/decoder/DeltaBinaryDecoder.java | 2 +- .../iotdb/tsfile/file/header/PageHeader.java | 3 +- .../iotdb/tsfile/file/metadata/ChunkMetadata.java | 2 +- .../tsfile/file/metadata/TimeseriesMetadata.java | 3 +- .../file/metadata/statistics/Statistics.java | 154 +++++++++++++++++---- .../file/metadata/statistics/ValueIndex.java | 100 +++++++++++++ .../iotdb/tsfile/encoding/SDTEncoderTest.java | 12 +- 7 files changed, 243 insertions(+), 33 deletions(-) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/DeltaBinaryDecoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/DeltaBinaryDecoder.java index 7f50cd4bb6a..3f225d354d7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/DeltaBinaryDecoder.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/DeltaBinaryDecoder.java @@ -72,7 +72,7 @@ public abstract class DeltaBinaryDecoder extends Decoder { } @Override - public boolean hasNext(ByteBuffer buffer) throws IOException { + public boolean hasNext(ByteBuffer buffer) { return (nextReadIndex < readIntTotalCount) || buffer.remaining() > 0; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java index 178a0a5ee46..2bc1ad7751b 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java @@ -60,7 +60,8 @@ public class PageHeader { return new PageHeader(uncompressedSize, compressedSize, statistics); } - public static PageHeader deserializeFrom(ByteBuffer buffer, TSDataType dataType) { + public static PageHeader deserializeFrom(ByteBuffer buffer, TSDataType dataType) + throws IOException { int uncompressedSize = 
ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); int compressedSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); Statistics statistics = Statistics.deserialize(buffer, dataType); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java index 60494e223af..1770618b1a6 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java @@ -189,7 +189,7 @@ public class ChunkMetadata implements Comparable<ChunkMetadata> { * @return ChunkMetaData object */ public static ChunkMetadata deserializeFrom( - ByteBuffer buffer, TimeseriesMetadata timeseriesMetadata) { + ByteBuffer buffer, TimeseriesMetadata timeseriesMetadata) throws IOException { ChunkMetadata chunkMetaData = new ChunkMetadata(); chunkMetaData.measurementUid = timeseriesMetadata.getMeasurementId(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java index 7344c2e77c1..f8bea3156b4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java @@ -93,7 +93,8 @@ public class TimeseriesMetadata implements Accountable { this.chunkMetadataList = new ArrayList<>(timeseriesMetadata.chunkMetadataList); } - public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadata) { + public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadata) + throws IOException { TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata(); timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(buffer)); 
timeseriesMetaData.setMeasurementId(ReadWriteIOUtils.readVarIntString(buffer)); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 1d22cd29d3b..18e50e7be27 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java @@ -18,6 +18,9 @@ */ package org.apache.iotdb.tsfile.file.metadata.statistics; +import org.apache.iotdb.tsfile.encoding.decoder.Decoder; +import org.apache.iotdb.tsfile.encoding.decoder.DeltaBinaryDecoder.IntDeltaDecoder; +import org.apache.iotdb.tsfile.encoding.decoder.DoublePrecisionDecoderV2; import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -60,6 +63,8 @@ public abstract class Statistics<T> { private StepRegress stepRegress = new StepRegress(); + public ValueIndex valueIndex = new ValueIndex(); + /** @author Yuyuan Kang */ final String OPERATION_NOT_SUPPORT_FORMAT = "%s statistics does not support operation: %s"; @@ -109,10 +114,13 @@ public abstract class Statistics<T> { public abstract TSDataType getType(); + @Deprecated public int getSerializedSize() { return ReadWriteForEncodingUtils.uVarIntSize(count) // count + 16 // startTime, endTime + getStatsSize(); + // stepRegress not counted + // value index not counted } public abstract int getStatsSize(); @@ -122,20 +130,40 @@ public abstract class Statistics<T> { byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(count, outputStream); byteLen += ReadWriteIOUtils.write(startTime, outputStream); byteLen += ReadWriteIOUtils.write(endTime, outputStream); - // TODO serialize stepRegress + // serialize stepRegress byteLen += serializeStepRegress(outputStream); + // TODO 
serialize value index + byteLen += serializeValueIndex(outputStream); // value statistics of different data type byteLen += serializeStats(outputStream); return byteLen; } + int serializeValueIndex(OutputStream outputStream) throws IOException { + valueIndex.learn(); // no-op after its first call: ValueIndex.learn() is guarded by isLearned + int byteLen = 0; + byteLen += ReadWriteIOUtils.write(valueIndex.idxOut.size(), outputStream); + outputStream.write( + valueIndex.idxOut.getBuf(), 0, valueIndex.idxOut.size()); // NOTE len is important + byteLen += valueIndex.idxOut.size(); // bytes actually written; getBuf().length is the buffer capacity + + byteLen += ReadWriteIOUtils.write(valueIndex.valueOut.size(), outputStream); + outputStream.write( + valueIndex.valueOut.getBuf(), 0, valueIndex.valueOut.size()); // NOTE len is important + byteLen += valueIndex.valueOut.size(); // bytes actually written; getBuf().length is the buffer capacity + + byteLen += ReadWriteIOUtils.write(valueIndex.errorBound, outputStream); + + return byteLen; + } + /** * slope, m: the number of segment keys, m-2 segment keys in between when m>=2. The first and the * last segment keys are not serialized here, because they are minTime and endTime respectively.
*/ int serializeStepRegress(OutputStream outputStream) throws IOException { + stepRegress.learn(); // TODO ensure executed once and only once int byteLen = 0; - stepRegress.learn(); // TODO ensure excuted once and only once byteLen += ReadWriteIOUtils.write(stepRegress.getSlope(), outputStream); // K DoubleArrayList segmentKeys = stepRegress.getSegmentKeys(); // t1 is startTime, tm is endTime, so no need serialize t1 and tm @@ -223,6 +251,9 @@ public abstract class Statistics<T> { // TODO M4-LSM if there are more than one chunk in a time series, then access each // chunkMetadata anyway this.stepRegress = stats.stepRegress; + // TODO + this.valueIndex = stats.valueIndex; + isEmpty = false; } else { String thisClass = this.getClass().toString(); @@ -233,6 +264,7 @@ public abstract class Statistics<T> { } } + @Deprecated public void update(long time, boolean value) { if (time < this.startTime) { startTime = time; @@ -242,61 +274,66 @@ public abstract class Statistics<T> { } count++; updateStats(value); - updateStepRegress(time); } /** @author Yuyuan Kang */ public void update(long time, int value) { + count++; if (time < this.startTime) { startTime = time; } if (time > this.endTime) { endTime = time; } - count++; - updateStats(value, time); + // update time index updateStepRegress(time); + updateValueIndex(value); + updateStats(value, time); } /** @author Yuyuan Kang */ public void update(long time, long value) { + count++; if (time < this.startTime) { startTime = time; } if (time > this.endTime) { endTime = time; } - count++; - updateStats(value, time); updateStepRegress(time); + updateValueIndex(value); + updateStats(value, time); } /** @author Yuyuan Kang */ public void update(long time, float value) { + count++; if (time < this.startTime) { startTime = time; } if (time > this.endTime) { endTime = time; } - count++; - updateStats(value, time); updateStepRegress(time); + updateValueIndex(value); + updateStats(value, time); } /** @author Yuyuan Kang */ public void 
update(long time, double value) { + count++; if (time < this.startTime) { startTime = time; } if (time > this.endTime) { endTime = time; } - count++; - updateStats(value, time); updateStepRegress(time); + updateValueIndex(value); + updateStats(value, time); } + @Deprecated public void update(long time, Binary value) { if (time < startTime) { startTime = time; @@ -306,9 +343,9 @@ public abstract class Statistics<T> { } count++; updateStats(value); - updateStepRegress(time); } + @Deprecated public void update(long[] time, boolean[] values, int batchSize) { if (time[0] < startTime) { startTime = time[0]; @@ -318,61 +355,65 @@ public abstract class Statistics<T> { } count += batchSize; updateStats(values, batchSize); - updateStepRegress(time, batchSize); } /** @author Yuyuan Kang */ public void update(long[] time, int[] values, int batchSize) { + count += batchSize; if (time[0] < startTime) { startTime = time[0]; } if (time[batchSize - 1] > this.endTime) { endTime = time[batchSize - 1]; } - count += batchSize; - updateStats(values, time, batchSize); updateStepRegress(time, batchSize); + updateValueIndex(values, batchSize); + updateStats(values, time, batchSize); } /** @author Yuyuan Kang */ public void update(long[] time, long[] values, int batchSize) { + count += batchSize; if (time[0] < startTime) { startTime = time[0]; } if (time[batchSize - 1] > this.endTime) { endTime = time[batchSize - 1]; } - count += batchSize; - updateStats(values, time, batchSize); updateStepRegress(time, batchSize); + updateValueIndex(values, batchSize); + updateStats(values, time, batchSize); } /** @author Yuyuan Kang */ public void update(long[] time, float[] values, int batchSize) { + count += batchSize; if (time[0] < startTime) { startTime = time[0]; } if (time[batchSize - 1] > this.endTime) { endTime = time[batchSize - 1]; } - count += batchSize; - updateStats(values, time, batchSize); updateStepRegress(time, batchSize); + updateValueIndex(values, batchSize); + updateStats(values, time, 
batchSize); } /** @author Yuyuan Kang */ public void update(long[] time, double[] values, int batchSize) { + count += batchSize; if (time[0] < startTime) { startTime = time[0]; } if (time[batchSize - 1] > this.endTime) { endTime = time[batchSize - 1]; } - count += batchSize; - updateStats(values, time, batchSize); updateStepRegress(time, batchSize); + updateValueIndex(values, batchSize); + updateStats(values, time, batchSize); } + @Deprecated public void update(long[] time, Binary[] values, int batchSize) { if (time[0] < startTime) { startTime = time[0]; @@ -382,7 +423,6 @@ public abstract class Statistics<T> { } count += batchSize; updateStats(values, batchSize); - updateStepRegress(time, batchSize); } protected abstract void mergeStatisticsValue(Statistics stats); @@ -415,6 +455,46 @@ public abstract class Statistics<T> { } } + void updateValueIndex(int value) { + valueIndex.insert(value); + } + + void updateValueIndex(long value) { + valueIndex.insert(value); + } + + void updateValueIndex(float value) { + valueIndex.insert(value); + } + + void updateValueIndex(double value) { + valueIndex.insert(value); + } + + void updateValueIndex(int[] values, int batchSize) { + for (int i = 0; i < batchSize; i++) { + updateValueIndex(values[i]); + } + } + + void updateValueIndex(long[] values, int batchSize) { + for (int i = 0; i < batchSize; i++) { + updateValueIndex(values[i]); + } + } + + void updateValueIndex(float[] values, int batchSize) { + for (int i = 0; i < batchSize; i++) { + updateValueIndex(values[i]); + } + } + + void updateValueIndex(double[] values, int batchSize) { + for (int i = 0; i < batchSize; i++) { + updateValueIndex(values[i]); + } + } + /** @author Yuyuan Kang */ public void updateStats(int value, long timestamp) { throw new UnsupportedOperationException(); @@ -477,6 +557,7 @@ public abstract class Statistics<T> { throw new UnsupportedOperationException(); } + @Deprecated public static Statistics deserialize(InputStream inputStream, TSDataType 
dataType) throws IOException { Statistics statistics = getStatsByType(dataType); @@ -488,17 +569,42 @@ public abstract class Statistics<T> { return statistics; } - public static Statistics deserialize(ByteBuffer buffer, TSDataType dataType) { + public static Statistics deserialize(ByteBuffer buffer, TSDataType dataType) throws IOException { Statistics statistics = getStatsByType(dataType); statistics.setCount(ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); statistics.setStartTime(ReadWriteIOUtils.readLong(buffer)); statistics.setEndTime(ReadWriteIOUtils.readLong(buffer)); - statistics.deserializeStepRegress(buffer); // TODO + statistics.deserializeStepRegress(buffer); + statistics.deserializeValueIndex(buffer); // TODO statistics.deserialize(buffer); statistics.isEmpty = false; return statistics; } + void deserializeValueIndex(ByteBuffer buffer) throws IOException { // inverse of serializeValueIndex: [int idxLen][idx bytes][int valLen][value bytes][double errorBound] + int idxSize = ReadWriteIOUtils.readInt(buffer); // idxOut.size() + ByteBuffer idxBuffer = buffer.slice(); // view over the index bytes only; buffer itself is then skipped past them + idxBuffer.limit(idxSize); + buffer.position(buffer.position() + idxSize); + Decoder idxDecoder = new IntDeltaDecoder(); + while (idxDecoder.hasNext(idxBuffer)) { + int idx = idxDecoder.readInt(idxBuffer); + valueIndex.modelPointIdx_list.add(idx); + } + + int valueSize = ReadWriteIOUtils.readInt(buffer); // valueOut.size() + ByteBuffer valueBuffer = buffer.slice(); + valueBuffer.limit(valueSize); + buffer.position(buffer.position() + valueSize); + Decoder valueDecoder = new DoublePrecisionDecoderV2(); + while (valueDecoder.hasNext(valueBuffer)) { + double value = valueDecoder.readDouble(valueBuffer); + valueIndex.modelPointVal_list.add(value); + } + + valueIndex.errorBound = ReadWriteIOUtils.readDouble(buffer); + } + void deserializeStepRegress(ByteBuffer byteBuffer) { this.stepRegress.setSlope(ReadWriteIOUtils.readDouble(byteBuffer)); // K int m = ReadWriteIOUtils.readInt(byteBuffer); // m diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/ValueIndex.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/ValueIndex.java new file mode 100644 index 00000000000..24deea0a871 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/ValueIndex.java @@ -0,0 +1,100 @@ +package org.apache.iotdb.tsfile.file.metadata.statistics; + +import org.apache.iotdb.tsfile.encoding.encoder.DeltaBinaryEncoder.IntDeltaEncoder; +import org.apache.iotdb.tsfile.encoding.encoder.DoublePrecisionEncoderV2; +import org.apache.iotdb.tsfile.encoding.encoder.SDTEncoder; +import org.apache.iotdb.tsfile.utils.PublicBAOS; + +import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList; +import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList; + +public class ValueIndex { + + private DoubleArrayList values = new DoubleArrayList(); // raw inserted values, buffered until learn() + public SDTEncoder sdtEncoder = new SDTEncoder(); + public double errorBound = 0; // = 2 * sample std dev of inserted values = 2 * sdtEncoder compDeviation (set in initForLearn) + public IntDeltaEncoder idxEncoder = new IntDeltaEncoder(); // encodes 1-based positions of model points into idxOut + public DoublePrecisionEncoderV2 valueEncoder = new DoublePrecisionEncoderV2(); // encodes model point values into valueOut + public PublicBAOS idxOut = new PublicBAOS(); + public PublicBAOS valueOut = new PublicBAOS(); + public int lastIdx = 0; // NOTE(review): lastIdx and last*Value are not read anywhere in this patch; confirm before keeping + public int lastIntValue = 0; + public long lastLongValue = 0; + public float lastFloatValue = 0; + public double lastDoubleValue = 0; + public IntArrayList modelPointIdx_list = new IntArrayList(); // populated on read by Statistics.deserializeValueIndex + public DoubleArrayList modelPointVal_list = new DoubleArrayList(); // populated on read by Statistics.deserializeValueIndex + + // this is necessary, otherwise serialized twice by timeseriesMetadata and chunkMetadata + // causing learn() executed more than once!! 
+ private boolean isLearned = false; + + private double stdDev = 0; // sample standard deviation of the inserted values (set in initForLearn) + private long count = 0; + private double sumX2 = 0.0; + private double sumX1 = 0.0; + + public void insert(int value) { + values.add((double) value); + count++; + sumX1 += (double) value; + sumX2 += (double) value * (double) value; + } + + public void insert(long value) { + values.add((double) value); + count++; + sumX1 += (double) value; + sumX2 += (double) value * (double) value; + } + + public void insert(float value) { + values.add((double) value); + count++; + sumX1 += (double) value; + sumX2 += (double) value * (double) value; + } + + public void insert(double value) { + values.add(value); + count++; + sumX1 += value; + sumX2 += value * value; + } + + public double getStdDev() { // sample standard deviation; 0 when fewer than 2 values were inserted (avoids NaN) + double variance = Math.max(0, this.sumX2 / this.count - Math.pow(this.sumX1 / this.count, 2)); // clamp tiny negatives from cancellation + return this.count < 2 ? 0 : Math.sqrt(variance * this.count / (this.count - 1)); + } + + private void initForLearn() { + this.stdDev = getStdDev(); + this.errorBound = 2 * stdDev; + this.sdtEncoder.setCompDeviation(errorBound / 2); // equals stdDev is best + } + + public void learn() { + if (isLearned) { + // this is necessary, otherwise serialized twice by timeseriesMetadata and chunkMetadata + // causing learn() executed more than once!! 
+ return; + } + isLearned = true; + initForLearn(); // set self-adapting CompDeviation for sdtEncoder + int pos = 0; + for (double v : values.toArray()) { + pos++; // starting from 1 + if (sdtEncoder.encodeDouble(pos, v)) { + idxEncoder.encode((int) sdtEncoder.getTime(), idxOut); + valueEncoder.encode(sdtEncoder.getDoubleValue(), valueOut); + } + } + // add the last point except the first point + if (values.size() >= 2) { // means there is last point except the first point + idxEncoder.encode(pos, idxOut); + valueEncoder.encode(values.getLast(), valueOut); + } + idxEncoder.flush(idxOut); + valueEncoder.flush(valueOut); + } +} diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/encoding/SDTEncoderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/encoding/SDTEncoderTest.java index 0a21c23f605..07450c8beae 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/encoding/SDTEncoderTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/encoding/SDTEncoderTest.java @@ -233,7 +233,9 @@ public class SDTEncoderTest { public void mytest2() throws Exception { // when e=2*std=160000 is the best String csvData = "D:\\full-game\\BallSpeed.csv"; - double[] eList = new double[] {500000, 400000, 300000, 200000, 160000, 100000, 50000, 10000}; + // double[] eList = new double[] {500000, 400000, 300000, 200000, 160000, 100000, 50000, + // 10000}; + double[] eList = new double[] {160000}; int[] startList = new int[] {1, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000}; List<Double> elapsedTime_withValueIndex_list = new ArrayList<>(); List<Double> elapsedTime_withoutValueIndex_list = new ArrayList<>(); @@ -295,8 +297,8 @@ public class SDTEncoderTest { // System.out.println("v=" + valueList + ";"); // System.out.println("at=" + selectTimestamps + ";"); // System.out.println("av=" + selectValues + ";"); - // System.out.println("v.size=" + valueList.size()); - // System.out.println("av.size=" + selectTimestamps.size()); + System.out.println("v.size=" + 
valueList.size()); + System.out.println("av.size=" + selectTimestamps.size()); // System.out.println("start test------"); @@ -379,7 +381,7 @@ public class SDTEncoderTest { elapsedTime_withValueIndex += elapsedTime; // System.out.println("search with value index: " + elapsedTime / 1000000.0 + " ms"); // System.out.println("TP=(" + candidateTPidx + "," + candidateTPvalue + ")"); - // System.out.println("search interval number=" + prune_intervals_end.size()); + System.out.println("search interval number=" + prune_intervals_end.size()); int traversedPoints = 0; for (int i = 0; i < prune_intervals_start.size(); i++) { int search_interval_start = prune_intervals_end.get(i) + 1; // included @@ -389,7 +391,7 @@ public class SDTEncoderTest { } } traversedComplexity += traversedPoints; - // System.out.println("number of traversed points: " + traversedPoints); + System.out.println("number of traversed points: " + traversedPoints); // System.out.println("start test------"); startTime = System.nanoTime();
