This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty-mpp-2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 41a69eb1e0b4e63d0b6c337a6303979f09c37a05 Author: JackieTien97 <[email protected]> AuthorDate: Mon Mar 28 18:48:42 2022 +0800 add UT for SeriesScanOperator --- .../db/mpp/execution/FragmentInstanceContext.java | 4 + .../db/mpp/operator/source/SeriesScanUtil.java | 2 +- .../apache/iotdb/db/utils/EnvironmentUtils.java | 1 + .../db/mpp/operator/SeriesScanOperatorTest.java | 117 +++++++++++++++++++++ .../reader/series/SeriesAggregateReaderTest.java | 2 +- .../reader/series/SeriesReaderByTimestampTest.java | 2 +- .../db/query/reader/series/SeriesReaderTest.java | 2 +- .../query/reader/series/SeriesReaderTestUtil.java | 22 ++-- 8 files changed, 137 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java index 1f80bf6..015212f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java @@ -65,4 +65,8 @@ public class FragmentInstanceContext extends QueryContext { operatorContexts.add(operatorContext); return operatorContext; } + + public List<OperatorContext> getOperatorContexts() { + return operatorContexts; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java index b369665..3a1041d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java @@ -793,7 +793,7 @@ public class SeriesScanUtil { builder.declarePosition(); } } - hasCachedNextOverlappedPage = builder.isEmpty(); + hasCachedNextOverlappedPage = !builder.isEmpty(); cachedTsBlock = builder.build(); /* * if current overlapped page has valid data, return, otherwise read next overlapped page diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index a069c1e..8566608 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.exception.TriggerManagementException; import org.apache.iotdb.db.exception.UDFRegistrationException; import org.apache.iotdb.db.metadata.idtable.IDTableManager; import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory; +import org.apache.iotdb.db.mpp.operator.OperatorContext; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.query.control.QueryResourceManager; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java new file mode 100644 index 0000000..5891566 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.operator; + +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; +import org.apache.iotdb.db.mpp.common.PlanFragmentId; +import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; +import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil; +import org.apache.iotdb.db.utils.QueryUtils; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.*; + +public class SeriesScanOperatorTest { + private static final String SERIES_READER_TEST_SG = "root.seriesScanOperatorTest"; + private final List<String> deviceIds = new ArrayList<>(); + private final List<MeasurementSchema> measurementSchemas = new ArrayList<>(); + + private final List<TsFileResource> seqResources = new ArrayList<>(); + private final List<TsFileResource> unSeqResources = new ArrayList<>(); + + @Before + public void setUp() throws MetadataException, IOException, WriteProcessException { + SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_READER_TEST_SG); + } + + @After + public void tearDown() throws IOException { + SeriesReaderTestUtil.tearDown(seqResources, unSeqResources); + } + + @Test + public void batchTest() { + try { + MeasurementPath measurementPath = new MeasurementPath(SERIES_READER_TEST_SG + ".device0.sensor0", TSDataType.INT32); + Set<String> allSensors = new HashSet<>(); + allSensors.add("sensor0"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceContext fragmentInstanceContext = new FragmentInstanceContext(new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance")); + fragmentInstanceContext.addOperatorContext(1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName()); + QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources); + QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), true); + SeriesScanOperator seriesScanOperator = + new SeriesScanOperator( + measurementPath, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(0), + dataSource, + null, + null, + true); + int count = 0; + while (seriesScanOperator.hasNext()) { + TsBlock tsBlock = seriesScanOperator.next(); + assertEquals(1, tsBlock.getValueColumnCount()); + assertTrue(tsBlock.getColumn(0) instanceof IntColumn); + assertEquals(20, tsBlock.getPositionCount()); + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long expectedTime = i + 20L * count; + assertEquals(expectedTime, tsBlock.getTimeByIndex(i)); + if (expectedTime < 200) { + assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + } else if (expectedTime < 260 + || (expectedTime >= 300 && expectedTime < 380) + || expectedTime >= 400) { + assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + } else { + assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i)); + } + } + count++; + } + } catch (IOException | IllegalPathException e) { + e.printStackTrace(); + fail(); + } + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java index aa7a4d3..b43839f 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java @@ -60,7 +60,7 @@ public class SeriesAggregateReaderTest { @Before public void setUp() throws MetadataException, IOException, WriteProcessException { EnvironmentUtils.envSetUp(); - SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources); + SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG); } @After diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java index 140aa4f..166e605 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java @@ -51,7 +51,7 @@ public class SeriesReaderByTimestampTest { @Before public void setUp() throws MetadataException, IOException, WriteProcessException { EnvironmentUtils.envSetUp(); - SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources); + SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG); } @After diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java index ed507f7..ac35ca1 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java @@ -56,7 +56,7 @@ public class SeriesReaderTest { @Before public void setUp() throws MetadataException, IOException, WriteProcessException { - SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources); + SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG); } @After diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java index bef697c..2958355 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java @@ -58,17 +58,17 @@ public class SeriesReaderTestUtil { private static long ptNum = 100; private static long flushInterval = 20; private static TSEncoding encoding = TSEncoding.PLAIN; - private static final String SERIES_READER_TEST_SG = "root.seriesReaderTest"; public static void setUp( List<MeasurementSchema> measurementSchemas, List<String> deviceIds, List<TsFileResource> seqResources, - List<TsFileResource> unseqResources) + List<TsFileResource> unseqResources, + String sgName) throws MetadataException, IOException, WriteProcessException { IoTDB.schemaEngine.init(); - prepareSeries(measurementSchemas, deviceIds); - prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds); + prepareSeries(measurementSchemas, deviceIds, sgName); + prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds, sgName); } public static void tearDown( @@ -86,10 +86,10 @@ public class SeriesReaderTestUtil { List<TsFileResource> seqResources, List<TsFileResource> unseqResources, List<MeasurementSchema> measurementSchemas, - List<String> deviceIds) + List<String> deviceIds, String sgName) throws IOException, WriteProcessException { for (int i = 0; i < seqFileNum; i++) { - File file = new File(TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, i)); + File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i)); TsFileResource tsFileResource = new TsFileResource(file); tsFileResource.setStatus(TsFileResourceStatus.CLOSED); tsFileResource.setMinPlanIndex(i); @@ -100,7 +100,7 @@ public class SeriesReaderTestUtil { } for (int i = 0; i < unseqFileNum; i++) { File file = - new File(TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, i + seqFileNum)); + new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum)); TsFileResource tsFileResource = new TsFileResource(file); tsFileResource.setStatus(TsFileResourceStatus.CLOSED); tsFileResource.setMinPlanIndex(i + seqFileNum); @@ -118,7 +118,7 @@ public class SeriesReaderTestUtil { File file = new File( - TestConstant.getTestTsFilePath(SERIES_READER_TEST_SG, 0, 0, seqFileNum + unseqFileNum)); + TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum)); TsFileResource tsFileResource = new TsFileResource(file); tsFileResource.setStatus(TsFileResourceStatus.CLOSED); tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum); @@ -171,16 +171,16 @@ public class SeriesReaderTestUtil { } private static void prepareSeries( - List<MeasurementSchema> measurementSchemas, List<String> deviceIds) throws MetadataException { + List<MeasurementSchema> measurementSchemas, List<String> deviceIds, String sgName) throws MetadataException { for (int i = 0; i < measurementNum; i++) { measurementSchemas.add( new MeasurementSchema( "sensor" + i, TSDataType.INT32, encoding, CompressionType.UNCOMPRESSED)); } for (int i = 0; i < deviceNum; i++) { - deviceIds.add(SERIES_READER_TEST_SG + PATH_SEPARATOR + "device" + i); + deviceIds.add(sgName + PATH_SEPARATOR + "device" + i); } - IoTDB.schemaEngine.setStorageGroup(new PartialPath(SERIES_READER_TEST_SG)); + IoTDB.schemaEngine.setStorageGroup(new PartialPath(sgName)); for (String device : deviceIds) { for (MeasurementSchema measurementSchema : measurementSchemas) { IoTDB.schemaEngine.createTimeseries(
