This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 49cd2898b935f888216119d78e60a2594ab26b14 Author: Lei Rui <[email protected]> AuthorDate: Wed Jul 5 16:52:43 2023 +0800 runnable --- .../dataset/groupby/LocalGroupByExecutor4CPV.java | 78 +++++---- .../apache/iotdb/db/integration/m4/MyTest5.java | 2 +- .../file/metadata/statistics/Statistics.java | 180 ++++++++------------- .../file/metadata/statistics/ValueIndex.java | 9 +- .../iotdb/tsfile/read/common/ChunkSuit4CPV.java | 39 ++--- .../iotdb/tsfile/read/common/ValuePoint.java | 2 +- .../iotdb/tsfile/read/reader/page/PageReader.java | 151 +++++++++-------- .../iotdb/tsfile/encoding/SDTEncoderTest.java | 29 ++-- 8 files changed, 237 insertions(+), 253 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java index fd95d95c257..5223ec6bde2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java @@ -19,14 +19,6 @@ package org.apache.iotdb.db.query.dataset.groupby; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -53,9 +45,19 @@ import org.apache.iotdb.tsfile.read.filter.GroupByFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.apache.iotdb.tsfile.utils.Pair; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; + /** * Sql format: SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), * max_value(s0) ROM root.xx group by ([tqs,tqe),IntervalLength). Requirements: (1) Don't change the @@ -301,9 +303,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { /** * @param curStartTime closed - * @param curEndTime open - * @param startTime closed - * @param endTime open + * @param curEndTime open + * @param startTime closed + * @param endTime open */ @Override public List<AggregateResult> calcResult( @@ -354,11 +356,12 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { while (currentChunkList.size() > 0) { // loop 1 // sorted by bottomValue, find BP candidate set // TODO double check the sort order logic for different aggregations - currentChunkList.sort((o1, o2) -> { - return ((Comparable) (o1.getStatistics().getMinValue())).compareTo( - o2.getStatistics().getMinValue()); - // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata - }); + currentChunkList.sort( + (o1, o2) -> { + return ((Comparable) (o1.getStatistics().getMinValue())) + .compareTo(o2.getStatistics().getMinValue()); + // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata + }); // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata Object value = currentChunkList.get(0).getStatistics().getMinValue(); List<ChunkSuit4CPV> candidateSet = new ArrayList<>(); @@ -374,10 +377,15 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // TODO check, whether nonLazyLoad remove affects candidateSet List<ChunkSuit4CPV> nonLazyLoad = new ArrayList<>(candidateSet); // TODO double check the sort order logic for version - nonLazyLoad.sort((o1, o2) -> new MergeReaderPriority(o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) - .compareTo(new MergeReaderPriority(o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader()))); + nonLazyLoad.sort( + (o1, o2) -> + new MergeReaderPriority( + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) + .compareTo( + new MergeReaderPriority( + o1.getChunkMetadata().getVersion(), + o1.getChunkMetadata().getOffsetOfChunkHeader()))); while (true) { // loop 2 // if there is no chunk for lazy loading, then load all chunks in candidateSet, // and apply deleteIntervals, deleting BP no matter out of deletion or update @@ -471,7 +479,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results .get(4) .updateResultUsingValues( - new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); return; // finished } else if (!isUpdate) { // verify whether the candidate point is updated @@ -500,7 +508,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results .get(4) .updateResultUsingValues( - new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); return; // finished } else { // the candidate point is updated, then label the chunk as already lazy loaded, @@ -567,8 +575,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) .compareTo( new MergeReaderPriority( o1.getChunkMetadata().getVersion(), @@ -601,7 +609,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { .setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList()); } // chunk data read operation (c): get all data points -// chunkSuit4CPV.getPageReader().updateBPTP(chunkSuit4CPV); + // chunkSuit4CPV.getPageReader().updateBPTP(chunkSuit4CPV); chunkSuit4CPV.getPageReader().updateTP_withValueIndex(chunkSuit4CPV); // TODO // check if empty if (chunkSuit4CPV.statistics.getCount() == 0) { @@ -673,7 +681,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results .get(5) .updateResultUsingValues( - new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); return; // finished } else if (!isUpdate) { // verify whether the candidate point is updated @@ -702,7 +710,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results .get(5) .updateResultUsingValues( - new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); return; // finished } else { // the candidate point is updated, then label the chunk as already lazy loaded, @@ -749,8 +757,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { return res; } else { return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) .compareTo( new MergeReaderPriority( o1.getChunkMetadata().getVersion(), @@ -827,11 +835,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results .get(0) .updateResultUsingValues( - new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); results .get(2) .updateResultUsingValues( - new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); return; } } @@ -860,8 +868,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { return res; } else { return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) .compareTo( new MergeReaderPriority( o1.getChunkMetadata().getVersion(), @@ -939,11 +947,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results .get(1) .updateResultUsingValues( - new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); results .get(3) .updateResultUsingValues( - new long[]{candidateTimestamp}, 1, new Object[]{candidateValue}); + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); return; } } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java index d7803eac00f..253dc40dd16 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest5.java @@ -99,7 +99,7 @@ public class MyTest5 { Statement statement = connection.createStatement()) { statement.execute("SET STORAGE GROUP TO root.m4"); statement.execute("CREATE TIMESERIES root.m4.d1.s1 with datatype=double,encoding=PLAIN"); - statement.execute("CREATE TIMESERIES root.m4.d1.s2 with datatype=INT32,encoding=PLAIN"); + statement.execute("CREATE TIMESERIES root.m4.d1.s2 with datatype=INT64,encoding=PLAIN"); } catch (SQLException throwable) { fail(throwable.getMessage()); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 5c0e87cf93d..f823fc2e94f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java @@ -18,12 +18,6 @@ */ package org.apache.iotdb.tsfile.file.metadata.statistics; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Objects; import org.apache.iotdb.tsfile.encoding.decoder.Decoder; import org.apache.iotdb.tsfile.encoding.decoder.DeltaBinaryDecoder.IntDeltaDecoder; import org.apache.iotdb.tsfile.encoding.decoder.DoublePrecisionDecoderV2; @@ -34,15 +28,23 @@ import org.apache.iotdb.tsfile.read.common.ValuePoint; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Objects; + /** * This class is used for recording statistic information of each measurement in a delta file. While * writing processing, the processor records the statistics information. Statistics includes - * maximum, minimum and null value count up to version 0.0.1.<br> Each data type extends this - * Statistic as super class.<br> + * maximum, minimum and null value count up to version 0.0.1.<br> + * Each data type extends this Statistic as super class.<br> * <br> * For the statistics in the Unseq file TimeSeriesMetadata, only firstValue, lastValue, startTime * and endTime can be used.</br> @@ -55,9 +57,7 @@ public abstract class Statistics<T> { */ protected boolean isEmpty = true; - /** - * number of time-value points - */ + /** number of time-value points */ private int count = 0; private long startTime = Long.MAX_VALUE; @@ -67,9 +67,7 @@ public abstract class Statistics<T> { public ValueIndex valueIndex = new ValueIndex(); - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ final String OPERATION_NOT_SUPPORT_FORMAT = "%s statistics does not support operation: %s"; /** @@ -147,14 +145,14 @@ public abstract class Statistics<T> { valueIndex.learn(); // ensure executed once and only once int byteLen = 0; byteLen += ReadWriteIOUtils.write(valueIndex.idxOut.size(), outputStream); - outputStream.write( - valueIndex.idxOut.getBuf(), 0, valueIndex.idxOut.size()); // NOTE len is important - byteLen += valueIndex.idxOut.getBuf().length; + // NOTE use size instead of buffer is important + outputStream.write(valueIndex.idxOut.getBuf(), 0, valueIndex.idxOut.size()); + byteLen += valueIndex.idxOut.size(); byteLen += ReadWriteIOUtils.write(valueIndex.valueOut.size(), outputStream); - outputStream.write( - valueIndex.valueOut.getBuf(), 0, valueIndex.valueOut.size()); // NOTE len is important - byteLen += valueIndex.valueOut.getBuf().length; + // NOTE use size instead of buffer is important + outputStream.write(valueIndex.valueOut.getBuf(), 0, valueIndex.valueOut.size()); + byteLen += valueIndex.valueOut.size(); byteLen += ReadWriteIOUtils.write(valueIndex.errorBound, outputStream); @@ -180,23 +178,17 @@ public abstract class Statistics<T> { abstract int serializeStats(OutputStream outputStream) throws IOException; - /** - * read data from the inputStream. - */ + /** read data from the inputStream. */ public abstract void deserialize(InputStream inputStream) throws IOException; public abstract void deserialize(ByteBuffer byteBuffer); // public abstract void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes); - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public abstract MinMaxInfo<T> getMinInfo(); - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public abstract MinMaxInfo<T> getMaxInfo(); public abstract T getMinValue(); @@ -207,14 +199,10 @@ public abstract class Statistics<T> { public abstract T getMaxValue(); - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public abstract long getBottomTimestamp(); - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public abstract long getTopTimestamp(); public abstract T getFirstValue(); @@ -290,9 +278,7 @@ public abstract class Statistics<T> { updateStats(value); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void update(long time, int value) { count++; if (time < this.startTime) { @@ -307,9 +293,7 @@ public abstract class Statistics<T> { updateStats(value, time); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void update(long time, long value) { count++; if (time < this.startTime) { @@ -323,9 +307,7 @@ public abstract class Statistics<T> { updateStats(value, time); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void update(long time, float value) { count++; if (time < this.startTime) { @@ -339,9 +321,7 @@ public abstract class Statistics<T> { updateStats(value, time); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void update(long time, double value) { count++; if (time < this.startTime) { @@ -379,9 +359,7 @@ public abstract class Statistics<T> { updateStats(values, batchSize); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void update(long[] time, int[] values, int batchSize) { count += batchSize; if (time[0] < startTime) { @@ -395,9 +373,7 @@ public abstract class Statistics<T> { updateStats(values, time, batchSize); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void update(long[] time, long[] values, int batchSize) { count += batchSize; if (time[0] < startTime) { @@ -411,9 +387,7 @@ public abstract class Statistics<T> { updateStats(values, time, batchSize); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void update(long[] time, float[] values, int batchSize) { count += batchSize; if (time[0] < startTime) { @@ -427,9 +401,7 @@ public abstract class Statistics<T> { updateStats(values, time, batchSize); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void update(long[] time, double[] values, int batchSize) { count += batchSize; if (time[0] < startTime) { @@ -465,14 +437,10 @@ public abstract class Statistics<T> { isEmpty = empty; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public abstract void updateMinInfo(T val, long timestamp); - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public abstract void updateMaxInfo(T val, long timestamp); void updateStats(boolean value) { @@ -529,30 +497,22 @@ public abstract class Statistics<T> { } } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void updateStats(int value, long timestamp) { throw new UnsupportedOperationException(); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void updateStats(long value, long timestamp) { throw new UnsupportedOperationException(); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void updateStats(float value, long timestamp) { throw new UnsupportedOperationException(); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void updateStats(double value, long timestamp) { throw new UnsupportedOperationException(); } @@ -565,30 +525,22 @@ public abstract class Statistics<T> { throw new UnsupportedOperationException(); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ void updateStats(int[] values, long[] timestamps, int batchSize) { throw new UnsupportedOperationException(); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ void updateStats(long[] values, long[] timestamps, int batchSize) { throw new UnsupportedOperationException(); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ void updateStats(float[] values, long[] timestamps, int batchSize) { throw new UnsupportedOperationException(); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ void updateStats(double[] values, long[] timestamps, int batchSize) { throw new UnsupportedOperationException(); } @@ -601,7 +553,7 @@ public abstract class Statistics<T> { * @param min min timestamp * @param max max timestamp * @author Yuyuan Kang This method with two parameters is only used by {@code unsequence} which - * updates/inserts/deletes timestamp. + * updates/inserts/deletes timestamp. */ public void updateStats(long min, long bottomTimestamp, long max, long topTimestamp) { throw new UnsupportedOperationException(); @@ -635,23 +587,23 @@ public abstract class Statistics<T> { // add the first point valueIndex.modelPointIdx_list.add(1); switch (getType()) { -// case INT32: -// int intV = (int) getFirstValue(); -// valueIndex.modelPointVal_list.add((double) intV); -// valueIndex.sortedModelPoints.add(new ValuePoint(1, (double) intV)); -// break; + // case INT32: + // int intV = (int) getFirstValue(); + // valueIndex.modelPointVal_list.add((double) intV); + // valueIndex.sortedModelPoints.add(new ValuePoint(1, (double) intV)); + // break; case INT64: - long longV = (long) getFirstValue(); + long longV = (Long) getFirstValue(); valueIndex.modelPointVal_list.add((double) longV); valueIndex.sortedModelPoints.add(new ValuePoint(1, (double) longV)); break; -// case FLOAT: -// float floatV = (float) getFirstValue(); -// valueIndex.modelPointVal_list.add((double) floatV); -// valueIndex.sortedModelPoints.add(new ValuePoint(1, (double) floatV)); -// break; + // case FLOAT: + // float floatV = (float) getFirstValue(); + // valueIndex.modelPointVal_list.add((double) floatV); + // valueIndex.sortedModelPoints.add(new ValuePoint(1, (double) floatV)); + // break; case DOUBLE: - double doubleV = (double) getFirstValue(); + double doubleV = (Double) getFirstValue(); valueIndex.modelPointVal_list.add(doubleV); valueIndex.sortedModelPoints.add(new ValuePoint(1, doubleV)); break; @@ -693,23 +645,23 @@ public abstract class Statistics<T> { if (count >= 2) { // otherwise only one point no need to store again valueIndex.modelPointIdx_list.add(count); switch (getType()) { -// case INT32: -// int intV = (int) getLastValue(); -// valueIndex.modelPointVal_list.add((double) intV); -// valueIndex.sortedModelPoints.add(new ValuePoint(count, (double) intV)); -// break; + // case INT32: + // int intV = (int) getLastValue(); + // valueIndex.modelPointVal_list.add((double) intV); + // valueIndex.sortedModelPoints.add(new ValuePoint(count, (double) intV)); + // break; case INT64: - long longV = (long) getLastValue(); + long longV = (Long) getLastValue(); valueIndex.modelPointVal_list.add((double) longV); valueIndex.sortedModelPoints.add(new ValuePoint(count, (double) longV)); break; -// case FLOAT: -// float floatV = (float) getLastValue(); -// valueIndex.modelPointVal_list.add((double) floatV); -// valueIndex.sortedModelPoints.add(new ValuePoint(count, (double) floatV)); -// break; + // case FLOAT: + // float floatV = (float) getLastValue(); + // valueIndex.modelPointVal_list.add((double) floatV); + // valueIndex.sortedModelPoints.add(new ValuePoint(count, (double) floatV)); + // break; case DOUBLE: - double doubleV = (double) getLastValue(); + double doubleV = (Double) getLastValue(); valueIndex.modelPointVal_list.add(doubleV); valueIndex.sortedModelPoints.add(new ValuePoint(count, doubleV)); break; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/ValueIndex.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/ValueIndex.java index eb506480840..6cefa6088ec 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/ValueIndex.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/ValueIndex.java @@ -1,15 +1,17 @@ package org.apache.iotdb.tsfile.file.metadata.statistics; -import java.util.ArrayList; -import java.util.List; import org.apache.iotdb.tsfile.encoding.encoder.DeltaBinaryEncoder.IntDeltaEncoder; import org.apache.iotdb.tsfile.encoding.encoder.DoublePrecisionEncoderV2; import org.apache.iotdb.tsfile.encoding.encoder.SDTEncoder; import org.apache.iotdb.tsfile.read.common.ValuePoint; import org.apache.iotdb.tsfile.utils.PublicBAOS; + import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList; import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList; +import java.util.ArrayList; +import java.util.List; + public class ValueIndex { private DoubleArrayList values = new DoubleArrayList(); @@ -22,7 +24,8 @@ public class ValueIndex { public IntArrayList modelPointIdx_list = new IntArrayList(); public DoubleArrayList modelPointVal_list = new DoubleArrayList(); - public List<ValuePoint> sortedModelPoints = new ArrayList<>(); // sorted by value in ascending order + public List<ValuePoint> sortedModelPoints = + new ArrayList<>(); // sorted by value in ascending order // this is necessary, otherwise serialized twice by timeseriesMetadata and chunkMetadata // causing learn() executed more than once!! diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java index 3b5fd251521..4c1e205781e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java @@ -19,7 +19,6 @@ package org.apache.iotdb.tsfile.read.common; -import java.io.IOException; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -33,10 +32,14 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.StepRegress; import org.apache.iotdb.tsfile.read.common.IOMonitor2.Operation; import org.apache.iotdb.tsfile.read.reader.page.PageReader; +import java.io.IOException; + public class ChunkSuit4CPV { private ChunkMetadata chunkMetadata; // fixed info, including version, dataType, stepRegress - public int modelPointsCursor = 1; // starting from 0, pointing to the right end of the model segment, moving forward as processing time spans + public int modelPointsCursor = + 1; // starting from 0, pointing to the right end of the model segment, moving forward as + // processing time spans public Statistics statistics; // dynamically updated, includes FP/LP/BP/TP info @@ -282,10 +285,10 @@ public class ChunkSuit4CPV { long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setStartTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access - // case INT32: - // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), - // pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: long longVal = pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8); @@ -328,10 +331,10 @@ public class ChunkSuit4CPV { long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setStartTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access - // case INT32: - // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), - // pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: long longVal = pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8); @@ -399,10 +402,10 @@ public class ChunkSuit4CPV { long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setEndTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access - // case INT32: - // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), - // pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: long longVal = pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8); @@ -451,10 +454,10 @@ public class ChunkSuit4CPV { long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setEndTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access - // case INT32: - // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), - // pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: long longVal = pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ValuePoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ValuePoint.java index 2e672d7a134..62831c43196 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ValuePoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ValuePoint.java @@ -38,4 +38,4 @@ public class ValuePoint implements Comparable<ValuePoint> { public String toString() { return "(" + index + "," + value + ")"; } -} \ No newline at end of file +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index 979678ea5b7..db378202f56 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -18,11 +18,6 @@ */ package org.apache.iotdb.tsfile.read.reader.page; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import org.apache.iotdb.tsfile.encoding.decoder.Decoder; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.header.PageHeader; @@ -46,39 +41,35 @@ import org.apache.iotdb.tsfile.read.reader.IPageReader; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + public class PageReader implements IPageReader { private PageHeader pageHeader; protected TSDataType dataType; - /** - * decoder for value column - */ + /** decoder for value column */ protected Decoder valueDecoder; - /** - * decoder for time column - */ + /** decoder for time column */ protected Decoder timeDecoder; - /** - * time column in memory - */ + /** time column in memory */ public ByteBuffer timeBuffer; - /** - * value column in memory - */ + /** value column in memory */ public ByteBuffer valueBuffer; public int timeBufferLength; protected Filter filter; - /** - * A list of deleted intervals. - */ + /** A list of deleted intervals. */ private List<TimeRange> deleteIntervalList; private int deleteCursor = 0; @@ -122,9 +113,7 @@ public class PageReader implements IPageReader { valueBuffer.position(timeBufferLength); } - /** - * the chunk partially overlaps in time with the current M4 interval Ii - */ + /** the chunk partially overlaps in time with the current M4 interval Ii */ public void split4CPV( long startTime, long endTime, @@ -137,10 +126,10 @@ public class PageReader implements IPageReader { // endTime is excluded so -1 int numberOfSpans = (int) - Math.floor( - (Math.min(chunkMetadata.getEndTime(), endTime - 1) - curStartTime) - * 1.0 - / interval) + Math.floor( + (Math.min(chunkMetadata.getEndTime(), endTime - 1) - curStartTime) + * 1.0 + / interval) + 1; for (int n = 0; n < numberOfSpans; n++) { long leftEndIncluded = curStartTime + n * interval; @@ -178,20 +167,23 @@ public class PageReader implements IPageReader { } public void updateTP_withValueIndex(ChunkSuit4CPV chunkSuit4CPV) { - // NOTE: get valueIndex from chunkSuit4CPV.getChunkMetadata().getStatistics(), not chunkSuit4CPV.getStatistics()! + // NOTE: get valueIndex from chunkSuit4CPV.getChunkMetadata().getStatistics(), not + // chunkSuit4CPV.getStatistics()! ValueIndex valueIndex = chunkSuit4CPV.getChunkMetadata().getStatistics().valueIndex; // step 1: find threshold - // iterate SDT points from value big to small to find the first point not deleted boolean isFound = false; double foundValue = 0; - for (ValuePoint valuePoint : valueIndex.sortedModelPoints) { + // iterate SDT points from value big to small to find the first point not deleted + for (int n = valueIndex.sortedModelPoints.size() - 1; n >= 0; n--) { // NOTE from big to small + ValuePoint valuePoint = valueIndex.sortedModelPoints.get(n); int idx = valuePoint.index; // index starting from 1 int pos = idx - 1; // pos starting from 0 long time = timeBuffer.getLong(pos * 8); // check if deleted - deleteCursor = 0; // TODO check - if ((pos >= chunkSuit4CPV.startPos) && (pos <= chunkSuit4CPV.endPos) && !isDeleted(time)) { + if ((pos >= chunkSuit4CPV.startPos) + && (pos <= chunkSuit4CPV.endPos) + && !isDeleted_NoHistoryDeleteCursor(time)) { // startPos&endPos conveys the virtual deletes of the current M4 time span isFound = true; foundValue = valuePoint.value; @@ -208,23 +200,31 @@ public class PageReader implements IPageReader { // increment global chunkSuit4CPV.modelPointsCursor int idx2; // note that the first and last points of a chunk are stored in model points - // there must exist idx2-1 >= startPos, otherwise this chunk won't be processed for the current time span - // there must exist idx1-1 <= endPos, otherwise this chunk won't be processed for the current time span + // there must exist idx2-1 >= startPos, otherwise this chunk won't be processed for the current + // time span while ((idx2 = valueIndex.modelPointIdx_list.get(chunkSuit4CPV.modelPointsCursor)) - 1 < chunkSuit4CPV.startPos) { // TODO check // -1 because idx starting from 1 while pos starting from 0 chunkSuit4CPV.modelPointsCursor++; + // pointing to the right end of the first model segment that passes the left endpoint of the + // current time span } - // increment local cursor starting from chunkSuit4CPV.modelPointsCursor for iterating model segments for the current time span - // do not increment modelPointsCursor because the model segments for this time span may be iterated multiple times - int localCursor = chunkSuit4CPV.modelPointsCursor; + // increment local cursor starting from chunkSuit4CPV.modelPointsCursor for iterating model + // segments for the current time span + // do not increment modelPointsCursor because the model segments for this time span may be + // iterated multiple times + int localCursor = + chunkSuit4CPV.modelPointsCursor; // pointing to the right end of the model segment List<Integer> prune_intervals_start = new ArrayList<>(); List<Integer> prune_intervals_end = new ArrayList<>(); int interval_start = -1; int interval_end = -1; int idx1; - while ((idx1 = valueIndex.modelPointIdx_list.get(localCursor - 1)) - 1 - <= chunkSuit4CPV.endPos) { + // there must exist idx1-1 <= endPos, otherwise this chunk won't be processed for the current + // time span + while (localCursor < valueIndex.modelPointIdx_list.size() + && (idx1 = valueIndex.modelPointIdx_list.get(localCursor - 1)) - 1 + <= chunkSuit4CPV.endPos) { idx2 = valueIndex.modelPointIdx_list.get(localCursor); double v1_UB = valueIndex.modelPointVal_list.get(localCursor - 1) + valueIndex.errorBound; double v2_UB = valueIndex.modelPointVal_list.get(localCursor) + valueIndex.errorBound; @@ -242,8 +242,8 @@ public class PageReader implements IPageReader { (int) Math.floor((threshold_LB - v1_UB) * (idx2 - idx1) / (v2_UB - v1_UB) + idx1)); interval_start = -1; // discontinuous } else if (v1_UB >= threshold_LB && v2_UB < threshold_LB) { - interval_start = (int) Math.ceil( - (threshold_LB - v1_UB) * (idx2 - idx1) / (v2_UB - v1_UB) + idx1); + interval_start = + (int) Math.ceil((threshold_LB - v1_UB) * (idx2 - idx1) / (v2_UB - v1_UB) + idx1); interval_end = idx2; // continuous } localCursor++; @@ -273,31 +273,34 @@ public class PageReader implements IPageReader { int prune_idx2 = prune_intervals_end.get(prune_intervals_end.size() - 1); if (prune_idx2 - 1 >= search_endPos) { // -1 for included, -1 for starting from 0 - search_endPos = Math.min(search_endPos, - prune_intervals_start.get(prune_intervals_start.size() - 1) - 1 - 1); + search_endPos = + Math.min( + search_endPos, prune_intervals_start.get(prune_intervals_start.size() - 1) - 1 - 1); prune_intervals_start.remove(prune_intervals_start.size() - 1); prune_intervals_end.remove(prune_intervals_end.size() - 1); } } // add search_endPos+1 to the end of prune_intervals_start // turning into search_intervals_end (excluded endpoints) - prune_intervals_start.add(search_endPos + 1); + // note that idx&prune_interval&search_interval starting from 1, pos starting from 0 + prune_intervals_start.add(search_endPos + 1 + 1); // +1 for starting from 1, +1 for excluded // add search_startPos-1 to the start of prune_intervals_end // turning into search_intervals_start (excluded endpoints) - prune_intervals_end.add(0, search_startPos - 1); + prune_intervals_end.add(0, search_startPos + 1 - 1); // +1 for starting from 1, +1 for excluded // step 4: search unpruned intervals // TODO deal with normal delete intervals if (dataType == TSDataType.DOUBLE) { double candidateTPvalue = -1; long candidateTPtime = -1; + // note that idx&prune_interval&search_interval starting from 1, pos starting from 0 for (int i = 0; i < prune_intervals_start.size(); i++) { int search_interval_start = prune_intervals_end.get(i) + 1; // included int search_interval_end = prune_intervals_start.get(i) - 1; // included - for (int j = search_interval_start; j <= search_interval_end; j++) { // starting from 1 - double v = valueBuffer.getDouble(timeBufferLength + (j - 1) * 8); - long t = timeBuffer.getLong((j - 1) * 8); - if (v > candidateTPvalue && !isDeleted(t)) { + for (int j = search_interval_start; j <= search_interval_end; j++) { // idx starting from 1 + double v = valueBuffer.getDouble(timeBufferLength + (j - 1) * 8); // pos starting from 0 + long t = timeBuffer.getLong((j - 1) * 8); // pos starting from 0 + if (v > candidateTPvalue && !isDeleted_NoHistoryDeleteCursor(t)) { candidateTPvalue = v; candidateTPtime = t; } @@ -305,7 +308,7 @@ public class PageReader implements IPageReader { } chunkSuit4CPV.statistics.setMaxInfo(new MinMaxInfo(candidateTPvalue, candidateTPtime)); } else if (dataType == TSDataType.INT64) { - long candidateTPvalue = -1; + long candidateTPvalue = -1; // NOTE for TP long candidateTPtime = -1; for (int i = 0; i < prune_intervals_start.size(); i++) { int search_interval_start = prune_intervals_end.get(i) + 1; // included @@ -313,7 +316,7 @@ public class PageReader implements IPageReader { for (int j = search_interval_start; j <= search_interval_end; j++) { // starting from 1 long v = valueBuffer.getLong(timeBufferLength + (j - 1) * 8); long t = timeBuffer.getLong((j - 1) * 8); - if (v > candidateTPvalue && !isDeleted(t)) { + if (v > candidateTPvalue && !isDeleted_NoHistoryDeleteCursor(t)) { candidateTPvalue = v; candidateTPtime = t; } @@ -335,9 +338,9 @@ public class PageReader implements IPageReader { case INT64: statistics = new LongStatistics(); break; -// case FLOAT: -// statistics = new FloatStatistics(); -// break; + // case FLOAT: + // statistics = new FloatStatistics(); + // break; case DOUBLE: statistics = new DoubleStatistics(); break; @@ -361,16 +364,18 @@ public class PageReader implements IPageReader { // only updateStats, actually only need to update BP and TP } break; -// case FLOAT: -// float aFloat = valueBuffer.getFloat(timeBufferLength + pos * 8); -// if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) { -// // update statistics of chunkMetadata1 -// statistics.updateStats(aFloat, timestamp); -// count++; -// // ATTENTION: do not use update() interface which will also update StepRegress! -// // only updateStats, actually only need to update BP and TP -// } -// break; + // case FLOAT: + // float aFloat = valueBuffer.getFloat(timeBufferLength + pos * 8); + // if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, + // aFloat))) { + // // update statistics of chunkMetadata1 + // statistics.updateStats(aFloat, timestamp); + // count++; + // // ATTENTION: do not use update() interface which will also update + // StepRegress! + // // only updateStats, actually only need to update BP and TP + // } + // break; case DOUBLE: double aDouble = valueBuffer.getDouble(timeBufferLength + pos * 8); if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) { @@ -394,9 +399,7 @@ public class PageReader implements IPageReader { IOMonitor2.addMeasure(Operation.SEARCH_ARRAY_c_genBPTP, System.nanoTime() - start); } - /** - * @return the returned BatchData may be empty, but never be null - */ + /** @return the returned BatchData may be empty, but never be null */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @Override public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException { @@ -493,6 +496,20 @@ public class PageReader implements IPageReader { return false; } + protected boolean isDeleted_NoHistoryDeleteCursor(long timestamp) { + deleteCursor = 0; + while (deleteIntervalList != null && deleteCursor < deleteIntervalList.size()) { + if (deleteIntervalList.get(deleteCursor).contains(timestamp)) { + return true; + } else if (deleteIntervalList.get(deleteCursor).getMax() < timestamp) { + deleteCursor++; + } else { + return false; + } + } + return false; + } + public static boolean isDeleted(long timestamp, List<TimeRange> deleteIntervalList) { int deleteCursor = 0; while (deleteIntervalList != null && deleteCursor < deleteIntervalList.size()) { diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/encoding/SDTEncoderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/encoding/SDTEncoderTest.java index f859ddab83d..43cdc6d6ae5 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/encoding/SDTEncoderTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/encoding/SDTEncoderTest.java @@ -19,7 +19,10 @@ package org.apache.iotdb.tsfile.encoding; -import static org.junit.Assert.assertEquals; +import org.apache.iotdb.tsfile.encoding.encoder.SDTEncoder; +import org.apache.iotdb.tsfile.read.common.ValuePoint; + +import org.junit.Test; import java.io.BufferedReader; import java.io.FileReader; @@ -27,9 +30,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.apache.iotdb.tsfile.encoding.encoder.SDTEncoder; -import org.apache.iotdb.tsfile.read.common.ValuePoint; -import org.junit.Test; + +import static org.junit.Assert.assertEquals; public class SDTEncoderTest { @@ -233,8 +235,8 @@ public class SDTEncoderTest { String csvData = "D:\\full-game\\BallSpeed.csv"; // double[] eList = new double[] {500000, 400000, 300000, 200000, 160000, 100000, 50000, // 10000}; - double[] eList = new double[]{160000}; - int[] startList = new int[]{1, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000}; + double[] eList = new double[] {160000}; + int[] startList = new int[] {1, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000}; List<Double> elapsedTime_withValueIndex_list = new ArrayList<>(); List<Double> elapsedTime_withoutValueIndex_list = new ArrayList<>(); List<Double> traversedComplexity_list = new ArrayList<>(); @@ -302,12 +304,12 @@ public class SDTEncoderTest { // ValuePoint[] myArray = (ValuePoint[]) points.toArray(); // Arrays.sort(points.toArray()); -// points.sort( -// new Comparator<ValuePoint>() { -// public int compare(ValuePoint o1, ValuePoint o2) { -// return ((Comparable) (o1.value)).compareTo(o2.value); -// } -// }); + // points.sort( + // new Comparator<ValuePoint>() { + // public int compare(ValuePoint o1, ValuePoint o2) { + // return ((Comparable) (o1.value)).compareTo(o2.value); + // } + // }); Collections.sort(points); long startTime = System.nanoTime(); @@ -407,8 +409,7 @@ public class SDTEncoderTest { elapsedTime_withoutValueIndex += elapsedTime; // System.out.println("search without value index: " + elapsedTime / 1000000.0 + " // ms"); - System.out.println("TP=(" + candidateTPidx_raw + "," + candidateTPvalue_raw + - ")"); + System.out.println("TP=(" + candidateTPidx_raw + "," + candidateTPvalue_raw + ")"); } elapsedTime_withValueIndex_list.add( elapsedTime_withValueIndex / startList.length / 1000000.0);
