This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push: new 12fe408 Fix desc batchdata count bug (#2186) 12fe408 is described below commit 12fe4084f2eeea0048be82e190cb7eb5424abdf1 Author: Xiangwei Wei <34242296+alima...@users.noreply.github.com> AuthorDate: Sat Dec 5 22:06:29 2020 +0800 Fix desc batchdata count bug (#2186) remove Serializable interface on BatchData Co-authored-by: LebronAl <txypot...@gmail.com> --- .../iotdb/db/query/reader/chunk/MemPageReader.java | 2 +- .../iotdb/db/query/reader/series/SeriesReader.java | 4 +- .../iotdb/db/integration/IOTDBGroupByIT.java | 5 + .../iotdb/db/integration/IoTDBAggregationIT.java | 5 + .../integration/IoTDBAggregationLargeDataIT.java | 5 + .../integration/IoTDBAggregationSmallDataIT.java | 7 +- .../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 7 +- .../iotdb/db/integration/IoTDBLargeDataIT.java | 5 + .../IoTDBMultiOverlappedChunkInUnseqIT.java | 5 + .../db/integration/IoTDBMultiOverlappedPageIT.java | 5 + .../iotdb/db/integration/IoTDBMultiSeriesIT.java | 5 + .../db/integration/IoTDBMultiStatementsIT.java | 5 + .../db/integration/IoTDBOverlappedPageIT.java | 5 + .../apache/iotdb/tsfile/read/common/BatchData.java | 35 +- .../iotdb/tsfile/read/common/BatchDataFactory.java | 7 +- .../{DescBatchData.java => DescReadBatchData.java} | 14 +- .../tsfile/read/common/DescReadWriteBatchData.java | 374 +++++++++++++++++++++ .../iotdb/tsfile/read/reader/page/PageReader.java | 2 +- 18 files changed, 469 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java index 2acaf05..f28ea2f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java @@ -44,7 +44,7 @@ public class MemPageReader implements IPageReader { @Override public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException { BatchData batchData = BatchDataFactory - .createBatchData(chunkMetadata.getDataType(), ascending); + .createBatchData(chunkMetadata.getDataType(), ascending, false); while (timeValuePairIterator.hasNextTimeValuePair()) { TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair(); if (valueFilter == null || valueFilter diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index 8aa4176..b433bae 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -578,7 +578,8 @@ public class SeriesReader { if (mergeReader.hasNextTimeValuePair()) { - cachedBatchData = BatchDataFactory.createBatchData(dataType); + cachedBatchData = BatchDataFactory + .createBatchData(dataType, orderUtils.getAscending(), true); long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); if (firstPageReader != null) { currentPageEndPointTime = orderUtils @@ -653,6 +654,7 @@ public class SeriesReader { timeValuePair.getTimestamp(), timeValuePair.getValue().getValue()); } } + cachedBatchData.flip(); hasCachedNextOverlappedPage = cachedBatchData.hasCurrent(); /* * if current overlapped page has valid data, return, otherwise read next overlapped page diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java index bbb227c..19acc6d 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.integration; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.junit.After; @@ -104,6 +105,8 @@ public class IOTDBGroupByIT { public void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(1000); EnvironmentUtils.envSetUp(); Class.forName(Config.JDBC_DRIVER_NAME); @@ -114,6 +117,8 @@ public class IOTDBGroupByIT { public void tearDown() throws Exception { EnvironmentUtils.cleanEnv(); IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); } @Test diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java index bdeb6a4..d6fe48a 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java @@ -37,6 +37,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Locale; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.junit.After; @@ -101,6 +102,8 @@ public class IoTDBAggregationIT { EnvironmentUtils.closeStatMonitor(); prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(); IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(1000); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); EnvironmentUtils.envSetUp(); Class.forName(Config.JDBC_DRIVER_NAME); prepareData(); @@ -110,6 +113,8 @@ public class IoTDBAggregationIT { public void tearDown() throws Exception { EnvironmentUtils.cleanEnv(); IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); } //add test for part of points in page don't satisfy filter diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataIT.java index 53c2bfd..95fd267 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataIT.java @@ -36,6 +36,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.junit.After; @@ -117,6 +118,8 @@ public class IoTDBAggregationLargeDataIT { public void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); EnvironmentUtils.envSetUp(); } @@ -124,6 +127,8 @@ public class IoTDBAggregationLargeDataIT { public void tearDown() throws Exception { EnvironmentUtils.cleanEnv(); IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); } @Test diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java index 5869442..3564027 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java @@ -35,6 +35,8 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.apache.iotdb.jdbc.IoTDBSQLException; @@ -123,7 +125,8 @@ public class IoTDBAggregationSmallDataIT { public void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); EnvironmentUtils.envSetUp(); - + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); //Thread.sleep(5000); insertSQL(); } @@ -131,6 +134,8 @@ public class IoTDBAggregationSmallDataIT { @After public void tearDown() throws Exception { EnvironmentUtils.cleanEnv(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); } @Test diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java index e4a319a..467754a 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java @@ -31,6 +31,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.junit.AfterClass; @@ -108,7 +110,8 @@ public class IoTDBAlignByDeviceIT { public static void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); EnvironmentUtils.envSetUp(); - + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); insertData(); } @@ -116,6 +119,8 @@ public class IoTDBAlignByDeviceIT { @AfterClass public static void tearDown() throws Exception { EnvironmentUtils.cleanEnv(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); } private static void insertData() throws ClassNotFoundException { diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java index 6afb895..6a35f8d 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java @@ -30,6 +30,7 @@ import java.sql.Statement; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; @@ -54,6 +55,8 @@ public class IoTDBLargeDataIT { public static void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); // use small page setting // origin value @@ -80,6 +83,8 @@ public class IoTDBLargeDataIT { tsFileConfig.setPageSizeInByte(pageSizeInByte); tsFileConfig.setGroupSizeInByte(groupSizeInByte); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); EnvironmentUtils.cleanEnv(); } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedChunkInUnseqIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedChunkInUnseqIT.java index 62422ef..00c5c66 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedChunkInUnseqIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedChunkInUnseqIT.java @@ -28,6 +28,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.junit.AfterClass; @@ -45,6 +46,8 @@ public class IoTDBMultiOverlappedChunkInUnseqIT { @BeforeClass public static void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); beforeMemtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold(); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024); EnvironmentUtils.envSetUp(); @@ -57,6 +60,8 @@ public class IoTDBMultiOverlappedChunkInUnseqIT { // recovery value EnvironmentUtils.cleanEnv(); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); } @Test diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java index 7819366..17dd4a5 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java @@ -29,6 +29,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -48,6 +49,8 @@ public class IoTDBMultiOverlappedPageIT { @BeforeClass public static void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); beforeMemtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold(); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16); // max_number_of_points_in_page = 10 @@ -64,6 +67,8 @@ public class IoTDBMultiOverlappedPageIT { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(beforeMaxNumberOfPointsInPage); EnvironmentUtils.cleanEnv(); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); } @Test diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java index 784557b..42d4bac 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java @@ -30,6 +30,7 @@ import java.sql.Statement; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; @@ -57,6 +58,8 @@ public class IoTDBMultiSeriesIT { public static void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); // use small page setting // origin value @@ -88,6 +91,8 @@ public class IoTDBMultiSeriesIT { IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte); TSFileDescriptor.getInstance().getConfig().setCompressor("SNAPPY"); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); } private static void insertData() diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java index d155bd2..b6fecca 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java @@ -27,6 +27,7 @@ import java.sql.SQLException; import java.sql.Statement; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; @@ -51,6 +52,8 @@ public class IoTDBMultiStatementsIT { public static void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); // use small page setting // origin value @@ -76,6 +79,8 @@ public class IoTDBMultiStatementsIT { tsFileConfig.setPageSizeInByte(pageSizeInByte); tsFileConfig.setGroupSizeInByte(groupSizeInByte); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); EnvironmentUtils.cleanEnv(); } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java index bbfcf64..76131ad 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.integration; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -48,6 +49,8 @@ public class IoTDBOverlappedPageIT { @BeforeClass public static void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); beforeMemtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold(); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16); // max_number_of_points_in_page = 10 @@ -64,6 +67,8 @@ public class IoTDBOverlappedPageIT { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(beforeMaxNumberOfPointsInPage); EnvironmentUtils.cleanEnv(); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); } @Test diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java index 88d921a..d17a789 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java @@ -18,10 +18,8 @@ */ package org.apache.iotdb.tsfile.read.common; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.function.BiPredicate; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -50,10 +48,9 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsLong; * while (batchData.hasCurrent()) { long time = batchData.currentTime(); Object value = * batchData.currentValue(); batchData.next(); } */ -public class BatchData implements Serializable { +public class BatchData { - private static final long serialVersionUID = -4620310601188394839L; - private static final int capacityThreshold = TSFileConfig.ARRAY_CAPACITY_THRESHOLD; + public static final int CAPACITY_THRESHOLD = TSFileConfig.ARRAY_CAPACITY_THRESHOLD; protected int capacity = 16; protected TSDataType dataType; @@ -69,15 +66,15 @@ public class BatchData implements Serializable { protected int writeCurArrayIndex; // the insert timestamp number of timeRet - private int count; + protected int count; - private List<long[]> timeRet; - private List<boolean[]> booleanRet; - private List<int[]> intRet; - private List<long[]> longRet; - private List<float[]> floatRet; - private List<double[]> doubleRet; - private List<Binary[]> binaryRet; + protected List<long[]> timeRet; + protected List<boolean[]> booleanRet; + protected List<int[]> intRet; + protected List<long[]> longRet; + protected List<float[]> floatRet; + protected List<double[]> doubleRet; + protected List<Binary[]> binaryRet; public BatchData() { dataType = null; @@ -217,7 +214,7 @@ public class BatchData implements Serializable { */ public void putBoolean(long t, boolean v) { if (writeCurArrayIndex == capacity) { - if (capacity >= capacityThreshold) { + if (capacity >= CAPACITY_THRESHOLD) { timeRet.add(new long[capacity]); booleanRet.add(new boolean[capacity]); writeCurListIndex++; @@ -252,7 +249,7 @@ public class BatchData implements Serializable { */ public void putInt(long t, int v) { if (writeCurArrayIndex == capacity) { - if (capacity >= capacityThreshold) { + if (capacity >= CAPACITY_THRESHOLD) { timeRet.add(new long[capacity]); intRet.add(new int[capacity]); writeCurListIndex++; @@ -287,7 +284,7 @@ public class BatchData implements Serializable { */ public void putLong(long t, long v) { if (writeCurArrayIndex == capacity) { - if (capacity >= capacityThreshold) { + if (capacity >= CAPACITY_THRESHOLD) { timeRet.add(new long[capacity]); longRet.add(new long[capacity]); writeCurListIndex++; @@ -322,7 +319,7 @@ public class BatchData implements Serializable { */ public void putFloat(long t, float v) { if (writeCurArrayIndex == capacity) { - if (capacity >= capacityThreshold) { + if (capacity >= CAPACITY_THRESHOLD) { timeRet.add(new long[capacity]); floatRet.add(new float[capacity]); writeCurListIndex++; @@ -357,7 +354,7 @@ public class BatchData implements Serializable { */ public void putDouble(long t, double v) { if (writeCurArrayIndex == capacity) { - if (capacity >= capacityThreshold) { + if (capacity >= CAPACITY_THRESHOLD) { timeRet.add(new long[capacity]); doubleRet.add(new double[capacity]); writeCurListIndex++; @@ -391,7 +388,7 @@ public class BatchData implements Serializable { */ public void putBinary(long t, Binary v) { if (writeCurArrayIndex == capacity) { - if (capacity >= capacityThreshold) { + if (capacity >= CAPACITY_THRESHOLD) { timeRet.add(new long[capacity]); binaryRet.add(new Binary[capacity]); writeCurListIndex++; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java index 2f60bfe..b0cdeb2 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java @@ -27,11 +27,14 @@ public class BatchDataFactory { throw new IllegalStateException("Factory class"); } - public static BatchData createBatchData(TSDataType dataType, boolean ascending) { + public static BatchData createBatchData(TSDataType dataType, boolean ascending, boolean isWriteDesc) { if (ascending) { return new BatchData(dataType); + } else if (isWriteDesc) { + return new DescReadWriteBatchData(dataType); + } else { + return new DescReadBatchData(dataType); } - return new DescBatchData(dataType); } public static BatchData createBatchData(TSDataType dataType) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java similarity index 84% rename from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java index 55a795d..6da303c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java @@ -21,9 +21,19 @@ package org.apache.iotdb.tsfile.read.common; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -public class DescBatchData extends BatchData { +/** + * This class is just for reading batch data reversely. The data source is from page reader. + * For example, + * the timeRet from pageReader is [1, 1000], + * It will be written in ascending sequence, but the sequence of reading will be 1000 -> 1. + */ +public class DescReadBatchData extends BatchData { + + public DescReadBatchData() { + + } - public DescBatchData(TSDataType dataType) { + public DescReadBatchData(TSDataType dataType) { super(dataType); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java new file mode 100644 index 0000000..bbfe26d --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.read.common; + +import java.util.LinkedList; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Binary; + +/** + * This class is for reading and writing batch data in reverse. The data source is from mergeReader. + * For example, + * the time sequence from mergeReader is 1000 -> 1, to keep the consistency that the timestamp + * should be ascending. It will be written in reverse, i.e. the timeRet will be [1, 1000]. + * Then it can be handled the same as DescReadBatchData. + */ +public class DescReadWriteBatchData extends DescReadBatchData { + + public DescReadWriteBatchData(TSDataType dataType) { + super(); + this.dataType = dataType; + this.readCurListIndex = 0; + this.readCurArrayIndex = 0; + this.writeCurListIndex = 0; + this.writeCurArrayIndex = capacity - 1; + + timeRet = new LinkedList<>(); + timeRet.add(new long[capacity]); + count = 0; + + switch (dataType) { + case BOOLEAN: + booleanRet = new LinkedList<>(); + booleanRet.add(new boolean[capacity]); + break; + case INT32: + intRet = new LinkedList<>(); + intRet.add(new int[capacity]); + break; + case INT64: + longRet = new LinkedList<>(); + longRet.add(new long[capacity]); + break; + case FLOAT: + floatRet = new LinkedList<>(); + floatRet.add(new float[capacity]); + break; + case DOUBLE: + doubleRet = new LinkedList<>(); + doubleRet.add(new double[capacity]); + break; + case TEXT: + binaryRet = new LinkedList<>(); + binaryRet.add(new Binary[capacity]); + break; + default: + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + } + + /** + * put boolean data reversely. + * + * @param t timestamp + * @param v boolean data + */ + @Override + public void putBoolean(long t, boolean v) { + if (writeCurArrayIndex == -1) { + if (capacity >= CAPACITY_THRESHOLD) { + ((LinkedList) timeRet).addFirst(new long[capacity]); + ((LinkedList) booleanRet).addFirst(new boolean[capacity]); + writeCurListIndex++; + writeCurArrayIndex = capacity - 1; + } else { + int newCapacity = capacity << 1; + + long[] newTimeData = new long[newCapacity]; + boolean[] newValueData = new boolean[newCapacity]; + + System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); + System.arraycopy(booleanRet.get(0), 0, newValueData, newCapacity - capacity, capacity); + + timeRet.set(0, newTimeData); + booleanRet.set(0, newValueData); + + writeCurArrayIndex = newCapacity - capacity - 1; + capacity = newCapacity; + } + } + timeRet.get(0)[writeCurArrayIndex] = t; + booleanRet.get(0)[writeCurArrayIndex] = v; + + writeCurArrayIndex--; + count++; + } + + /** + * put int data reversely. + * + * @param t timestamp + * @param v int data + */ + @Override + public void putInt(long t, int v) { + if (writeCurArrayIndex == -1) { + if (capacity >= CAPACITY_THRESHOLD) { + ((LinkedList) timeRet).addFirst(new long[capacity]); + ((LinkedList) intRet).addFirst(new int[capacity]); + writeCurListIndex++; + writeCurArrayIndex = capacity - 1; + } else { + int newCapacity = capacity << 1; + + long[] newTimeData = new long[newCapacity]; + int[] newValueData = new int[newCapacity]; + + System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); + System.arraycopy(intRet.get(0), 0, newValueData, newCapacity - capacity, capacity); + + timeRet.set(0, newTimeData); + intRet.set(0, newValueData); + + writeCurArrayIndex = newCapacity - capacity - 1; + capacity = newCapacity; + } + } + timeRet.get(0)[writeCurArrayIndex] = t; + intRet.get(0)[writeCurArrayIndex] = v; + + writeCurArrayIndex--; + count++; + } + + /** + * put long data reversely. + * + * @param t timestamp + * @param v long data + */ + @Override + public void putLong(long t, long v) { + if (writeCurArrayIndex == -1) { + if (capacity >= CAPACITY_THRESHOLD) { + ((LinkedList) timeRet).addFirst(new long[capacity]); + ((LinkedList) longRet).addFirst(new long[capacity]); + writeCurListIndex++; + writeCurArrayIndex = capacity - 1; + } else { + int newCapacity = capacity << 1; + + long[] newTimeData = new long[newCapacity]; + long[] newValueData = new long[newCapacity]; + + System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); + System.arraycopy(longRet.get(0), 0, newValueData, newCapacity - capacity, capacity); + + timeRet.set(0, newTimeData); + longRet.set(0, newValueData); + + writeCurArrayIndex = newCapacity - capacity - 1; + capacity = newCapacity; + } + } + timeRet.get(0)[writeCurArrayIndex] = t; + longRet.get(0)[writeCurArrayIndex] = v; + + writeCurArrayIndex--; + count++; + } + + /** + * put float data reversely. + * + * @param t timestamp + * @param v float data + */ + @Override + public void putFloat(long t, float v) { + if (writeCurArrayIndex == -1) { + if (capacity >= CAPACITY_THRESHOLD) { + ((LinkedList) timeRet).addFirst(new long[capacity]); + ((LinkedList) floatRet).addFirst(new float[capacity]); + writeCurListIndex++; + writeCurArrayIndex = capacity - 1; + } else { + int newCapacity = capacity << 1; + + long[] newTimeData = new long[newCapacity]; + float[] newValueData = new float[newCapacity]; + + System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); + System.arraycopy(floatRet.get(0), 0, newValueData, newCapacity - capacity, capacity); + + timeRet.set(0, newTimeData); + floatRet.set(0, newValueData); + + writeCurArrayIndex = newCapacity - capacity - 1; + capacity = newCapacity; + } + } + timeRet.get(0)[writeCurArrayIndex] = t; + floatRet.get(0)[writeCurArrayIndex] = v; + + writeCurArrayIndex--; + count++; + } + + /** + * put double data reversely. + * + * @param t timestamp + * @param v double data + */ + @Override + public void putDouble(long t, double v) { + if (writeCurArrayIndex == -1) { + if (capacity >= CAPACITY_THRESHOLD) { + ((LinkedList) timeRet).addFirst(new long[capacity]); + ((LinkedList) doubleRet).addFirst(new double[capacity]); + writeCurListIndex++; + writeCurArrayIndex = capacity - 1; + } else { + int newCapacity = capacity << 1; + + long[] newTimeData = new long[newCapacity]; + double[] newValueData = new double[newCapacity]; + + System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); + System.arraycopy(doubleRet.get(0), 0, newValueData, newCapacity - capacity, capacity); + + timeRet.set(0, newTimeData); + doubleRet.set(0, newValueData); + + writeCurArrayIndex = newCapacity - capacity - 1; + capacity = newCapacity; + } + } + timeRet.get(0)[writeCurArrayIndex] = t; + doubleRet.get(0)[writeCurArrayIndex] = v; + + writeCurArrayIndex--; + count++; + } + + /** + * put binary data reversely. + * + * @param t timestamp + * @param v binary data. + */ + @Override + public void putBinary(long t, Binary v) { + if (writeCurArrayIndex == -1) { + if (capacity >= CAPACITY_THRESHOLD) { + ((LinkedList) timeRet).addFirst(new long[capacity]); + ((LinkedList) binaryRet).addFirst(new Binary[capacity]); + writeCurListIndex++; + writeCurArrayIndex = capacity - 1; + } else { + int newCapacity = capacity << 1; + + long[] newTimeData = new long[newCapacity]; + Binary[] newValueData = new Binary[newCapacity]; + + System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); + System.arraycopy(binaryRet.get(0), 0, newValueData, newCapacity - capacity, capacity); + + timeRet.set(0, newTimeData); + binaryRet.set(0, newValueData); + + writeCurArrayIndex = newCapacity - capacity - 1; + capacity = newCapacity; + } + } + timeRet.get(0)[writeCurArrayIndex] = t; + binaryRet.get(0)[writeCurArrayIndex] = v; + + writeCurArrayIndex--; + count++; + } + + @Override + public boolean hasCurrent() { + return (readCurListIndex == 0 && readCurArrayIndex > writeCurArrayIndex) || ( + readCurListIndex > 0 && readCurArrayIndex >= 0); + } + + @Override + public void next() { + super.readCurArrayIndex--; + if ((readCurListIndex == 0 && readCurArrayIndex <= writeCurArrayIndex) + || readCurArrayIndex == -1) { + super.readCurListIndex--; + super.readCurArrayIndex = capacity - 1; + } + } + + @Override + public void resetBatchData() { + super.readCurArrayIndex = capacity - 1; + super.readCurListIndex = writeCurListIndex; + } + + @Override + public long getTimeByIndex(int idx) { + return timeRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) + % capacity]; + } + + @Override + public long getLongByIndex(int idx) { + return longRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) + % capacity]; + } + + @Override + public double getDoubleByIndex(int idx) { + return doubleRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) + % capacity]; + } + + @Override + public int getIntByIndex(int idx) { + return intRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) + % capacity]; + } + + @Override + public float getFloatByIndex(int idx) { + return floatRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) + % capacity]; + } + + @Override + public Binary getBinaryByIndex(int idx) { + return binaryRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) + % capacity]; + } + + @Override + public boolean getBooleanByIndex(int idx) { + return booleanRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) + % capacity]; + } + + /** + * Read: When put data, the writeIndex increases while the readIndex remains 0. + * For descending read, we need to read from writeIndex to writeCurArrayIndex + */ + @Override + public BatchData flip() { + super.readCurArrayIndex = capacity - 1; + super.readCurListIndex = writeCurListIndex; + return this; + } + +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index efc3b01..a6f7211 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -107,7 +107,7 @@ public class PageReader implements IPageReader { @Override public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException { - BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending); + BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending, false); while (timeDecoder.hasNext(timeBuffer)) { long timestamp = timeDecoder.readLong(timeBuffer);