This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f06460b Switch to use new TsFile interfaces in Last query executor
(#1048)
f06460b is described below
commit f06460bf1ad732fcd55ce9b01abfb010cb83ebfa
Author: wshao08 <[email protected]>
AuthorDate: Fri Apr 17 16:20:40 2020 +0800
Switch to use new TsFile interfaces in Last query executor (#1048)
* Switch to use new TimeseriesMetadata in Last query executor
---
.../query/dataset/groupby/GroupByFillDataSet.java | 9 ++--
.../iotdb/db/query/executor/LastQueryExecutor.java | 46 ++++++++++++------
.../iotdb/db/query/executor/QueryRouter.java | 2 +-
.../apache/iotdb/db/query/fill/PreviousFill.java | 3 +-
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 44 -----------------
.../apache/iotdb/db/integration/IoTDBLastIT.java | 55 +++++++++-------------
6 files changed, 59 insertions(+), 100 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 7b73eb8..ef5bb56 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -54,7 +54,7 @@ public class GroupByFillDataSet extends QueryDataSet {
this.groupByEngineDataSet = groupByEngineDataSet;
this.fillTypes = fillTypes;
initPreviousParis(context, groupByFillPlan);
- initLastTimeArray(context);
+ initLastTimeArray(context, groupByFillPlan);
}
private void initPreviousParis(QueryContext context, GroupByFillPlan
groupByFillPlan)
@@ -75,13 +75,14 @@ public class GroupByFillDataSet extends QueryDataSet {
}
}
- private void initLastTimeArray(QueryContext context)
+ private void initLastTimeArray(QueryContext context, GroupByFillPlan
groupByFillPlan)
throws IOException, StorageEngineException, QueryProcessException {
lastTimeArray = new long[paths.size()];
Arrays.fill(lastTimeArray, Long.MAX_VALUE);
for (int i = 0; i < paths.size(); i++) {
- TimeValuePair lastTimeValuePair =
- LastQueryExecutor.calculateLastPairForOneSeries(paths.get(i),
dataTypes.get(i), context);
+ TimeValuePair lastTimeValuePair =
LastQueryExecutor.calculateLastPairForOneSeries(
+ paths.get(i), dataTypes.get(i), context,
+
groupByFillPlan.getAllMeasurementsInDevice(paths.get(i).getDevice()));
if (lastTimeValuePair.getValue() != null) {
lastTimeArray[i] = lastTimeValuePair.getTimestamp();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index b3d5a08..91cfb82 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.executor;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
+import java.util.Set;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -35,6 +36,7 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -70,7 +72,7 @@ public class LastQueryExecutor {
*
* @param context query context
*/
- public QueryDataSet execute(QueryContext context)
+ public QueryDataSet execute(QueryContext context, LastQueryPlan
lastQueryPlan)
throws StorageEngineException, IOException, QueryProcessException {
ListDataSet dataSet = new ListDataSet(
@@ -78,8 +80,9 @@ public class LastQueryExecutor {
Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
for (int i = 0; i < selectedSeries.size(); i++) {
- TimeValuePair lastTimeValuePair =
- calculateLastPairForOneSeries(selectedSeries.get(i),
dataTypes.get(i), context);
+ TimeValuePair lastTimeValuePair = calculateLastPairForOneSeries(
+ selectedSeries.get(i), dataTypes.get(i), context,
+
lastQueryPlan.getAllMeasurementsInDevice(selectedSeries.get(i).getDevice()));
if (lastTimeValuePair.getValue() != null) {
RowRecord resultRecord = new
RowRecord(lastTimeValuePair.getTimestamp());
Field pathField = new Field(TSDataType.TEXT);
@@ -104,7 +107,7 @@ public class LastQueryExecutor {
* @return TimeValuePair
*/
public static TimeValuePair calculateLastPairForOneSeries(
- Path seriesPath, TSDataType tsDataType, QueryContext context)
+ Path seriesPath, TSDataType tsDataType, QueryContext context,
Set<String> sensors)
throws IOException, QueryProcessException, StorageEngineException {
// Retrieve last value from MNode
@@ -128,14 +131,27 @@ public class LastQueryExecutor {
if (!seqFileResources.isEmpty()) {
for (int i = seqFileResources.size() - 1; i >= 0; i--) {
- List<ChunkMetadata> chunkMetadata =
FileLoaderUtils.loadChunkMetadataFromTsFileResource(
- seqFileResources.get(i), seriesPath, context);
- if (!chunkMetadata.isEmpty()) {
- ChunkMetadata lastChunkMetaData =
chunkMetadata.get(chunkMetadata.size() - 1);
- Statistics chunkStatistics = lastChunkMetaData.getStatistics();
- resultPair = constructLastPair(
- chunkStatistics.getEndTime(),
chunkStatistics.getLastValue(), tsDataType);
- break;
+ TimeseriesMetadata timeseriesMetadata =
FileLoaderUtils.loadTimeSeriesMetadata(
+ seqFileResources.get(i), seriesPath, context, null, sensors);
+ if (timeseriesMetadata != null) {
+ if (timeseriesMetadata.getStatistics().canUseStatistics()) {
+ Statistics timeseriesMetadataStats =
timeseriesMetadata.getStatistics();
+ resultPair = constructLastPair(
+ timeseriesMetadataStats.getEndTime(),
+ timeseriesMetadataStats.getLastValue(),
+ tsDataType);
+ break;
+ } else {
+ List<ChunkMetadata> chunkMetadataList =
timeseriesMetadata.loadChunkMetadataList();
+ if (!chunkMetadataList.isEmpty()) {
+ ChunkMetadata lastChunkMetaData =
chunkMetadataList.get(chunkMetadataList.size() - 1);
+ Statistics chunkStatistics = lastChunkMetaData.getStatistics();
+ resultPair =
+ constructLastPair(
+ chunkStatistics.getEndTime(),
chunkStatistics.getLastValue(), tsDataType);
+ break;
+ }
+ }
}
}
}
@@ -145,9 +161,9 @@ public class LastQueryExecutor {
if (resource.getEndTimeMap().get(seriesPath.getDevice()) <
resultPair.getTimestamp()) {
continue;
}
- List<ChunkMetadata> chunkMetadata =
- FileLoaderUtils.loadChunkMetadataFromTsFileResource(resource,
seriesPath, context);
- for (ChunkMetadata chunkMetaData : chunkMetadata) {
+ TimeseriesMetadata timeseriesMetadata =
+ FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath,
context, null, sensors);
+ for (ChunkMetadata chunkMetaData :
timeseriesMetadata.loadChunkMetadataList()) {
if (chunkMetaData.getEndTime() == resultPair.getTimestamp()
&& chunkMetaData.getVersion() > version) {
Statistics chunkStatistics = chunkMetaData.getStatistics();
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 9a4aaf2..c8ca580 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -185,7 +185,7 @@ public class QueryRouter implements IQueryRouter {
public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext
context)
throws StorageEngineException, QueryProcessException, IOException {
LastQueryExecutor lastQueryExecutor = new LastQueryExecutor(lastQueryPlan);
- return lastQueryExecutor.execute(context);
+ return lastQueryExecutor.execute(context, lastQueryPlan);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
b/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
index 28bd1c1..7b7e508 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
@@ -147,8 +147,7 @@ public class PreviousFill extends IFill {
timeseriesMetadata.getStatistics().getLastValue(),
dataType);
} else {
- List<ChunkMetadata> seqChunkMetadataList =
- FileLoaderUtils.loadChunkMetadataList(timeseriesMetadata);
+ List<ChunkMetadata> seqChunkMetadataList =
timeseriesMetadata.loadChunkMetadataList();
for (int i = seqChunkMetadataList.size() - 1; i >= 0; i--) {
lastPoint = getChunkLastPoint(seqChunkMetadataList.get(i));
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index bc12fa8..b03360b 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -159,50 +159,6 @@ public class FileLoaderUtils {
return chunkReader.loadPageReaderList();
}
-
- /**
- * load all ChunkMetadatas belong to the seriesPath
- */
- public static List<ChunkMetadata> loadChunkMetadataFromTsFileResource(
- TsFileResource resource, Path seriesPath, QueryContext context) throws
IOException {
- List<ChunkMetadata> chunkMetadataList;
- if (resource == null) {
- return new ArrayList<>();
- }
- if (resource.isClosed()) {
- chunkMetadataList =
ChunkMetadataCache.getInstance().get(resource.getPath(), seriesPath);
- } else {
- chunkMetadataList = resource.getChunkMetadataList();
- }
- List<Modification> pathModifications =
- context.getPathModifications(resource.getModFile(),
seriesPath.getFullPath());
-
- if (!pathModifications.isEmpty()) {
- QueryUtils.modifyChunkMetaData(chunkMetadataList, pathModifications);
- }
-
- TsFileSequenceReader tsFileSequenceReader =
- FileReaderManager.getInstance().get(resource.getPath(),
resource.isClosed());
- for (ChunkMetadata data : chunkMetadataList) {
- data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
- }
- List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk();
- if (memChunks != null) {
- for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) {
- if (!memChunks.isEmpty()) {
- chunkMetadataList.add(readOnlyMemChunk.getChunkMetaData());
- }
- }
- }
-
- /*
- * remove empty or invalid chunk metadata
- */
- chunkMetadataList.removeIf(chunkMetaData -> (
- chunkMetaData.getStartTime() > chunkMetaData.getEndTime()));
- return chunkMetadataList;
- }
-
public static List<ChunkMetadata> getChunkMetadataList(Path path, String
filePath) throws IOException {
TsFileSequenceReader tsFileReader =
FileReaderManager.getInstance().get(filePath, true);
return tsFileReader.getChunkMetadataList(path);
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
index a30311d..c1b4108 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.integration;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
@@ -96,15 +97,15 @@ public class IoTDBLastIT {
}
@Test
- public void lastCacheTest() throws SQLException {
- String[] retArray1 =
+ public void lastCacheUpdateTest() throws SQLException, MetadataException {
+ String[] retArray =
new String[] {
"500,root.ln.wf01.wt01.temperature,22.1",
"500,root.ln.wf01.wt01.status,false",
- "500,root.ln.wf01.wt01.id,5"
- };
- String[] retArray2 =
- new String[] {
+ "500,root.ln.wf01.wt01.id,5",
+ "700,root.ln.wf01.wt01.temperature,33.1",
+ "700,root.ln.wf01.wt01.status,false",
+ "700,root.ln.wf01.wt01.id,3",
"700,root.ln.wf01.wt01.temperature,33.1",
"700,root.ln.wf01.wt01.status,false",
"700,root.ln.wf01.wt01.id,3"
@@ -124,16 +125,17 @@ public class IoTDBLastIT {
String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
- Assert.assertEquals(retArray1[cnt], ans);
+ Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
- LeafMNode node = (LeafMNode)
MManager.getInstance().getNodeByPath("root.ln.wf01.wt01.temperature");
+ LeafMNode node =
+ (LeafMNode)
MManager.getInstance().getNodeByPath("root.ln.wf01.wt01.temperature");
node.resetCache();
statement.execute(
- "insert into root.ln.wf01.wt01(time, temperature, status, id)
values(700, 33.1, false, 3)");
+ "insert into root.ln.wf01.wt01(time, temperature, status, id)
values(700, 33.1, false, 3)");
// Last cache is updated with above insert sql
long time = node.getCachedLast().getTimestamp();
@@ -141,13 +143,12 @@ public class IoTDBLastIT {
hasResultSet = statement.execute("select last temperature,status,id from
root.ln.wf01.wt01");
Assert.assertTrue(hasResultSet);
- cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
- Assert.assertEquals(retArray2[cnt], ans);
+ Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
@@ -161,33 +162,26 @@ public class IoTDBLastIT {
hasResultSet = statement.execute("select last temperature,status,id from
root.ln.wf01.wt01");
Assert.assertTrue(hasResultSet);
- cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
- Assert.assertEquals(retArray2[cnt], ans);
+ Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ Assert.assertEquals(cnt, retArray.length);
}
}
@Test
- public void lastWithUnSeqFilesTest() {
- String[] retArray1 =
+ public void lastWithUnSeqFilesTest() throws SQLException, MetadataException {
+ String[] retArray =
new String[] {
"500,root.ln.wf01.wt02.temperature,15.7",
"500,root.ln.wf01.wt02.status,false",
- "500,root.ln.wf01.wt02.id,9"
- };
-
- String[] retArray2 =
- new String[] {
+ "500,root.ln.wf01.wt02.id,9",
"600,root.ln.wf01.wt02.temperature,10.2",
"600,root.ln.wf01.wt02.status,false",
"600,root.ln.wf01.wt02.id,6"
@@ -212,7 +206,7 @@ public class IoTDBLastIT {
resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
- Assert.assertEquals(retArray1[cnt], ans);
+ Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
@@ -224,24 +218,21 @@ public class IoTDBLastIT {
statement.execute("flush");
hasResultSet = statement.execute(
"select last temperature,status,id from root.ln.wf01.wt02");
- cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ resultSet.getString(TIMESEIRES_STR) + ","
+ resultSet.getString(VALUE_STR);
- Assert.assertEquals(retArray2[cnt], ans);
+ Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ Assert.assertEquals(cnt, retArray.length);
}
}
@Test
- public void lastWithEmptyChunkMetadataTest() throws SQLException {
+ public void lastWithEmptyChunkMetadataTest() throws SQLException,
MetadataException {
String[] retArray =
new String[] {
"300,root.ln.wf01.wt03.temperature,23.1",
@@ -274,10 +265,6 @@ public class IoTDBLastIT {
}
Assert.assertEquals(cnt, 1);
}
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
}