This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 19c3537 [ISSUE-2611] An unsequence file that covers too many sequence
file causes OOM query (#2710)
19c3537 is described below
commit 19c35375f1bde699ab1d0825e9048da714beb2e7
Author: Xiangwei Wei <[email protected]>
AuthorDate: Sun Feb 21 16:16:04 2021 +0800
[ISSUE-2611] An unsequence file that covers too many sequence file causes
OOM query (#2710)
---
.../iotdb/db/query/reader/series/SeriesReader.java | 21 +++++++----
.../iotdb/db/integration/IoTDBSeriesReaderIT.java | 41 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 7 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 297d57b..19049ea 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -378,7 +378,7 @@ public class SeriesReader {
/*
* try to unpack all overlapped ChunkMetadata to cachedPageReaders
*/
- unpackAllOverlappedChunkMetadataToCachedPageReaders(
+ unpackAllOverlappedChunkMetadataToPageReaders(
orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()),
true);
} else {
/*
@@ -389,7 +389,7 @@ public class SeriesReader {
long endpointTime =
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime);
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false);
- unpackAllOverlappedChunkMetadataToCachedPageReaders(endpointTime,
false);
+ unpackAllOverlappedChunkMetadataToPageReaders(endpointTime, false);
}
}
@@ -414,7 +414,7 @@ public class SeriesReader {
long endpointTime =
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime);
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false);
- unpackAllOverlappedChunkMetadataToCachedPageReaders(endpointTime,
false);
+ unpackAllOverlappedChunkMetadataToPageReaders(endpointTime, false);
}
if (firstPageOverlapped()) {
@@ -446,15 +446,22 @@ public class SeriesReader {
.getStartTime()));
}
- private void unpackAllOverlappedChunkMetadataToCachedPageReaders(long
endpointTime, boolean init)
+ private void unpackAllOverlappedChunkMetadataToPageReaders(long
endpointTime, boolean init)
throws IOException {
if (firstChunkMetadata != null &&
orderUtils.isOverlapped(endpointTime,
firstChunkMetadata.getStatistics())) {
unpackOneChunkMetaData(firstChunkMetadata);
firstChunkMetadata = null;
}
+ // In case unpacking too many sequence chunks
+ boolean hasMeetSeq = false;
while (!cachedChunkMetadata.isEmpty() &&
orderUtils.isOverlapped(endpointTime,
cachedChunkMetadata.peek().getStatistics())) {
+ if (cachedChunkMetadata.peek().isSeq() && hasMeetSeq) {
+ break;
+ } else if (cachedChunkMetadata.peek().isSeq()) {
+ hasMeetSeq = true;
+ }
unpackOneChunkMetaData(cachedChunkMetadata.poll());
}
if (init && firstPageReader == null && (!seqPageReaders.isEmpty() ||
!unSeqPageReaders
@@ -607,7 +614,7 @@ public class SeriesReader {
unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
timeValuePair.getTimestamp(), false);
-
unpackAllOverlappedChunkMetadataToCachedPageReaders(timeValuePair.getTimestamp(),
false);
+
unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(),
false);
unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
if (firstPageReader != null) {
@@ -673,9 +680,9 @@ public class SeriesReader {
private long updateEndPointTime(long currentPageEndPointTime,
VersionPageReader pageReader) {
if (orderUtils.getAscending()) {
- return Math.max(currentPageEndPointTime,
pageReader.getStatistics().getEndTime());
+ return Math.min(currentPageEndPointTime,
pageReader.getStatistics().getEndTime());
} else {
- return Math.min(currentPageEndPointTime,
pageReader.getStatistics().getStartTime());
+ return Math.max(currentPageEndPointTime,
pageReader.getStatistics().getStartTime());
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index 788b18b..ba2a46d 100644
---
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -55,6 +55,7 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -408,4 +409,44 @@ public class IoTDBSeriesReaderIT {
resultSet.close();
}
}
+
+ /** Test when one un-sequenced file may cover a long time range. */
+ @Test
+ public void queryWithLongRangeUnSeqTest() throws SQLException {
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ // make up data
+ final String INSERT_TEMPLATE = "insert into root.sg.d1(time, s1)
values(%d, %d)";
+ final String FLUSH_CMD = "flush";
+ for (int i = 1; i <= 10; i++) {
+ statement.execute(String.format(INSERT_TEMPLATE, i, i));
+ }
+ statement.execute(FLUSH_CMD);
+ for (int i = 12; i <= 20; i++) {
+ statement.execute(String.format(INSERT_TEMPLATE, i, i));
+ }
+ statement.execute(FLUSH_CMD);
+ for (int i = 21; i <= 110; i++) {
+ statement.execute(String.format(INSERT_TEMPLATE, i, i));
+ if (i % 10 == 0) {
+ statement.execute(FLUSH_CMD);
+ }
+ }
+ // unSeq from here
+ for (int i = 11; i <= 101; i += 10) {
+ statement.execute(String.format(INSERT_TEMPLATE, i, i));
+ }
+ statement.execute(FLUSH_CMD);
+
+ // query from here
+ ResultSet resultSet = statement.executeQuery("select s1 from root.sg.d1
where time > 10");
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ Assert.assertEquals(100, cnt);
+ }
+ }
}