This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch improvedAlign_for_expr in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 600450285eb58533f2f7254cb4030b19b5479922 Author: Tian Jiang <[email protected]> AuthorDate: Fri Nov 18 19:32:55 2022 +0800 refactor statistics --- .../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 3 +- .../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 3 +- .../tsfile/write/monitor/WriteStatistics.java | 93 ++++++++++++++++++++++ .../apache/iotdb/tsfile/write/page/PageWriter.java | 81 ++++++++++--------- .../iotdb/tsfile/write/page/TimePageWriter.java | 19 ++--- .../iotdb/tsfile/write/page/ValuePageWriter.java | 52 +++++++----- .../tsfile/write/writer/TimePageWriterTest.java | 8 +- .../tsfile/write/writer/ValuePageWriterTest.java | 18 +++-- 8 files changed, 197 insertions(+), 80 deletions(-) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java index bd37840d79..39d324037f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java @@ -94,7 +94,8 @@ public class TimeChunkWriter { // init statistics for this chunk and page this.statistics = new TimeStatistics(); - this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType)); + this.pageWriter = + new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType), measurementId); } public void write(long time) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java index 4ba7cba079..9df5b85d71 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java @@ -98,7 +98,8 @@ public class ValueChunkWriter { this.statistics = Statistics.getStatsByType(dataType); this.pageWriter = - new ValuePageWriter(valueEncoder, ICompressor.getCompressor(compressionType), dataType); + new ValuePageWriter( + valueEncoder, ICompressor.getCompressor(compressionType), dataType, measurementId); } public void write(long time, long value, boolean isNull) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/monitor/WriteStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/monitor/WriteStatistics.java new file mode 100644 index 0000000000..e50fc0e92e --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/monitor/WriteStatistics.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.write.monitor; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public class WriteStatistics { + public static final WriteStatistics INSTANCE = new WriteStatistics(); + public static final String LABEL_TIME = "time"; + public static final String LABEL_VALUE = "value"; + + private final Map<String, Statistic> statisticMap = new ConcurrentHashMap<>(); + + private WriteStatistics() {} + + public void dump(String filePath) throws IOException { + try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) { + for (Entry<String, Statistic> entry : statisticMap.entrySet()) { + String label = entry.getKey(); + Statistic statistic = entry.getValue(); + writer.write( + String.format( + "%s,%d,%d,%d", + label, + statistic.rawSize.get(), + statistic.encodedSize.get(), + statistic.compressedSize.get())); + writer.newLine(); + } + } + } + + public void update(String measurementId, long timeSize, long valueSize, StatisticType type) { + WriteStatistics.INSTANCE.update(WriteStatistics.LABEL_TIME, timeSize, type); + WriteStatistics.INSTANCE.update(WriteStatistics.LABEL_VALUE, valueSize, type); + WriteStatistics.INSTANCE.update( + measurementId + "-" + WriteStatistics.LABEL_TIME, timeSize, type); + WriteStatistics.INSTANCE.update( + measurementId + "-" + WriteStatistics.LABEL_VALUE, valueSize, type); + } + + public void update(String label, long delta, StatisticType type) { + statisticMap.computeIfAbsent(label, l -> new Statistic()).update(delta, type); + } + + private static class Statistic { + private final AtomicLong rawSize = new AtomicLong(); + private final AtomicLong encodedSize = new AtomicLong(); + private final AtomicLong compressedSize = new AtomicLong(); + + private void update(long delta, StatisticType type) { + switch (type) { + case rawSize: + rawSize.addAndGet(delta); + break; + case encodedSize: + encodedSize.addAndGet(delta); + break; + case compressedSize: + compressedSize.addAndGet(delta); + break; + } + } + } + + public enum StatisticType { + rawSize, + encodedSize, + compressedSize; + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java index 097f81edbf..711f20b757 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java @@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.iotdb.tsfile.write.monitor.WriteStatistics; +import org.apache.iotdb.tsfile.write.monitor.WriteStatistics.StatisticType; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; @@ -36,7 +38,6 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.util.concurrent.atomic.AtomicLong; /** * This writer is used to write time-value into a page. It consists of a time encoder, a value @@ -44,15 +45,10 @@ import java.util.concurrent.atomic.AtomicLong; */ public class PageWriter { - public static final AtomicLong timeRawSize = new AtomicLong(); - public static final AtomicLong timeEncodedSize = new AtomicLong(); - public static final AtomicLong valueRawSize = new AtomicLong(); - public static final AtomicLong valueEncodedSize = new AtomicLong(); - public static final AtomicLong compressedSize = new AtomicLong(); - private static final Logger logger = LoggerFactory.getLogger(PageWriter.class); private ICompressor compressor; + private String measurementId; // time private Encoder timeEncoder; @@ -75,6 +71,7 @@ public class PageWriter { this(measurementSchema.getTimeEncoder(), measurementSchema.getValueEncoder()); this.statistics = Statistics.getStatsByType(measurementSchema.getType()); this.compressor = ICompressor.getCompressor(measurementSchema.getCompressor()); + this.measurementId = measurementSchema.getMeasurementId(); } private PageWriter(Encoder timeEncoder, Encoder valueEncoder) { @@ -89,8 +86,7 @@ public class PageWriter { timeEncoder.encode(time, timeOut); valueEncoder.encode(value, valueOut); statistics.update(time, value); - timeRawSize.addAndGet(Long.BYTES); - valueRawSize.addAndGet(1); + WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Byte.BYTES, StatisticType.rawSize); } /** write a time value pair into encoder */ @@ -98,8 +94,7 @@ public class PageWriter { timeEncoder.encode(time, timeOut); valueEncoder.encode(value, valueOut); statistics.update(time, value); - timeRawSize.addAndGet(Long.BYTES); - valueRawSize.addAndGet(Short.BYTES); + WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Short.BYTES, StatisticType.rawSize); } /** write a time value pair into encoder */ @@ -107,8 +102,8 @@ public class PageWriter { timeEncoder.encode(time, timeOut); valueEncoder.encode(value, valueOut); statistics.update(time, value); - timeRawSize.addAndGet(Long.BYTES); - valueRawSize.addAndGet(Integer.BYTES); + WriteStatistics.INSTANCE.update( + measurementId, Long.BYTES, Integer.BYTES, StatisticType.rawSize); } /** write a time value pair into encoder */ @@ -116,8 +111,7 @@ public class PageWriter { timeEncoder.encode(time, timeOut); valueEncoder.encode(value, valueOut); statistics.update(time, value); - timeRawSize.addAndGet(Long.BYTES); - valueRawSize.addAndGet(Long.BYTES); + WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Long.BYTES, StatisticType.rawSize); } /** write a time value pair into encoder */ @@ -125,8 +119,7 @@ public class PageWriter { timeEncoder.encode(time, timeOut); valueEncoder.encode(value, valueOut); statistics.update(time, value); - timeRawSize.addAndGet(Long.BYTES); - valueRawSize.addAndGet(Float.BYTES); + WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Float.BYTES, StatisticType.rawSize); } /** write a time value pair into encoder */ @@ -134,8 +127,7 @@ public class PageWriter { timeEncoder.encode(time, timeOut); valueEncoder.encode(value, valueOut); statistics.update(time, value); - timeRawSize.addAndGet(Long.BYTES); - valueRawSize.addAndGet(Double.BYTES); + WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Double.BYTES, StatisticType.rawSize); } /** write a time value pair into encoder */ @@ -143,8 +135,8 @@ public class PageWriter { timeEncoder.encode(time, timeOut); valueEncoder.encode(value, valueOut); statistics.update(time, value); - timeRawSize.addAndGet(Long.BYTES); - valueRawSize.addAndGet(value.getLength()); + WriteStatistics.INSTANCE.update( + measurementId, Long.BYTES, value.getLength(), StatisticType.rawSize); } /** write time series into encoder */ @@ -153,8 +145,8 @@ public class PageWriter { timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } - timeRawSize.addAndGet(Long.BYTES * batchSize); - valueRawSize.addAndGet(batchSize); + WriteStatistics.INSTANCE.update( + measurementId, (long) Long.BYTES * batchSize, batchSize, StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -164,8 +156,11 @@ public class PageWriter { timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } - timeRawSize.addAndGet(Long.BYTES * batchSize); - valueRawSize.addAndGet(Integer.BYTES * batchSize); + WriteStatistics.INSTANCE.update( + measurementId, + (long) Long.BYTES * batchSize, + (long) Integer.BYTES * batchSize, + StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -175,8 +170,11 @@ public class PageWriter { timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } - timeRawSize.addAndGet(Long.BYTES * batchSize); - valueRawSize.addAndGet(Long.BYTES * batchSize); + WriteStatistics.INSTANCE.update( + measurementId, + (long) Long.BYTES * batchSize, + (long) Long.BYTES * batchSize, + StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -186,8 +184,11 @@ public class PageWriter { timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } - timeRawSize.addAndGet(Long.BYTES * batchSize); - valueRawSize.addAndGet(Float.BYTES * batchSize); + WriteStatistics.INSTANCE.update( + measurementId, + (long) Long.BYTES * batchSize, + (long) Float.BYTES * batchSize, + StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -197,8 +198,11 @@ public class PageWriter { timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); } - timeRawSize.addAndGet(Long.BYTES * batchSize); - valueRawSize.addAndGet(Double.BYTES * batchSize); + WriteStatistics.INSTANCE.update( + measurementId, + (long) Long.BYTES * batchSize, + (long) Double.BYTES * batchSize, + StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -207,8 +211,8 @@ public class PageWriter { for (int i = 0; i < batchSize; i++) { timeEncoder.encode(timestamps[i], timeOut); valueEncoder.encode(values[i], valueOut); - timeRawSize.addAndGet(Long.BYTES); - valueRawSize.addAndGet(values[i].getLength()); + WriteStatistics.INSTANCE.update( + measurementId, Long.BYTES, values[i].getLength(), StatisticType.rawSize); } statistics.update(timestamps, values, batchSize); } @@ -220,8 +224,8 @@ public class PageWriter { } /** - * getUncompressedBytes return data what it has been written in form of <code> - * size of time list, time list, value list</code> + * getUncompressedBytes return data what it has been written in form of <code> size of time list, + * time list, value list</code> * * @return a new readable ByteBuffer whose position is 0. */ @@ -232,8 +236,8 @@ public class PageWriter { buffer.put(timeOut.getBuf(), 0, timeOut.size()); buffer.put(valueOut.getBuf(), 0, valueOut.size()); buffer.flip(); - timeEncodedSize.addAndGet(timeOut.size()); - valueEncodedSize.addAndGet(valueOut.size()); + WriteStatistics.INSTANCE.update( + measurementId, timeOut.size(), valueOut.size(), StatisticType.encodedSize); return buffer; } @@ -262,7 +266,8 @@ public class PageWriter { compressor.compress( pageData.array(), pageData.position(), uncompressedSize, compressedBytes); } - PageWriter.compressedSize.addAndGet(compressedSize); + WriteStatistics.INSTANCE.update( + measurementId, compressedSize, compressedSize, StatisticType.compressedSize); // write the page header to IOWriter int sizeWithoutStatistic = 0; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java index 1d1cd8369f..23c1b197bc 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java @@ -24,6 +24,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.iotdb.tsfile.write.monitor.WriteStatistics; +import org.apache.iotdb.tsfile.write.monitor.WriteStatistics.StatisticType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +34,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.util.concurrent.atomic.AtomicLong; /** * This writer is used to write time into a page. It consists of a time encoder and respective @@ -41,10 +42,8 @@ import java.util.concurrent.atomic.AtomicLong; public class TimePageWriter { private static final Logger logger = LoggerFactory.getLogger(TimePageWriter.class); - public static final AtomicLong timeRawSize = new AtomicLong(); - public static final AtomicLong timeEncodedSize = new AtomicLong(); - public static final AtomicLong timeCompressedSize = new AtomicLong(); + private final String measurementId; private final ICompressor compressor; // time @@ -57,18 +56,19 @@ public class TimePageWriter { */ private TimeStatistics statistics; - public TimePageWriter(Encoder timeEncoder, ICompressor compressor) { + public TimePageWriter(Encoder timeEncoder, ICompressor compressor, String measurementId) { this.timeOut = new PublicBAOS(); this.timeEncoder = timeEncoder; this.statistics = new TimeStatistics(); this.compressor = compressor; + this.measurementId = measurementId; } /** write a time into encoder */ public void write(long time) { timeEncoder.encode(time, timeOut); statistics.update(time); - timeRawSize.addAndGet(Long.BYTES); + WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, 0, StatisticType.rawSize); } /** write time series into encoder */ @@ -76,7 +76,8 @@ public class TimePageWriter { for (int i = 0; i < batchSize; i++) { timeEncoder.encode(timestamps[i], timeOut); } - timeRawSize.addAndGet(batchSize * Long.BYTES); + WriteStatistics.INSTANCE.update( + measurementId, (long) Long.BYTES * batchSize, 0, StatisticType.rawSize); statistics.update(timestamps, batchSize); } @@ -96,7 +97,7 @@ public class TimePageWriter { ByteBuffer buffer = ByteBuffer.allocate(timeOut.size()); buffer.put(timeOut.getBuf(), 0, timeOut.size()); buffer.flip(); - timeEncodedSize.addAndGet(timeOut.size()); + WriteStatistics.INSTANCE.update(measurementId, timeOut.size(), 0, StatisticType.encodedSize); return buffer; } @@ -125,7 +126,7 @@ public class TimePageWriter { compressor.compress( pageData.array(), pageData.position(), uncompressedSize, compressedBytes); } - timeCompressedSize.addAndGet(compressedSize); + WriteStatistics.INSTANCE.update(measurementId, compressedSize, 0, StatisticType.compressedSize); // write the page header to IOWriter int sizeWithoutStatistic = 0; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java index aca4a450ad..b6c252d221 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java @@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.iotdb.tsfile.write.monitor.WriteStatistics; +import org.apache.iotdb.tsfile.write.monitor.WriteStatistics.StatisticType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +37,6 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.util.concurrent.atomic.AtomicLong; /** * This writer is used to write value into a page. It consists of a value encoder and respective @@ -43,10 +44,6 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ValuePageWriter { - public static final AtomicLong valueRawSize = new AtomicLong(); - public static final AtomicLong valueEncodedSize = new AtomicLong(); - public static final AtomicLong valueCompressedSize = new AtomicLong(); - private static final Logger logger = LoggerFactory.getLogger(ValuePageWriter.class); private final ICompressor compressor; @@ -67,9 +64,11 @@ public class ValuePageWriter { private final PublicBAOS bitmapOut; + private String measurementId; private static final int MASK = 1 << 7; - public ValuePageWriter(Encoder valueEncoder, ICompressor compressor, TSDataType dataType) { + public ValuePageWriter( + Encoder valueEncoder, ICompressor compressor, TSDataType dataType, String measurementId) { this.valueOut = new PublicBAOS(); this.bitmap = 0; this.size = 0; @@ -77,13 +76,15 @@ public class ValuePageWriter { this.valueEncoder = valueEncoder; this.statistics = Statistics.getStatsByType(dataType); this.compressor = compressor; + this.measurementId = measurementId; } /** write a time value pair into encoder */ public void write(long time, boolean value, boolean isNull) { setBit(isNull); if (!isNull) { - valueRawSize.addAndGet(1); + WriteStatistics.INSTANCE.update(measurementId, 0, 1, StatisticType.rawSize); + WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, 0, StatisticType.encodedSize); valueEncoder.encode(value, valueOut); statistics.update(time, value); } @@ -93,7 +94,7 @@ public class ValuePageWriter { public void write(long time, short value, boolean isNull) { setBit(isNull); if (!isNull) { - valueRawSize.addAndGet(Short.BYTES); + WriteStatistics.INSTANCE.update(measurementId, 0, Short.BYTES, StatisticType.rawSize); valueEncoder.encode(value, valueOut); statistics.update(time, value); } @@ -103,7 +104,7 @@ public class ValuePageWriter { public void write(long time, int value, boolean isNull) { setBit(isNull); if (!isNull) { - valueRawSize.addAndGet(Integer.BYTES); + WriteStatistics.INSTANCE.update(measurementId, 0, Integer.BYTES, StatisticType.rawSize); valueEncoder.encode(value, valueOut); statistics.update(time, value); } @@ -113,7 +114,7 @@ public class ValuePageWriter { public void write(long time, long value, boolean isNull) { setBit(isNull); if (!isNull) { - valueRawSize.addAndGet(Long.BYTES); + WriteStatistics.INSTANCE.update(measurementId, 0, Long.BYTES, StatisticType.rawSize); valueEncoder.encode(value, valueOut); statistics.update(time, value); } @@ -123,7 +124,7 @@ public class ValuePageWriter { public void write(long time, float value, boolean isNull) { setBit(isNull); if (!isNull) { - valueRawSize.addAndGet(Float.BYTES); + WriteStatistics.INSTANCE.update(measurementId, 0, Float.BYTES, StatisticType.rawSize); valueEncoder.encode(value, valueOut); statistics.update(time, value); } @@ -133,7 +134,7 @@ public class ValuePageWriter { public void write(long time, double value, boolean isNull) { setBit(isNull); if (!isNull) { - valueRawSize.addAndGet(Double.BYTES); + WriteStatistics.INSTANCE.update(measurementId, 0, Double.BYTES, StatisticType.rawSize); valueEncoder.encode(value, valueOut); statistics.update(time, value); } @@ -143,7 +144,7 @@ public class ValuePageWriter { public void write(long time, Binary value, boolean isNull) { setBit(isNull); if (!isNull) { - valueRawSize.addAndGet(value.getLength()); + WriteStatistics.INSTANCE.update(measurementId, 0, value.getLength(), StatisticType.rawSize); valueEncoder.encode(value, valueOut); statistics.update(time, value); } @@ -165,7 +166,7 @@ public class ValuePageWriter { for (int i = 0; i < batchSize; i++) { valueEncoder.encode(values[i], valueOut); } - valueRawSize.addAndGet(batchSize); + WriteStatistics.INSTANCE.update(measurementId, 0, batchSize, StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -174,7 +175,8 @@ public class ValuePageWriter { for (int i = 0; i < batchSize; i++) { valueEncoder.encode(values[i], valueOut); } - valueRawSize.addAndGet(Integer.BYTES * batchSize); + WriteStatistics.INSTANCE.update( + measurementId, 0, Integer.BYTES * batchSize, StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -183,7 +185,8 @@ public class ValuePageWriter { for (int i = 0; i < batchSize; i++) { valueEncoder.encode(values[i], valueOut); } - valueRawSize.addAndGet(Long.BYTES * batchSize); + WriteStatistics.INSTANCE.update( + measurementId, 0, (long) Long.BYTES * batchSize, StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -192,7 +195,8 @@ public class ValuePageWriter { for (int i = 0; i < batchSize; i++) { valueEncoder.encode(values[i], valueOut); } - valueRawSize.addAndGet(Float.BYTES * batchSize); + WriteStatistics.INSTANCE.update( + measurementId, 0, Float.BYTES * batchSize, StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -201,7 +205,8 @@ public class ValuePageWriter { for (int i = 0; i < batchSize; i++) { valueEncoder.encode(values[i], valueOut); } - valueRawSize.addAndGet(Double.BYTES * batchSize); + WriteStatistics.INSTANCE.update( + measurementId, 0, Double.BYTES * batchSize, StatisticType.rawSize); statistics.update(timestamps, values, batchSize); } @@ -209,7 +214,8 @@ public class ValuePageWriter { public void write(long[] timestamps, Binary[] values, int batchSize) { for (int i = 0; i < batchSize; i++) { valueEncoder.encode(values[i], valueOut); - valueRawSize.addAndGet(values[i].getLength()); + WriteStatistics.INSTANCE.update( + measurementId, 0, values[i].getLength(), StatisticType.rawSize); } statistics.update(timestamps, values, batchSize); } @@ -230,7 +236,11 @@ public class ValuePageWriter { */ public ByteBuffer getUncompressedBytes() throws IOException { prepareEndWriteOnePage(); - valueEncodedSize.addAndGet(Integer.BYTES + bitmapOut.size() + valueOut.size()); + WriteStatistics.INSTANCE.update( + measurementId, + 0, + Integer.BYTES + bitmapOut.size() + valueOut.size(), + StatisticType.encodedSize); ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + bitmapOut.size() + valueOut.size()); buffer.putInt(size); buffer.put(bitmapOut.getBuf(), 0, bitmapOut.size()); @@ -271,7 +281,7 @@ public class ValuePageWriter { compressor.compress( pageData.array(), pageData.position(), uncompressedSize, compressedBytes); } - valueCompressedSize.addAndGet(compressedSize); + WriteStatistics.INSTANCE.update(measurementId, 0, compressedSize, StatisticType.compressedSize); // write the page header to IOWriter int sizeWithoutStatistic = 0; diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java index cab975c9ab..a28f4d2619 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java @@ -45,7 +45,7 @@ public class TimePageWriterTest { public void testWrite() { Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); - TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, ""); try { pageWriter.write(1L); assertEquals(8, pageWriter.estimateMaxMemSize()); @@ -68,7 +68,7 @@ public class TimePageWriterTest { public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() { Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); - TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, ""); PublicBAOS publicBAOS = new PublicBAOS(); try { pageWriter.write(1L); @@ -99,7 +99,7 @@ public class TimePageWriterTest { public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() { Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); - TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, ""); PublicBAOS publicBAOS = new PublicBAOS(); try { pageWriter.write(1L); @@ -135,7 +135,7 @@ public class TimePageWriterTest { public void testWritePageHeaderAndDataIntoBuffWithSnappy() { Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY); - TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, ""); PublicBAOS publicBAOS = new PublicBAOS(); try { pageWriter.write(1L); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java index a43159fba1..fc6cc272fa 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java @@ -46,7 +46,8 @@ public class ValuePageWriterTest { public void testWrite1() { Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); - ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT); + ValuePageWriter pageWriter = + new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, ""); try { pageWriter.write(1L, 1.0f, false); assertEquals(9, pageWriter.estimateMaxMemSize()); @@ -69,7 +70,8 @@ public class ValuePageWriterTest { public void testWrite2() { Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); - ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT); + ValuePageWriter pageWriter = + new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, ""); try { for (int time = 1; time <= 16; time++) { pageWriter.write(time, (float) time, time % 4 == 0); @@ -99,7 +101,8 @@ public class ValuePageWriterTest { public void testWrite3() { Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); - ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT); + ValuePageWriter pageWriter = + new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, ""); try { for (int time = 1; time <= 20; time++) { pageWriter.write(time, (float) time, time % 4 == 0); @@ -130,7 +133,8 @@ public class ValuePageWriterTest { public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() { Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); - ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT); + ValuePageWriter pageWriter = + new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, ""); PublicBAOS publicBAOS = new PublicBAOS(); try { for (int time = 1; time <= 20; time++) { @@ -178,7 +182,8 @@ public class ValuePageWriterTest { public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() { Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); - ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT); + ValuePageWriter pageWriter = + new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, ""); PublicBAOS publicBAOS = new PublicBAOS(); try { for (int time = 1; time <= 20; time++) { @@ -237,7 +242,8 @@ public class ValuePageWriterTest { public void testWritePageHeaderAndDataIntoBuffWithSnappy() { Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0); ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY); - ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT); + ValuePageWriter pageWriter = + new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, ""); PublicBAOS publicBAOS = new PublicBAOS(); try { for (int time = 1; time <= 20; time++) {
