This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 56d92eb416866d86d8217d5a2f0de1b425b8237f
Merge: eb42aed 9c72690
Author: HTHou <[email protected]>
AuthorDate: Thu Mar 11 14:05:44 2021 +0800

    merge Vector

 .../iotdb/cluster/client/DataClientProvider.java   |   4 +-
 .../cluster/client/async/AsyncClientPool.java      |  34 ++-
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  59 +++--
 .../iotdb/cluster/client/sync/SyncDataClient.java  |   9 +-
 .../iotdb/cluster/client/sync/SyncMetaClient.java  |  10 +-
 .../exception/BadSeedUrlFormatException.java       |   3 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  78 ++----
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |  14 +-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  37 +--
 .../cluster/query/aggregate/ClusterAggregator.java |  14 +-
 .../cluster/query/fill/ClusterPreviousFill.java    |  14 +-
 .../query/groupby/RemoteGroupByExecutor.java       |  27 +-
 .../query/last/ClusterLastQueryExecutor.java       |  14 +-
 .../cluster/query/reader/ClusterReaderFactory.java |  14 +-
 .../iotdb/cluster/query/reader/DataSourceInfo.java |  12 +-
 .../apache/iotdb/cluster/server/ClientServer.java  |  11 +-
 .../cluster/server/PullSnapshotHintService.java    |   9 +-
 .../cluster/server/service/BaseSyncService.java    |  15 +-
 .../cluster/server/service/DataSyncService.java    |   7 +-
 .../cluster/server/service/MetaSyncService.java    |   5 +-
 .../cluster/client/DataClientProviderTest.java     | 136 ++++++++++
 .../cluster/client/sync/SyncClientPoolTest.java    |  17 +-
 .../cluster/client/sync/SyncDataClientTest.java    |  51 ++++
 .../cluster/client/sync/SyncMetaClientTest.java    |  47 ++++
 server/src/assembly/resources/conf/iotdb-env.bat   |   5 +
 server/src/assembly/resources/conf/iotdb-env.sh    |   3 +
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  12 +-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |  12 +-
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |  27 +-
 .../iotdb/tsfile/file/header/ChunkHeader.java      |  15 +-
 .../file/metadata/statistics/Statistics.java       |  94 ++-----
 .../file/metadata/statistics/TimeStatistics.java   | 161 ++++++++++++
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  |   4 +
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java |  18 +-
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  |  24 +-
 .../iotdb/tsfile/write/chunk/IChunkWriter.java     |  15 +-
 .../iotdb/tsfile/write/chunk/TimeChunkWriter.java  | 255 ++++++++++++++++++
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 252 ++++++++++++++++++
 .../tsfile/write/chunk/VectorChunkWriterImpl.java  | 202 ++++++++++++++
 .../apache/iotdb/tsfile/write/page/PageWriter.java |   1 +
 .../iotdb/tsfile/write/page/TimePageWriter.java    | 177 +++++++++++++
 .../page/{PageWriter.java => ValuePageWriter.java} | 160 ++++++-----
 .../write/record/datapoint/BooleanDataPoint.java   |   2 +-
 .../write/record/datapoint/DoubleDataPoint.java    |   2 +-
 .../write/record/datapoint/FloatDataPoint.java     |   2 +-
 .../write/record/datapoint/IntDataPoint.java       |   2 +-
 .../write/record/datapoint/LongDataPoint.java      |   2 +-
 .../write/record/datapoint/StringDataPoint.java    |   2 +-
 .../tsfile/write/schema/IMeasurementSchema.java    |  34 ++-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  22 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |   3 +-
 .../write/writer/RestorableTsFileIOWriterTest.java |   5 +-
 .../tsfile/write/writer/TestTsFileOutput.java      |  70 +++++
 .../tsfile/write/writer/TimeChunkWriterTest.java   | 111 ++++++++
 .../tsfile/write/writer/TimePageWriterTest.java    | 171 ++++++++++++
 .../tsfile/write/writer/ValueChunkWriterTest.java  | 109 ++++++++
 .../tsfile/write/writer/ValuePageWriterTest.java   | 291 +++++++++++++++++++++
.../write/writer/VectorChunkWriterImplTest.java | 178 +++++++++++++ .../write/writer/VectorMeasurementSchemaStub.java | 80 ++++++ 59 files changed, 2722 insertions(+), 442 deletions(-) diff --cc server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index dcd82ab,caa0893..c1bc5a9 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@@ -173,29 -173,23 +173,29 @@@ public class MemTableFlushTask switch (dataType) { case BOOLEAN: - seriesWriterImpl.write(time, tvPairs.getBoolean(i)); + seriesWriterImpl.write(time, tvPairs.getBoolean(i), false); break; case INT32: - seriesWriterImpl.write(time, tvPairs.getInt(i)); + seriesWriterImpl.write(time, tvPairs.getInt(i), false); break; case INT64: - seriesWriterImpl.write(time, tvPairs.getLong(i)); + seriesWriterImpl.write(time, tvPairs.getLong(i), false); break; case FLOAT: - seriesWriterImpl.write(time, tvPairs.getFloat(i)); + seriesWriterImpl.write(time, tvPairs.getFloat(i), false); break; case DOUBLE: - seriesWriterImpl.write(time, tvPairs.getDouble(i)); + seriesWriterImpl.write(time, tvPairs.getDouble(i), false); break; case TEXT: - seriesWriterImpl.write(time, tvPairs.getBinary(i)); + seriesWriterImpl.write(time, tvPairs.getBinary(i), false); break; + case VECTOR: + // TODO: +// for ( : tvPairs.getVector(i)) { +// seriesWriterImpl.write(time, tvPairs.getVector(i)[], get); +// } + break; default: LOGGER.error( "Storage group {} does not support data type: {}", storageGroup, dataType); diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 1ec2001,1c1ba7f..759a751 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java @@@ -77,6 -77,8 +77,8 @@@ public abstract class Statistics<T> return new DoubleStatistics(); case FLOAT: return new FloatStatistics(); - case Vector: ++ case VECTOR: + return new TimeStatistics(); default: throw new UnknownColumnTypeException(type.toString()); } diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java index 0000000,74bd701..e812166 mode 000000,100644..100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java @@@ -1,0 -1,161 +1,161 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.iotdb.tsfile.file.metadata.statistics; + + import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.nio.ByteBuffer; + + public class TimeStatistics extends Statistics { + + static final int TIME_STATISTICS_FIXED_RAM_SIZE = 40; + + @Override + public TSDataType getType() { - return TSDataType.Vector; ++ return TSDataType.VECTOR; + } + + @Override + public int getStatsSize() { + return 0; + } + + @Override + public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) { + throw new StatisticsClassException("Time statistics does not support: set min max from bytes"); + } + + @Override + public Long getMinValue() { + throw new StatisticsClassException("Time statistics does not support: min value"); + } + + @Override + public Long getMaxValue() { + throw new StatisticsClassException("Time statistics does not support: max value"); + } + + @Override + public Long getFirstValue() { + throw new StatisticsClassException("Time statistics does not support: first value"); + } + + @Override + public Long getLastValue() { + throw new StatisticsClassException("Time statistics does not support: last value"); + } + + @Override + public double getSumDoubleValue() { + throw new StatisticsClassException("Time statistics does not support: double sum"); + } + + @Override + public long getSumLongValue() { + throw new StatisticsClassException("Time statistics does not support: long sum"); + } + + @Override + void updateStats(long value) { + throw new StatisticsClassException("Time statistics does not support: update stats"); + } + + @Override + void updateStats(long[] values, int batchSize) { + throw new StatisticsClassException("Time statistics does not support: update stats"); + } + + @Override + public void updateStats(long minValue, long maxValue) { + throw new StatisticsClassException("Time statistics does not support: update stats"); + } + + @Override + public long calculateRamSize() { + return TIME_STATISTICS_FIXED_RAM_SIZE; + } + + @Override + protected void mergeStatisticsValue(Statistics stats) {} + + @Override + public byte[] getMinValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get min value bytes"); + } + + @Override + public byte[] getMaxValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get max value bytes"); + } + + @Override + public byte[] getFirstValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get first value bytes"); + } + + @Override + public byte[] getLastValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get last value bytes"); + } + + @Override + public byte[] getSumValueBytes() { + throw new StatisticsClassException("Time statistics does not support: get sum value bytes"); + } + + @Override + public ByteBuffer getMinValueBuffer() { + throw new StatisticsClassException("Time statistics does not support: get min value bytes"); + } + + @Override + public ByteBuffer getMaxValueBuffer() { + throw new StatisticsClassException("Time statistics does not support: get max value buffer"); + } + + @Override + public ByteBuffer getFirstValueBuffer() { + throw new StatisticsClassException("Time statistics does not support: get first value buffer"); + } + + @Override + public ByteBuffer getLastValueBuffer() { + throw new 
StatisticsClassException("Time statistics does not support: get last value buffer"); + } + + @Override + public ByteBuffer getSumValueBuffer() { + throw new StatisticsClassException("Time statistics does not support: get sum value buffer"); + } + + @Override + public int serializeStats(OutputStream outputStream) { + return 0; + } + + @Override + public void deserialize(InputStream inputStream) throws IOException {} + + @Override + public void deserialize(ByteBuffer byteBuffer) {} + } diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java index 0000000,df9ded9..522eff5 mode 000000,100644..100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java @@@ -1,0 -1,255 +1,255 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.iotdb.tsfile.write.chunk; + + import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; + import org.apache.iotdb.tsfile.compress.ICompressor; + import org.apache.iotdb.tsfile.encoding.encoder.Encoder; + import org.apache.iotdb.tsfile.file.header.ChunkHeader; + import org.apache.iotdb.tsfile.file.header.PageHeader; + import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; + import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics; + import org.apache.iotdb.tsfile.utils.PublicBAOS; + import org.apache.iotdb.tsfile.write.page.TimePageWriter; + import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.io.IOException; + + public class TimeChunkWriter { + + private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class); + + private final String measurementId; + + private final TSEncoding encodingType; + + private final CompressionType compressionType; + + /** all pages of this chunk. */ + private final PublicBAOS pageBuffer; + + private int numOfPages; + + /** write data into current page */ + private TimePageWriter pageWriter; + + /** page size threshold. */ + private final long pageSizeThreshold; + + private final int maxNumberOfPointsInPage; + + /** value count in current page. */ + private int valueCountInOnePageForNextCheck; + + // initial value for valueCountInOnePageForNextCheck + private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500; + + /** statistic of this chunk. 
*/ + private TimeStatistics statistics; + + /** first page info */ + private int sizeWithoutStatistic; + + private Statistics<?> firstPageStatistics; + + public TimeChunkWriter( + String measurementId, + CompressionType compressionType, + TSEncoding encodingType, + Encoder timeEncoder) { + this.measurementId = measurementId; + this.encodingType = encodingType; + this.compressionType = compressionType; + this.pageBuffer = new PublicBAOS(); + + this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + this.maxNumberOfPointsInPage = + TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); + // initial check of memory usage. So that we have enough data to make an initial prediction + this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; + + // init statistics for this chunk and page + this.statistics = new TimeStatistics(); + + this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType)); + } + + public void write(long time) { + pageWriter.write(time); + } + + public void write(long[] timestamps, int batchSize) { + pageWriter.write(timestamps, batchSize); + } + + /** + * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it + * to pageBuffer + */ + public boolean checkPageSizeAndMayOpenANewPage() { + if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) { + logger.debug("current line count reaches the upper bound, write page {}", measurementId); + return true; + } else if (pageWriter.getPointNumber() + >= valueCountInOnePageForNextCheck) { // need to check memory size + // not checking the memory used for every value + long currentPageSize = pageWriter.estimateMaxMemSize(); + if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold + // we will write the current page + logger.debug( + "enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}", + measurementId, + pageSizeThreshold, + currentPageSize, + pageWriter.getPointNumber()); + valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; + return true; + } else { + // reset the valueCountInOnePageForNextCheck for the next page + valueCountInOnePageForNextCheck = + (int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber()); + } + } + return false; + } + + public void writePageToPageBuffer() { + try { + if (numOfPages == 0) { // record the firstPageStatistics + this.firstPageStatistics = pageWriter.getStatistics(); + this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true); + } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer + byte[] b = pageBuffer.toByteArray(); + pageBuffer.reset(); + pageBuffer.write(b, 0, this.sizeWithoutStatistic); + firstPageStatistics.serialize(pageBuffer); + pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic); + pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false); + firstPageStatistics = null; + } else { + pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false); + } + + // update statistics of this chunk + numOfPages++; + this.statistics.mergeStatistics(pageWriter.getStatistics()); + } catch (IOException e) { + logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e); + } finally { + // clear start time stamp for next initializing + pageWriter.reset(); + } + } + + public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException { + 
sealCurrentPage(); + writeAllPagesOfChunkToTsFile(tsfileWriter); + + // reinit this chunk writer + pageBuffer.reset(); + numOfPages = 0; + firstPageStatistics = null; + this.statistics = new TimeStatistics(); + } + + public long estimateMaxSeriesMemSize() { + return pageBuffer.size() + + pageWriter.estimateMaxMemSize() + + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics() + + pageWriter.getStatistics().getSerializedSize(); + } + + public long getCurrentChunkSize() { + if (pageBuffer.size() == 0) { + return 0; + } + // return the serialized size of the chunk header + all pages + return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size()) + + (long) pageBuffer.size(); + } + + public void sealCurrentPage() { + if (pageWriter != null && pageWriter.getPointNumber() > 0) { + writePageToPageBuffer(); + } + } + + public void clearPageWriter() { + pageWriter = null; + } + + public int getNumOfPages() { + return numOfPages; + } + + public TSDataType getDataType() { - return TSDataType.Vector; ++ return TSDataType.VECTOR; + } + + /** + * write the page to specified IOWriter. + * + * @param writer the specified IOWriter + * @throws IOException exception in IO + */ + public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException { + if (statistics.getCount() == 0) { + return; + } + + // start to write this column chunk + writer.startFlushChunk( + measurementId, + compressionType, - TSDataType.Vector, ++ TSDataType.VECTOR, + encodingType, + statistics, + pageBuffer.size(), + numOfPages, + 0x80); + + long dataOffset = writer.getPos(); + + // write all pages of this column + writer.writeBytesToStream(pageBuffer); + + int dataSize = (int) (writer.getPos() - dataOffset); + if (dataSize != pageBuffer.size()) { + throw new IOException( + "Bytes written is inconsistent with the size of data: " + + dataSize + + " !=" + + " " + + pageBuffer.size()); + } + + writer.endCurrentChunk(); + } + + /** only used for test */ + public PublicBAOS getPageBuffer() { + return pageBuffer; + } + } diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java index 0000000,af71ecd..8f1e907 mode 000000,100644..100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java @@@ -1,0 -1,202 +1,202 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.iotdb.tsfile.write.chunk; + + import org.apache.iotdb.tsfile.encoding.encoder.Encoder; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + import org.apache.iotdb.tsfile.utils.Binary; + import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; + import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.List; + + public class VectorChunkWriterImpl implements IChunkWriter { + + private final TimeChunkWriter timeChunkWriter; + private final List<ValueChunkWriter> valueChunkWriterList; + private int valueIndex; + + /** @param schema schema of this measurement */ + public VectorChunkWriterImpl(IMeasurementSchema schema) { + timeChunkWriter = + new TimeChunkWriter( + schema.getMeasurementId(), + schema.getCompressor(), + schema.getTimeTSEncoding(), + schema.getTimeEncoder()); + + List<String> valueMeasurementIdList = schema.getValueMeasurementIdList(); + List<TSDataType> valueTSDataTypeList = schema.getValueTSDataTypeList(); + List<TSEncoding> valueTSEncodingList = schema.getValueTSEncodingList(); + List<Encoder> valueEncoderList = schema.getValueEncoderList(); + + valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size()); + for (int i = 0; i < valueMeasurementIdList.size(); i++) { + valueChunkWriterList.add( + new ValueChunkWriter( + valueMeasurementIdList.get(i), + schema.getCompressor(), + valueTSDataTypeList.get(i), + valueTSEncodingList.get(i), + valueEncoderList.get(i))); + } + + this.valueIndex = 0; + } + + @Override + public void write(long time, int value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, long value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, boolean value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, float value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, double value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time, Binary value, boolean isNull) { + valueChunkWriterList.get(valueIndex++).write(time, value, isNull); + } + + @Override + public void write(long time) { + valueIndex = 0; + timeChunkWriter.write(time); + if (checkPageSizeAndMayOpenANewPage()) { + writePageToPageBuffer(); + } + } + + // TODO tsfile write interface + @Override + public void write(long[] timestamps, int[] values, int batchSize) { + throw new UnsupportedOperationException(); + } + + @Override + public void write(long[] timestamps, long[] values, int batchSize) { + throw new UnsupportedOperationException(); + } + + @Override + public void write(long[] timestamps, boolean[] values, int batchSize) { + throw new UnsupportedOperationException(); + } + + @Override + public void write(long[] timestamps, float[] values, int batchSize) { + throw new UnsupportedOperationException(); + } + + @Override + public void write(long[] timestamps, double[] values, int batchSize) { + throw new UnsupportedOperationException(); + } + + @Override + public void write(long[] timestamps, Binary[] values, int batchSize) { + throw new UnsupportedOperationException(); + } + + 
/** + * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it + * to pageBuffer + */ + private boolean checkPageSizeAndMayOpenANewPage() { + return timeChunkWriter.checkPageSizeAndMayOpenANewPage(); + } + + private void writePageToPageBuffer() { + timeChunkWriter.writePageToPageBuffer(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + valueChunkWriter.writePageToPageBuffer(); + } + } + + @Override + public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException { + timeChunkWriter.writeToFileWriter(tsfileWriter); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + valueChunkWriter.writeToFileWriter(tsfileWriter); + } + } + + @Override + public long estimateMaxSeriesMemSize() { + long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + estimateMaxSeriesMemSize += valueChunkWriter.estimateMaxSeriesMemSize(); + } + return estimateMaxSeriesMemSize; + } + + @Override + public long getCurrentChunkSize() { + long currentChunkSize = timeChunkWriter.getCurrentChunkSize(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + currentChunkSize += valueChunkWriter.getCurrentChunkSize(); + } + return currentChunkSize; + } + + @Override + public void sealCurrentPage() { + timeChunkWriter.sealCurrentPage(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + valueChunkWriter.sealCurrentPage(); + } + } + + @Override + public void clearPageWriter() { + timeChunkWriter.clearPageWriter(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { + valueChunkWriter.clearPageWriter(); + } + } + + @Override + public int getNumOfPages() { + return timeChunkWriter.getNumOfPages(); + } + + @Override + public TSDataType getDataType() { - return TSDataType.Vector; ++ return TSDataType.VECTOR; + } + } diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java index 0000000,9968815..bdca8d5 mode 000000,100644..100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java @@@ -1,0 -1,111 +1,111 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.iotdb.tsfile.write.writer; + + import org.apache.iotdb.tsfile.encoding.encoder.Encoder; + import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder; + import org.apache.iotdb.tsfile.file.MetaMarker; + import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + import org.apache.iotdb.tsfile.utils.PublicBAOS; + import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; + import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter; + + import org.junit.Test; + + import java.io.IOException; + import java.nio.ByteBuffer; + + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertFalse; + import static org.junit.Assert.fail; + + public class TimeChunkWriterTest { + + @Test + public void testWrite1() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + TimeChunkWriter chunkWriter = + new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder); + for (long time = 1; time <= 10; time++) { + chunkWriter.write(time); + } + assertFalse(chunkWriter.checkPageSizeAndMayOpenANewPage()); + chunkWriter.sealCurrentPage(); + // page without statistics size: 82 + chunk header size: 8 + assertEquals(90L, chunkWriter.getCurrentChunkSize()); + + try { + TestTsFileOutput testTsFileOutput = new TestTsFileOutput(); + TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true); + chunkWriter.writeAllPagesOfChunkToTsFile(writer); + PublicBAOS publicBAOS = testTsFileOutput.publicBAOS; + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + assertEquals( + (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer)); + assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(82, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); - assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer)); ++ assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(82, buffer.remaining()); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testWrite2() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + TimeChunkWriter chunkWriter = + new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder); + for (long time = 1; time <= 10; time++) { + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + for (long time = 11; time <= 20; time++) { + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + assertEquals(2, chunkWriter.getNumOfPages()); + // two pages with statistics size: (82 + 17) * 2 + chunk header size: 9 + assertEquals(207L, chunkWriter.getCurrentChunkSize()); + + try { + TestTsFileOutput testTsFileOutput = new TestTsFileOutput(); + TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true); + chunkWriter.writeAllPagesOfChunkToTsFile(writer); + PublicBAOS publicBAOS = testTsFileOutput.publicBAOS; + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer)); + assertEquals("c1", 
ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(198, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); - assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer)); ++ assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(198, buffer.remaining()); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + } diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java index 0000000,2ec6294..cab975c mode 000000,100644..100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java @@@ -1,0 -1,171 +1,171 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.iotdb.tsfile.write.writer; + + import org.apache.iotdb.tsfile.compress.ICompressor; + import org.apache.iotdb.tsfile.compress.IUnCompressor; + import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder; + import org.apache.iotdb.tsfile.encoding.encoder.Encoder; + import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder; + import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics; + import org.apache.iotdb.tsfile.utils.PublicBAOS; + import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; + import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + import org.apache.iotdb.tsfile.write.page.TimePageWriter; + + import org.junit.Test; + + import java.io.IOException; + import java.nio.ByteBuffer; + + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.fail; + + public class TimePageWriterTest { + + @Test + public void testWrite() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + try { + pageWriter.write(1L); + assertEquals(8, pageWriter.estimateMaxMemSize()); + ByteBuffer buffer1 = pageWriter.getUncompressedBytes(); + ByteBuffer buffer = ByteBuffer.wrap(buffer1.array()); + pageWriter.reset(); + assertEquals(0, pageWriter.estimateMaxMemSize()); + byte[] timeBytes = new byte[8]; + buffer.get(timeBytes); + ByteBuffer buffer2 = ByteBuffer.wrap(timeBytes); + PlainDecoder decoder = new PlainDecoder(); + assertEquals(1L, decoder.readLong(buffer2)); + decoder.reset(); + } catch (IOException e) { + fail(); + } + } + 
+ @Test + public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + PublicBAOS publicBAOS = new PublicBAOS(); + try { + pageWriter.write(1L); + pageWriter.write(2L); + pageWriter.write(3L); + // without page statistics + assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true)); + // total size + assertEquals(26, publicBAOS.size()); + TimeStatistics statistics = pageWriter.getStatistics(); + assertEquals(1L, statistics.getStartTime()); + assertEquals(3L, statistics.getEndTime()); + assertEquals(3, statistics.getCount()); + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + // uncompressedSize + assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + // compressedSize + assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(1L, ReadWriteIOUtils.readLong(buffer)); + assertEquals(2L, ReadWriteIOUtils.readLong(buffer)); + assertEquals(3L, ReadWriteIOUtils.readLong(buffer)); + } catch (IOException e) { + fail(); + } + } + + @Test + public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + PublicBAOS publicBAOS = new PublicBAOS(); + try { + pageWriter.write(1L); + pageWriter.write(2L); + pageWriter.write(3L); + // with page statistics + assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false)); + // total size + assertEquals(43, publicBAOS.size()); + TimeStatistics statistics = pageWriter.getStatistics(); + assertEquals(1L, statistics.getStartTime()); + assertEquals(3L, statistics.getEndTime()); + assertEquals(3, statistics.getCount()); + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + // uncompressedSize + assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + // compressedSize + assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + TimeStatistics testStatistics = - (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.Vector); ++ (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.VECTOR); + assertEquals(1L, testStatistics.getStartTime()); + assertEquals(3L, testStatistics.getEndTime()); + assertEquals(3, testStatistics.getCount()); + assertEquals(1L, ReadWriteIOUtils.readLong(buffer)); + assertEquals(2L, ReadWriteIOUtils.readLong(buffer)); + assertEquals(3L, ReadWriteIOUtils.readLong(buffer)); + } catch (IOException e) { + fail(); + } + } + + @Test + public void testWritePageHeaderAndDataIntoBuffWithSnappy() { + Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0); + ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY); + TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor); + PublicBAOS publicBAOS = new PublicBAOS(); + try { + pageWriter.write(1L); + pageWriter.write(2L); + pageWriter.write(3L); + // without page statistics + assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true)); + + // total size + assertEquals(22, publicBAOS.size()); + TimeStatistics statistics = pageWriter.getStatistics(); + assertEquals(1L, statistics.getStartTime()); + 
assertEquals(3L, statistics.getEndTime()); + assertEquals(3, statistics.getCount()); + ByteBuffer compressedBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + // uncompressedSize + assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer)); + // compressedSize + assertEquals(20, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer)); + byte[] compress = new byte[20]; + compressedBuffer.get(compress); + byte[] uncompress = new byte[24]; + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY); + unCompressor.uncompress(compress, 0, 20, uncompress, 0); + ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompress); + assertEquals(1L, ReadWriteIOUtils.readLong(uncompressedBuffer)); + assertEquals(2L, ReadWriteIOUtils.readLong(uncompressedBuffer)); + assertEquals(3L, ReadWriteIOUtils.readLong(uncompressedBuffer)); + } catch (IOException e) { + fail(); + } + } + } diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java index 0000000,93e18bb..3ca81b1 mode 000000,100644..100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java @@@ -1,0 -1,178 +1,178 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.iotdb.tsfile.write.writer; + + import org.apache.iotdb.tsfile.file.MetaMarker; + import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + import org.apache.iotdb.tsfile.utils.PublicBAOS; + import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; + import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl; + + import org.junit.Test; + + import java.io.IOException; + import java.nio.ByteBuffer; + + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.fail; + + public class VectorChunkWriterImplTest { + + @Test + public void testWrite1() { + VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub(); + VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema); + + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, (float) time, false); + chunkWriter.write(time, time, false); + chunkWriter.write(time, (double) time, false); + chunkWriter.write(time); + } + + chunkWriter.sealCurrentPage(); + // time chunk: 14 + 4 + 160; value chunk 1: 8 + 2 + 4 + 3 + 80; value chunk 2: 8 + 2 + 4 + 3 + + // 20; value chunk 3: 9 + 4 + 7 + 20 * 8; + assertEquals(492L, chunkWriter.getCurrentChunkSize()); + + try { + TestTsFileOutput testTsFileOutput = new TestTsFileOutput(); + TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true); + chunkWriter.writeToFileWriter(writer); + PublicBAOS publicBAOS = testTsFileOutput.publicBAOS; + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + // time chunk + assertEquals( + (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer)); + assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(164, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); - assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer)); ++ assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + buffer.position(buffer.position() + 164); + + // value chunk 1 + assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer)); + assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(89, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + buffer.position(buffer.position() + 89); + + // value chunk 2 + assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer)); + assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(29, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + buffer.position(buffer.position() + 29); + + // value chunk 2 + assertEquals(0x40 | 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer)); + assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(171, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(171, buffer.remaining()); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testWrite2() { + VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub(); + VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema); + + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, (float) time, false); + chunkWriter.write(time, time, false); + chunkWriter.write(time, (double) time, false); + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + for (int time = 21; time <= 40; time++) { + chunkWriter.write(time, (float) time, false); + chunkWriter.write(time, time, false); + chunkWriter.write(time, (double) time, false); + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + + // time chunk: 14 + (4 + 17 + 160) * 2 + // value chunk 1: 9 + (2 + 41 + 4 + 3 + 80) * 2 + // value chunk 2: 9 + (2 + 41 + 4 + 3 + 20) * 2 + // value chunk 3: 9 + (4 + 57 + 4 + 3 + 160) * 2 + assertEquals(1259L, chunkWriter.getCurrentChunkSize()); + + try { + TestTsFileOutput testTsFileOutput = new TestTsFileOutput(); + TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true); + chunkWriter.writeToFileWriter(writer); + PublicBAOS publicBAOS = testTsFileOutput.publicBAOS; + ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + // time chunk + assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer)); + assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(362, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); - assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer)); ++ assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + buffer.position(buffer.position() + 362); + + // value chunk 1 + assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer)); + assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(260, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + buffer.position(buffer.position() + 260); + + // value chunk 2 + assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer)); + assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(140, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + 
buffer.position(buffer.position() + 140); + + // value chunk 2 + assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer)); + assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer)); + assertEquals(456, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer)); + assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer)); + assertEquals(456, buffer.remaining()); + + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + } diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java index 0000000,795a0a6..40335f5 mode 000000,100644..100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java @@@ -1,0 -1,80 +1,80 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.iotdb.tsfile.write.writer; + + import org.apache.iotdb.tsfile.encoding.encoder.Encoder; + import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder; + import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; + + import java.util.Arrays; + import java.util.List; + + public class VectorMeasurementSchemaStub implements IMeasurementSchema { + + @Override + public String getMeasurementId() { + return "s1.time"; + } + + @Override + public CompressionType getCompressor() { + return CompressionType.UNCOMPRESSED; + } + + @Override + public TSDataType getType() { - return TSDataType.Vector; ++ return TSDataType.VECTOR; + } + + @Override + public TSEncoding getTimeTSEncoding() { + return TSEncoding.PLAIN; + } + + @Override + public Encoder getTimeEncoder() { + return new PlainEncoder(TSDataType.INT64, 0); + } + + @Override + public List<String> getValueMeasurementIdList() { + return Arrays.asList("s1", "s2", "s3"); + } + + @Override + public List<TSDataType> getValueTSDataTypeList() { + return Arrays.asList(TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE); + } + + @Override + public List<TSEncoding> getValueTSEncodingList() { + return Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN); + } + + @Override + public List<Encoder> getValueEncoderList() { + return Arrays.asList( + new PlainEncoder(TSDataType.FLOAT, 0), + new PlainEncoder(TSDataType.INT32, 0), + new PlainEncoder(TSDataType.DOUBLE, 0)); + } + }
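A condensed usage sketch of the new vector write path follows (not part of this commit); it mirrors VectorChunkWriterImplTest above: VectorChunkWriterImpl fans each row out to one ValueChunkWriter per value column plus a shared TimeChunkWriter for the timestamps, and the final write(time) call closes the row. The class name VectorWriteSketch and the printed byte count are illustrative assumptions; everything else uses only classes that appear in this diff (VectorChunkWriterImpl, VectorMeasurementSchemaStub, TestTsFileOutput, TsFileIOWriter), so the sketch would live alongside the tests in org.apache.iotdb.tsfile.write.writer.

package org.apache.iotdb.tsfile.write.writer;

import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;

import java.io.IOException;

public class VectorWriteSketch {

  public static void main(String[] args) throws IOException {
    // stub schema from this diff: time column "s1.time" plus FLOAT/INT32/DOUBLE value columns
    VectorChunkWriterImpl chunkWriter =
        new VectorChunkWriterImpl(new VectorMeasurementSchemaStub());

    for (int time = 1; time <= 20; time++) {
      // write every value column first (isNull = false) ...
      chunkWriter.write(time, (float) time, false); // s1 (FLOAT)
      chunkWriter.write(time, time, false); // s2 (INT32)
      chunkWriter.write(time, (double) time, false); // s3 (DOUBLE)
      // ... then close the row: the timestamp goes to the time chunk, and a page is
      // sealed automatically once the time page passes the configured size threshold
      chunkWriter.write(time);
    }

    chunkWriter.sealCurrentPage();

    // TestTsFileOutput (added in this diff) keeps the serialized bytes in memory
    TestTsFileOutput output = new TestTsFileOutput();
    TsFileIOWriter writer = new TsFileIOWriter(output, true);
    chunkWriter.writeToFileWriter(writer); // time chunk first, then each value chunk
    System.out.println("serialized chunk bytes: " + output.publicBAOS.size());
  }
}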
