This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/LTS-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 00003e52f15a9d8aa83a165f35348ed95439b06f Author: Lei Rui <[email protected]> AuthorDate: Sat Jul 13 16:23:23 2024 +0800 s --- .../resources/conf/iotdb-engine.properties | 5 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 + .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 3 + .../groupby/GroupByWithoutValueFilterDataSet.java | 6 +- .../groupby/LocalGroupByExecutorTri_ILTS.java | 4 - .../groupby/LocalGroupByExecutorTri_LTTB.java | 4 - .../groupby/LocalGroupByExecutorTri_SimPiece.java | 199 +++++++++++++++ .../db/query/simpiece/Encoding/FloatEncoder.java | 38 +++ .../db/query/simpiece/Encoding/IntEncoder.java | 46 ++++ .../db/query/simpiece/Encoding/UIntEncoder.java | 48 ++++ .../simpiece/Encoding/VariableByteEncoder.java | 91 +++++++ .../apache/iotdb/db/query/simpiece/MySample.java | 117 +++++++++ .../org/apache/iotdb/db/query/simpiece/Point.java | 41 +++ .../apache/iotdb/db/query/simpiece/SimPiece.java | 278 +++++++++++++++++++++ .../iotdb/db/query/simpiece/SimPieceSegment.java | 63 +++++ .../apache/iotdb/db/query/simpiece/TimeSeries.java | 40 +++ .../iotdb/db/query/simpiece/TimeSeriesReader.java | 166 ++++++++++++ 17 files changed, 1149 insertions(+), 10 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 1e935c8bdba..60c5df8bf15 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -19,9 +19,12 @@ #################### ### enable Tri #################### -# MinMax, MinMaxLTTB, M4, LTTB, ILTS +# MinMax, MinMaxLTTB, M4, LTTB, ILTS, SimPiece enable_Tri=MinMax +# SimPiece segment error threshold +epsilon=100 + #for p1t=0 p1v=0 diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 765f9e2ed05..e78243dd9f1 100644 --- 
a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -435,6 +435,8 @@ public class IoTDBConfig { private String enableTri = "ILTS"; // MinMax, MinMaxLTTB, M4, LTTB, ILTS + private double epsilon = 100; // for SimPiece + private long p1t; private double p1v; private long pnt; @@ -1486,6 +1488,14 @@ public class IoTDBConfig { return enableTri; } + public double getEpsilon() { + return epsilon; + } + + public void setEpsilon(double epsilon) { + this.epsilon = epsilon; + } + public void setEnablePerformanceTracing(boolean enablePerformanceTracing) { this.enablePerformanceTracing = enablePerformanceTracing; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 8f8fd7b31df..70c573e1e1e 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -540,6 +540,9 @@ public class IoTDBDescriptor { properties.getProperty("enable_CPV", Boolean.toString(conf.isEnableCPV())).trim())); conf.setEnableTri(properties.getProperty("enable_Tri", conf.getEnableTri()).trim()); + conf.setEpsilon( + Double.parseDouble( + properties.getProperty("epsilon", Double.toString(conf.getEpsilon())))); conf.setP1t(Long.parseLong(properties.getProperty("p1t", Long.toString(conf.getP1t())))); conf.setP1v( Double.parseDouble(properties.getProperty("p1v", Double.toString(conf.getP1v())))); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index d1d42ac41aa..7bc5433d854 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -137,7 +137,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { if (CONFIG.getEnableTri().equals("MinMax") || CONFIG.getEnableTri().equals("M4") || CONFIG.getEnableTri().equals("LTTB") - || CONFIG.getEnableTri().equals("ILTS")) { + || CONFIG.getEnableTri().equals("ILTS") + || CONFIG.getEnableTri().equals("SimPiece")) { return nextWithoutConstraintTri_allInOne(); } else if (CONFIG.getEnableTri().equals("MinMaxLTTB")) { return nextWithoutConstraintTri_MinMaxLTTB(); @@ -431,6 +432,9 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { } return new LocalGroupByExecutorTri_ILTS( path, allSensors, dataType, context, timeFilter, fileFilter, ascending); + } else if (CONFIG.getEnableTri().equals("SimPiece")) { + return new LocalGroupByExecutorTri_SimPiece( + path, allSensors, dataType, context, timeFilter, fileFilter, ascending); } else { logger.info("No matched enable_tri!"); return new LocalGroupByExecutor( diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java index d4c6122fbbe..b22a7d8d9a9 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java @@ -44,9 +44,6 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.apache.iotdb.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -59,7 +56,6 @@ import java.util.Set; public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor { private static final IoTDBConfig 
CONFIG = IoTDBDescriptor.getInstance().getConfig(); - private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); // Aggregate result buffer of this path private final List<AggregateResult> results = new ArrayList<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java index a8e4fe77f2e..cba1543aff8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java @@ -43,9 +43,6 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.apache.iotdb.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -57,7 +54,6 @@ import java.util.Set; public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor { private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); // Aggregate result buffer of this path private final List<AggregateResult> results = new ArrayList<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_SimPiece.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_SimPiece.java new file mode 100644 index 00000000000..0a9b4fbefa5 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_SimPiece.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.dataset.groupby; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.db.query.reader.series.SeriesReader; +import org.apache.iotdb.db.query.simpiece.SimPiece; +import org.apache.iotdb.db.query.simpiece.SimPieceSegment; +import org.apache.iotdb.db.query.simpiece.TimeSeries; +import org.apache.iotdb.db.query.simpiece.TimeSeriesReader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; +import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +import org.apache.iotdb.tsfile.read.filter.GroupByFilter; +import 
org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Set; + +public class LocalGroupByExecutorTri_SimPiece implements GroupByExecutor { + + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + + // Aggregate result buffer of this path + private final List<AggregateResult> results = new ArrayList<>(); + + TimeSeries timeSeries; + + double epsilon = CONFIG.getEpsilon(); + + public LocalGroupByExecutorTri_SimPiece( + PartialPath path, + Set<String> allSensors, + TSDataType dataType, + QueryContext context, + Filter timeFilter, + TsFileFilter fileFilter, + boolean ascending) + throws StorageEngineException, QueryProcessException { + // long start = System.nanoTime(); + + // get all data sources + QueryDataSource queryDataSource = + QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); + + // update filter by TTL + // this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + + SeriesReader seriesReader = + new SeriesReader( + path, + allSensors, + // fix bug: here use the aggregation type as the series data type, + // not using pageReader.getAllSatisfiedPageData is ok + dataType, + context, + queryDataSource, + timeFilter, + null, + fileFilter, + ascending); + + try { + // : this might be bad to load all chunk metadata at first + List<ChunkSuit4Tri> futureChunkList = new ArrayList<>(); + futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); + // order futureChunkList by chunk startTime + futureChunkList.sort( + new Comparator<ChunkSuit4Tri>() { + public int compare(ChunkSuit4Tri o1, ChunkSuit4Tri o2) { + return ((Comparable) (o1.chunkMetadata.getStartTime())) + .compareTo(o2.chunkMetadata.getStartTime()); + } + }); + + GroupByFilter groupByFilter = (GroupByFilter) timeFilter; + long startTime = 
groupByFilter.getStartTime(); + long endTime = groupByFilter.getEndTime(); + + timeSeries = TimeSeriesReader.getTimeSeriesFromTsFiles(futureChunkList, startTime, endTime); + + } catch (IOException e) { + throw new QueryProcessException(e.getMessage()); + } + } + + @Override + public void addAggregateResult(AggregateResult aggrResult) { + results.add(aggrResult); + } + + @Override + public List<AggregateResult> calcResult( + long curStartTime, long curEndTime, long startTime, long endTime, long interval) + throws IOException { + // group by curStartTime and curEndTime are not used in Sim-Piece segmentation + + StringBuilder series = new StringBuilder(); + + // clear result cache + for (AggregateResult result : results) { + result.reset(); + } + + // + // series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(","); + + SimPiece simPiece = new SimPiece(timeSeries.data, epsilon); + List<SimPieceSegment> segments = simPiece.segments; + segments.sort(Comparator.comparingLong(SimPieceSegment::getInitTimestamp)); + for (int i = 0; i < segments.size() - 1; i++) { + // end point of this segment + double v = + (segments.get(i + 1).getInitTimestamp() - segments.get(i).getInitTimestamp()) + * segments.get(i).getA() + + segments.get(i).getB(); + + series + .append(segments.get(i).getB()) // start point of this segment + .append("[") + .append(segments.get(i).getInitTimestamp()) + .append("]") + .append(",") + .append(v) // end point of this segment + .append("[") + .append(segments.get(i + 1).getInitTimestamp()) + .append("]") + .append(","); + } + // the two end points of the last segment + double v = + (simPiece.lastTimeStamp - segments.get(segments.size() - 1).getInitTimestamp()) + * segments.get(segments.size() - 1).getA() + + segments.get(segments.size() - 1).getB(); + + series + .append(segments.get(segments.size() - 1).getB()) + .append("[") + .append(segments.get(segments.size() - 1).getInitTimestamp()) + .append("]") + .append(",") + 
.append(v) + .append("[") + .append(simPiece.lastTimeStamp) + .append("]") + .append(","); + + // + // series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(","); + + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0)); + + return results; + } + + @Override + public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) + throws IOException { + throw new IOException("no implemented"); + } + + @Override + public List<AggregateResult> calcResult(long curStartTime, long curEndTime) + throws IOException, QueryProcessException { + throw new IOException("no implemented"); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/FloatEncoder.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/FloatEncoder.java new file mode 100644 index 00000000000..f8811760cf9 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/FloatEncoder.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git + +package org.apache.iotdb.db.query.simpiece.Encoding; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class FloatEncoder { + public static void write(float number, ByteArrayOutputStream outputStream) throws IOException { + int intBits = Float.floatToIntBits(number); + IntEncoder.write(intBits, outputStream); + } + + public static float read(ByteArrayInputStream inputStream) throws IOException { + int number = IntEncoder.read(inputStream); + return Float.intBitsToFloat(number); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/IntEncoder.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/IntEncoder.java new file mode 100644 index 00000000000..40d913abd2b --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/IntEncoder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git + +package org.apache.iotdb.db.query.simpiece.Encoding; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class IntEncoder { + public static void write(int number, ByteArrayOutputStream outputStream) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.putInt(number); + outputStream.write(buffer.array()); + } + + public static int read(ByteArrayInputStream inputStream) throws IOException { + byte[] byteArray = new byte[Integer.BYTES]; + int k = inputStream.read(byteArray); + if (k != Integer.BYTES) throw new IOException(); + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.put(byteArray); + buffer.flip(); + + return buffer.getInt(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/UIntEncoder.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/UIntEncoder.java new file mode 100644 index 00000000000..f7f25168a08 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/UIntEncoder.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git + +package org.apache.iotdb.db.query.simpiece.Encoding; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class UIntEncoder { + public static void write(long number, ByteArrayOutputStream outputStream) throws IOException { + if (number > Math.pow(2, 8 * 4) - 1 || number < 0) + throw new UnsupportedOperationException("Can't save number " + number + " as unsigned int"); + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.putInt((int) (number & 0xffffffffL)); + outputStream.write(buffer.array()); + } + + public static long read(ByteArrayInputStream inputStream) throws IOException { + byte[] byteArray = new byte[Integer.BYTES]; + int k = inputStream.read(byteArray); + if (k != Integer.BYTES) throw new IOException(); + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.put(byteArray); + buffer.flip(); + + return buffer.getInt() & 0xffffffffL; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/VariableByteEncoder.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/VariableByteEncoder.java new file mode 100644 index 00000000000..93115f461a0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Encoding/VariableByteEncoder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git + +package org.apache.iotdb.db.query.simpiece.Encoding; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +/* + * Source code by: + * https://github.com/lemire/JavaFastPFOR/blob/master/src/main/java/me/lemire/integercompression/VariableByte.java + */ +public class VariableByteEncoder { + + private static byte extract7bits(int i, long val) { + return (byte) ((val >> (7 * i)) & ((1 << 7) - 1)); + } + + private static byte extract7bitsmaskless(int i, long val) { + return (byte) ((val >> (7 * i))); + } + + public static void write(int number, ByteArrayOutputStream outputStream) { + final long val = number & 0xFFFFFFFFL; + + if (val < (1 << 7)) { + outputStream.write((byte) (val | (1 << 7))); + } else if (val < (1 << 14)) { + outputStream.write(extract7bits(0, val)); + outputStream.write((byte) (extract7bitsmaskless(1, (val)) | (1 << 7))); + } else if (val < (1 << 21)) { + outputStream.write(extract7bits(0, val)); + outputStream.write(extract7bits(1, val)); + outputStream.write((byte) (extract7bitsmaskless(2, (val)) | (1 << 7))); + } else if (val < (1 << 28)) { + outputStream.write(extract7bits(0, val)); + outputStream.write(extract7bits(1, val)); + outputStream.write(extract7bits(2, val)); + outputStream.write((byte) (extract7bitsmaskless(3, (val)) | (1 << 7))); + } else { + outputStream.write(extract7bits(0, val)); + outputStream.write(extract7bits(1, val)); + outputStream.write(extract7bits(2, val)); + outputStream.write(extract7bits(3, 
val)); + outputStream.write((byte) (extract7bitsmaskless(4, (val)) | (1 << 7))); + } + } + + public static int read(ByteArrayInputStream inputStream) { + byte in; + int number; + + in = (byte) inputStream.read(); + number = in & 0x7F; + if (in < 0) return number; + + in = (byte) inputStream.read(); + number = ((in & 0x7F) << 7) | number; + if (in < 0) return number; + + in = (byte) inputStream.read(); + number = ((in & 0x7F) << 14) | number; + if (in < 0) return number; + + in = (byte) inputStream.read(); + number = ((in & 0x7F) << 21) | number; + if (in < 0) return number; + + number = (((byte) inputStream.read() & 0x7F) << 28) | number; + + return number; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/MySample.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/MySample.java new file mode 100644 index 00000000000..a54d60ac1d1 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/MySample.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git + +package org.apache.iotdb.db.query.simpiece; + +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Comparator; +import java.util.List; + +public class MySample { + + public static void main(String[] args) { + String fileDir = "D:\\desktop\\NISTPV\\"; + boolean series = true; // 从1开始编号列而不是时间戳列 + String[] datasetNameList = + new String[] { + "NISTPV-Ground-2015-Qloss_Ah", + "NISTPV-Ground-2015-Pyra1_Wm2", + "NISTPV-Ground-2015-RTD_C_3" + }; + int[] noutList = new int[] {100}; + double[] r = new double[] {0.1, 0.5, 1.3}; + for (int y = 0; y < datasetNameList.length; y++) { + String datasetName = datasetNameList[y]; + int start = (int) (10000000 / 2 - 2500000 * r[y]); // 从0开始计数 + int end = (int) (10000000 / 2 + 2500000 * (1 - r[y])); + int N = end - start; // -1 for all + + for (int nout : noutList) { + // apply Sim-Piece on the input file, outputting nout points saved in csvFile + boolean hasHeader = false; + try (FileInputStream inputStream = new FileInputStream(fileDir + datasetName + ".csv")) { + String delimiter = ","; + TimeSeries ts = + TimeSeriesReader.getMyTimeSeries( + inputStream, delimiter, false, N, start, hasHeader, series); + double epsilon = getSimPieceParam(nout, ts, 1e-6); + System.out.println(datasetName + ": n=" + N + ",m=" + nout + ",epsilon=" + epsilon); + SimPiece simPiece = new SimPiece(ts.data, epsilon); + List<SimPieceSegment> segments = simPiece.segments; + segments.sort(Comparator.comparingLong(SimPieceSegment::getInitTimestamp)); + try (PrintWriter writer = + new PrintWriter(new FileWriter(datasetName + "-" + N + "-" + nout + ".csv"))) { + for (int i = 0; i < segments.size() - 1; i++) { + // start point of this segment + writer.println(segments.get(i).getInitTimestamp() + "," + segments.get(i).getB()); + // end point of this segment + double v = + (segments.get(i + 
1).getInitTimestamp() - segments.get(i).getInitTimestamp()) + * segments.get(i).getA() + + segments.get(i).getB(); + writer.println(segments.get(i + 1).getInitTimestamp() + "," + v); + } + // the two end points of the last segment + writer.println( + segments.get(segments.size() - 1).getInitTimestamp() + + "," + + segments.get(segments.size() - 1).getB()); + double v = + (simPiece.lastTimeStamp - segments.get(segments.size() - 1).getInitTimestamp()) + * segments.get(segments.size() - 1).getA() + + segments.get(segments.size() - 1).getB(); + writer.println(simPiece.lastTimeStamp + "," + v); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + public static double getSimPieceParam(int nout, TimeSeries ts, double accuracy) + throws IOException { + double epsilon = ts.range * 0.001; + while (true) { + SimPiece simPiece = new SimPiece(ts.data, epsilon); + if (simPiece.segments.size() * 2 > nout) { // note *2 for disjoint + epsilon *= 2; + } else { + break; + } + } + double left = epsilon / 2; + double right = epsilon; + while (Math.abs(right - left) > accuracy) { + double mid = (left + right) / 2; + SimPiece simPiece = new SimPiece(ts.data, mid); + if (simPiece.segments.size() * 2 > nout) { // note *2 for disjoint + left = mid; + } else { + right = mid; + } + } + return (left + right) / 2; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/Point.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Point.java new file mode 100644 index 00000000000..e8d96065a0c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/Point.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git + +package org.apache.iotdb.db.query.simpiece; + +public class Point { + + private final long timestamp; + private final double value; + + public Point(long timestamp, double value) { + this.timestamp = timestamp; + this.value = value; + } + + public long getTimestamp() { + return timestamp; + } + + public double getValue() { + return value; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/SimPiece.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/SimPiece.java new file mode 100644 index 00000000000..a7c491f81c9 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/SimPiece.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Sim-Piece piecewise-linear approximation of a time series.
 *
 * <p>The constructor greedily splits the input into segments (see {@code createSegment}) whose
 * intercept is the first value quantized to a multiple of {@code epsilon}. Each segment keeps an
 * interval of admissible slopes. Segments that share an intercept and have overlapping slope
 * intervals are then merged (see {@code mergePerB}).
 */
public class SimPiece {
  /** Approximation segments; {@link #decompress()} re-sorts this list in place by start time. */
  public ArrayList<SimPieceSegment> segments;

  /**
   * Error bound: segments are grown so their line stays within {@code ±epsilon} of each covered
   * point. Also the quantization step for intercepts.
   */
  private double epsilon;
  /** Timestamp of the last input point; {@link #decompress()} reconstructs up to and including it. */
  public long lastTimeStamp;

  /**
   * Compresses {@code points} with error bound {@code epsilon}.
   *
   * @param points input samples. NOTE(review): assumed sorted by strictly increasing timestamp.
   *     The last element is taken as the end of the series, and equal consecutive timestamps would
   *     divide by zero in createSegment. Confirm with callers.
   * @param epsilon error bound and intercept quantization step
   * @throws IOException if {@code points} is empty (thrown without a message)
   */
  public SimPiece(List<Point> points, double epsilon) throws IOException {
    if (points.isEmpty()) throw new IOException();

    this.epsilon = epsilon;
    this.lastTimeStamp = points.get(points.size() - 1).getTimestamp();
    this.segments = mergePerB(compress(points));
  }

  // Disabled upstream deserializing constructor (depends on the commented-out readByteArray below).
  // public SimPiece(byte[] bytes, boolean variableByte, boolean zstd) throws IOException {
  //   readByteArray(bytes, variableByte, zstd);
  // }

  /** Rounds {@code value} to the nearest multiple of {@code epsilon}. */
  private double quantization(double value) {
    return Math.round(value / epsilon) * epsilon;
  }

  /**
   * Grows one segment starting at {@code points.get(startIdx)} and appends it to {@code segments}.
   *
   * <p>The intercept {@code b} is the quantized first value. The admissible slope interval {@code
   * [aMin, aMax]} is first set from the second point. It is then narrowed point by point so that a
   * line {@code a * (t - initTimestamp) + b} with any slope in the interval stays within {@code
   * ±epsilon} of each covered point. The segment ends at the first point that no such line can
   * reach. A one-point segment gets the unbounded interval {@code [-Double.MAX_VALUE,
   * Double.MAX_VALUE]}.
   *
   * @param startIdx index of the first point of the new segment
   * @param points all input points
   * @param segments output list the new segment is appended to
   * @return index of the first point not covered by the new segment
   */
  private int createSegment(int startIdx, List<Point> points, ArrayList<SimPieceSegment> segments) {
    long initTimestamp = points.get(startIdx).getTimestamp();
    double b = quantization(points.get(startIdx).getValue());
    if (startIdx + 1 == points.size()) {
      // last point on its own: any slope is admissible
      segments.add(new SimPieceSegment(initTimestamp, -Double.MAX_VALUE, Double.MAX_VALUE, b));
      return startIdx + 1;
    }
    // slopes through the upper / lower tolerance bound of the second point
    double aMax =
        ((points.get(startIdx + 1).getValue() + epsilon) - b)
            / (points.get(startIdx + 1).getTimestamp() - initTimestamp);
    double aMin =
        ((points.get(startIdx + 1).getValue() - epsilon) - b)
            / (points.get(startIdx + 1).getTimestamp() - initTimestamp);
    if (startIdx + 2 == points.size()) {
      segments.add(new SimPieceSegment(initTimestamp, aMin, aMax, b));
      return startIdx + 2;
    }

    for (int idx = startIdx + 2; idx < points.size(); idx++) {
      // tolerance band of the current point
      double upValue = points.get(idx).getValue() + epsilon;
      double downValue = points.get(idx).getValue() - epsilon;

      // values of the steepest / flattest admissible lines at this point's timestamp
      double upLim = aMax * (points.get(idx).getTimestamp() - initTimestamp) + b;
      double downLim = aMin * (points.get(idx).getTimestamp() - initTimestamp) + b;
      if ((downValue > upLim || upValue < downLim)) {
        // band does not intersect the admissible cone: close the segment before this point
        segments.add(new SimPieceSegment(initTimestamp, aMin, aMax, b));
        return idx;
      }

      // shrink the slope interval so every admissible line also passes through this band
      if (upValue < upLim)
        aMax = Math.max((upValue - b) / (points.get(idx).getTimestamp() - initTimestamp), aMin);
      if (downValue > downLim)
        aMin = Math.min((downValue - b) / (points.get(idx).getTimestamp() - initTimestamp), aMax);
    }
    segments.add(new SimPieceSegment(initTimestamp, aMin, aMax, b));

    return points.size();
  }

  /** Splits all of {@code points} into consecutive segments via {@code createSegment}. */
  private ArrayList<SimPieceSegment> compress(List<Point> points) {
    ArrayList<SimPieceSegment> segments = new ArrayList<>();
    int currentIdx = 0;
    while (currentIdx < points.size()) currentIdx = createSegment(currentIdx, points, segments);

    return segments;
  }

  /**
   * Merges segments that share an intercept.
   *
   * <p>Sorts {@code segments} in place by (b, a) and walks them in that order. Consecutive segments
   * with the same {@code b} whose slope intervals overlap form a group; the group's interval is the
   * intersection of its members' intervals. Each group emits one segment per original start
   * timestamp, all carrying the group's interval and therefore the same midpoint slope.
   *
   * @param segments segments produced by {@code compress}; reordered by this method
   * @return the merged segments, ordered by (b, a) group
   */
  private ArrayList<SimPieceSegment> mergePerB(ArrayList<SimPieceSegment> segments) {
    double aMinTemp = -Double.MAX_VALUE;
    double aMaxTemp = Double.MAX_VALUE;
    double b = Double.NaN; // NaN never equals a real b, so the first iteration opens a group
    ArrayList<Long> timestamps = new ArrayList<>(); // start timestamps of the open group
    ArrayList<SimPieceSegment> mergedSegments = new ArrayList<>();

    segments.sort(
        Comparator.comparingDouble(SimPieceSegment::getB)
            .thenComparingDouble(SimPieceSegment::getA));
    for (int i = 0; i < segments.size(); i++) {
      if (b != segments.get(i).getB()) {
        // new intercept: flush the open group (empty on the first iteration), start a new one
        if (timestamps.size() == 1)
          mergedSegments.add(new SimPieceSegment(timestamps.get(0), aMinTemp, aMaxTemp, b));
        else {
          for (Long timestamp : timestamps)
            mergedSegments.add(new SimPieceSegment(timestamp, aMinTemp, aMaxTemp, b));
        }
        timestamps.clear();
        timestamps.add(segments.get(i).getInitTimestamp());
        aMinTemp = segments.get(i).getAMin();
        aMaxTemp = segments.get(i).getAMax();
        b = segments.get(i).getB();
        continue;
      }
      if (segments.get(i).getAMin() <= aMaxTemp && segments.get(i).getAMax() >= aMinTemp) {
        // slope intervals overlap: join the group and shrink its interval to the intersection
        timestamps.add(segments.get(i).getInitTimestamp());
        aMinTemp = Math.max(aMinTemp, segments.get(i).getAMin());
        aMaxTemp = Math.min(aMaxTemp, segments.get(i).getAMax());
      } else {
        // no overlap: flush the open group; a one-member group is exactly segments.get(i - 1)
        if (timestamps.size() == 1) mergedSegments.add(segments.get(i - 1));
        else {
          for (long timestamp : timestamps)
            mergedSegments.add(new SimPieceSegment(timestamp, aMinTemp, aMaxTemp, b));
        }
        timestamps.clear();
        timestamps.add(segments.get(i).getInitTimestamp());
        aMinTemp = segments.get(i).getAMin();
        aMaxTemp = segments.get(i).getAMax();
      }
    }
    // flush the last open group
    if (!timestamps.isEmpty()) {
      if (timestamps.size() == 1)
        mergedSegments.add(new SimPieceSegment(timestamps.get(0), aMinTemp, aMaxTemp, b));
      else {
        for (long timestamp : timestamps)
          mergedSegments.add(new SimPieceSegment(timestamp, aMinTemp, aMaxTemp, b));
      }
    }

    return mergedSegments;
  }

  /**
   * Reconstructs one point per integer timestamp, from the first segment's start through {@link
   * #lastTimeStamp} inclusive.
   *
   * <p>Sorts {@link #segments} by start time in place. Timestamp {@code t} is evaluated on the
   * latest segment starting at or before it, as {@code a * (t - initTimestamp) + b}.
   *
   * <p>NOTE(review): timestamps advance by 1, so this assumes unit-spaced integer timestamps.
   *
   * @return the reconstructed points in increasing time order
   */
  public List<Point> decompress() {
    segments.sort(Comparator.comparingLong(SimPieceSegment::getInitTimestamp));
    List<Point> points = new ArrayList<>();
    long currentTimeStamp = segments.get(0).getInitTimestamp();

    // every segment except the last covers [its start, next segment's start)
    for (int i = 0; i < segments.size() - 1; i++) {
      while (currentTimeStamp < segments.get(i + 1).getInitTimestamp()) {
        points.add(
            new Point(
                currentTimeStamp,
                segments.get(i).getA() * (currentTimeStamp - segments.get(i).getInitTimestamp())
                    + segments.get(i).getB()));
        currentTimeStamp++;
      }
    }

    // the last segment runs through lastTimeStamp
    while (currentTimeStamp <= lastTimeStamp) {
      points.add(
          new Point(
              currentTimeStamp,
              segments.get(segments.size() - 1).getA()
                      * (currentTimeStamp - segments.get(segments.size() - 1).getInitTimestamp())
                  + segments.get(segments.size() - 1).getB()));
      currentTimeStamp++;
    }

    return points;
  }

  /**
   * Serializes {@code segments} grouped by intercept index and slope.
   *
   * <p>Layout:
   *
   * <ul>
   *   <li>The number of distinct intercept indices {@code round(b / epsilon)}.
   *   <li>The smallest index.
   *   <li>Then, per index in ascending order: its delta from the previous index (0 for the first),
   *       followed by the number of distinct slopes.
   *   <li>Per slope (HashMap order): the slope as a float, the number of start timestamps, and the
   *       timestamps themselves. Timestamps are sorted and delta-encoded as variable-byte ints when
   *       {@code variableByte}; otherwise they are written absolute via {@code UIntEncoder}.
   * </ul>
   *
   * <p>Only referenced by the commented-out {@code toByteArray} below.
   */
  private void toByteArrayPerBSegments(
      ArrayList<SimPieceSegment> segments, boolean variableByte, ByteArrayOutputStream outStream)
      throws IOException {
    // intercept index -> slope -> start timestamps
    TreeMap<Integer, HashMap<Double, ArrayList<Long>>> input = new TreeMap<>();
    for (SimPieceSegment segment : segments) {
      double a = segment.getA();
      int b = (int) Math.round(segment.getB() / epsilon);
      long t = segment.getInitTimestamp();
      if (!input.containsKey(b)) input.put(b, new HashMap<>());
      if (!input.get(b).containsKey(a)) input.get(b).put(a, new ArrayList<>());
      input.get(b).get(a).add(t);
    }

    VariableByteEncoder.write(input.size(), outStream);
    if (input.isEmpty()) return;
    int previousB = input.firstKey();
    VariableByteEncoder.write(previousB, outStream);
    for (Map.Entry<Integer, HashMap<Double, ArrayList<Long>>> bSegments : input.entrySet()) {
      VariableByteEncoder.write(bSegments.getKey() - previousB, outStream);
      previousB = bSegments.getKey();
      VariableByteEncoder.write(bSegments.getValue().size(), outStream);
      for (Map.Entry<Double, ArrayList<Long>> aSegment : bSegments.getValue().entrySet()) {
        FloatEncoder.write(aSegment.getKey().floatValue(), outStream);
        if (variableByte) Collections.sort(aSegment.getValue());
        VariableByteEncoder.write(aSegment.getValue().size(), outStream);
        long previousTS = 0; // only used by the delta (variableByte) encoding
        for (Long timestamp : aSegment.getValue()) {
          if (variableByte) VariableByteEncoder.write((int) (timestamp - previousTS), outStream);
          else UIntEncoder.write(timestamp, outStream);
          previousTS = timestamp;
        }
      }
    }
  }

  // Disabled upstream serializer; it references Zstd, which this file does not import.
  //
  // public byte[] toByteArray(boolean variableByte, boolean zstd) throws IOException {
  //   ByteArrayOutputStream outStream = new ByteArrayOutputStream();
  //   byte[] bytes;
  //
  //   FloatEncoder.write((float) epsilon, outStream);
  //
  //   toByteArrayPerBSegments(segments, variableByte, outStream);
  //
  //   if (variableByte) VariableByteEncoder.write((int) lastTimeStamp, outStream);
  //   else UIntEncoder.write(lastTimeStamp, outStream);
  //
  //   if (zstd) bytes = Zstd.compress(outStream.toByteArray());
  //   else bytes = outStream.toByteArray();
  //
  //   outStream.close();
  //
  //   return bytes;
  // }

  /**
   * Reads segments in the layout written by {@code toByteArrayPerBSegments}.
   *
   * <p>Each segment is rebuilt with the degenerate slope interval {@code [a, a]} and intercept
   * {@code (float) (index * epsilon)}. Only referenced by the commented-out {@code readByteArray}
   * below.
   *
   * @return the decoded segments, in stream order
   */
  private ArrayList<SimPieceSegment> readMergedPerBSegments(
      boolean variableByte, ByteArrayInputStream inStream) throws IOException {
    ArrayList<SimPieceSegment> segments = new ArrayList<>();
    long numB = VariableByteEncoder.read(inStream);
    if (numB == 0) return segments;
    int previousB = VariableByteEncoder.read(inStream);
    for (int i = 0; i < numB; i++) {
      int b = VariableByteEncoder.read(inStream) + previousB;
      previousB = b;
      int numA = VariableByteEncoder.read(inStream);
      for (int j = 0; j < numA; j++) {
        float a = FloatEncoder.read(inStream);
        int numTimestamps = VariableByteEncoder.read(inStream);
        long timestamp = 0;
        for (int k = 0; k < numTimestamps; k++) {
          // variable-byte timestamps are deltas; UInt timestamps are absolute
          if (variableByte) timestamp += VariableByteEncoder.read(inStream);
          else timestamp = UIntEncoder.read(inStream);
          segments.add(new SimPieceSegment(timestamp, a, (float) (b * epsilon)));
        }
      }
    }

    return segments;
  }

  // Disabled upstream deserializer; it references Zstd, which this file does not import.
  // private void readByteArray(byte[] input, boolean variableByte, boolean zstd) throws
  // IOException {
  //   byte[] binary;
  //   if (zstd) binary = Zstd.decompress(input, input.length * 2); //TODO: How to know apriori
  // original size?
  //   else binary = input;
  //   ByteArrayInputStream inStream = new ByteArrayInputStream(binary);
  //
  //   this.epsilon = FloatEncoder.read(inStream);
  //   this.segments = readMergedPerBSegments(variableByte, inStream);
  //   if (variableByte) this.lastTimeStamp = VariableByteEncoder.read(inStream);
  //   else this.lastTimeStamp = UIntEncoder.read(inStream);
  //   inStream.close();
  // }
}
/**
 * One Sim-Piece segment: a start timestamp, an intercept {@code b}, and an interval {@code [aMin,
 * aMax]} of admissible slopes.
 *
 * <p>{@link #getA()} returns the midpoint of the slope interval, which is computed once at
 * construction.
 */
public class SimPieceSegment {

  private final long initTimestamp;
  private final double b;
  private final double aMin;
  private final double aMax;
  private final double a;

  /**
   * Creates a segment whose slope interval is the single value {@code a}.
   *
   * @param initTimestamp start timestamp of the segment
   * @param a the only admissible slope
   * @param b intercept
   */
  public SimPieceSegment(long initTimestamp, double a, double b) {
    this(initTimestamp, a, a, b);
  }

  /**
   * Creates a segment with slope interval {@code [aMin, aMax]}.
   *
   * @param initTimestamp start timestamp of the segment
   * @param aMin lower end of the admissible slope interval
   * @param aMax upper end of the admissible slope interval
   * @param b intercept
   */
  public SimPieceSegment(long initTimestamp, double aMin, double aMax, double b) {
    this.initTimestamp = initTimestamp;
    this.b = b;
    this.aMin = aMin;
    this.aMax = aMax;
    final double midpoint = (aMin + aMax) / 2;
    this.a = midpoint;
  }

  /** @return the start timestamp of this segment */
  public long getInitTimestamp() {
    return this.initTimestamp;
  }

  /** @return the lower end of the admissible slope interval */
  public double getAMin() {
    return this.aMin;
  }

  /** @return the upper end of the admissible slope interval */
  public double getAMax() {
    return this.aMax;
  }

  /** @return the midpoint of {@code [aMin, aMax]} */
  public double getA() {
    return this.a;
  }

  /** @return the intercept */
  public double getB() {
    return this.b;
  }
}
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git + +package org.apache.iotdb.db.query.simpiece; + +import java.util.List; + +public class TimeSeries { + public List<Point> data; + public double range; + public int size; + + public TimeSeries(List<Point> data, double range) { + this.data = data; + this.range = range; + this.size = data.size() * (4 + 4); + } + + public int length() { + return data.size(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/TimeSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/simpiece/TimeSeriesReader.java new file mode 100644 index 00000000000..129baeefeda --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/TimeSeriesReader.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git + +package org.apache.iotdb.db.query.simpiece; + +import org.apache.iotdb.db.utils.FileLoaderUtils; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.GZIPInputStream; + +public class TimeSeriesReader { + + public static TimeSeries getTimeSeriesFromTsFiles( + List<ChunkSuit4Tri> chunkSuit4TriList, long startTime, long endTime) throws IOException { + // assume chunkSuit4TriList already sorted in increasing time order + ArrayList<Point> ts = new ArrayList<>(); + double max = Double.MIN_VALUE; + double min = Double.MAX_VALUE; + + for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) { + ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata; + long chunkMinTime = chunkMetadata.getStartTime(); + long chunkMaxTime = chunkMetadata.getEndTime(); + if (chunkMaxTime < startTime) { + continue; + } else if (chunkMinTime >= endTime) { + break; + } else { + PageReader pageReader = + FileLoaderUtils.loadPageReaderList4CPV( + chunkSuit4Tri.chunkMetadata, + null); // note do not assign to chunkSuit4Tri.pageReader + for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) { + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; + long timestamp = pageReader.timeBuffer.getLong(j * 8); + if (timestamp < startTime) { + continue; + } else if (timestamp >= endTime) { + break; + } else { // rightStartTime<=t<rightEndTime + ByteBuffer valueBuffer = pageReader.valueBuffer; + double value = 
valueBuffer.getDouble(pageReader.timeBufferLength + j * 8); + ts.add(new Point(timestamp, value)); + max = Math.max(max, value); + min = Math.min(min, value); + } + } + } + } + return new TimeSeries(ts, max - min); + } + + public static TimeSeries getTimeSeries(InputStream inputStream, String delimiter, boolean gzip) { + ArrayList<Point> ts = new ArrayList<>(); + double max = Double.MIN_VALUE; + double min = Double.MAX_VALUE; + + try { + if (gzip) { + inputStream = new GZIPInputStream(inputStream); + } + Reader decoder = new InputStreamReader(inputStream, StandardCharsets.UTF_8); + BufferedReader bufferedReader = new BufferedReader(decoder); + + String line; + while ((line = bufferedReader.readLine()) != null) { + String[] elements = line.split(delimiter); + long timestamp = Long.parseLong(elements[0]); + double value = Double.parseDouble(elements[1]); + ts.add(new Point(timestamp, value)); + + max = Math.max(max, value); + min = Math.min(min, value); + } + } catch (IOException e) { + e.printStackTrace(); + } + + return new TimeSeries(ts, max - min); + } + + public static TimeSeries getMyTimeSeries( + InputStream inputStream, + String delimiter, + boolean gzip, + int N, + int startRow, + boolean hasHeader, + boolean seriesTimeColumn) { + // N<0 means read all lines + + ArrayList<Point> ts = new ArrayList<>(); + double max = Double.MIN_VALUE; + double min = Double.MAX_VALUE; + + try { + if (gzip) { + inputStream = new GZIPInputStream(inputStream); + } + Reader decoder = new InputStreamReader(inputStream, StandardCharsets.UTF_8); + BufferedReader bufferedReader = new BufferedReader(decoder); + + String line; + if (hasHeader) { + bufferedReader.readLine(); + } + int startCnt = 0; + while (startCnt < startRow && (line = bufferedReader.readLine()) != null) { + startCnt++; + } + if (startCnt < startRow) { + throw new IOException("not enough rows!"); + } + int cnt = 0; + while ((cnt < N || N < 0) && (line = bufferedReader.readLine()) != null) { + String[] elements = 
line.split(delimiter); + long timestamp; + if (!seriesTimeColumn) { + timestamp = (long) Double.parseDouble(elements[0]); + } else { + timestamp = cnt + 1; + } + double value = Double.parseDouble(elements[1]); + ts.add(new Point(timestamp, value)); + + max = Math.max(max, value); + min = Math.min(min, value); + cnt++; + } + } catch (IOException e) { + e.printStackTrace(); + } + + return new TimeSeries(ts, max - min); + } +}
