This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch AlignedSeriesScanOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dcbdecbca526dadbd2550e47406acf2f1a1b42ed Author: JackieTien97 <[email protected]> AuthorDate: Wed May 4 16:43:58 2022 +0800 [IOTDB-3080] Implementation of AlignedSeriesScanOperator --- .../operator/source/AlignedSeriesScanOperator.java | 149 +++++++ .../operator/source/AlignedSeriesScanUtil.java | 10 +- .../execution/operator/source/SeriesScanUtil.java | 20 +- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 29 ++ .../operator/AlignedSeriesScanOperatorTest.java | 456 +++++++++++++++++++++ .../execution/operator/AlignedSeriesTestUtil.java | 260 ++++++++++++ .../execution/operator/TimeJoinOperatorTest.java | 2 +- .../iotdb/tsfile/read/common/block/TsBlock.java | 100 +++-- .../tsfile/write/record/datapoint/DataPoint.java | 10 +- 9 files changed, 989 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java new file mode 100644 index 0000000000..c47ab9f95d --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.execution.operator.source; + +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.metadata.path.AlignedPath; +import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +import java.io.IOException; +import java.util.HashSet; + +public class AlignedSeriesScanOperator implements DataSourceOperator { + + private final OperatorContext operatorContext; + private final AlignedSeriesScanUtil seriesScanUtil; + private final PlanNodeId sourceId; + private TsBlock tsBlock; + private boolean hasCachedTsBlock = false; + private boolean finished = false; + + public AlignedSeriesScanOperator( + PlanNodeId sourceId, + AlignedPath seriesPath, + OperatorContext context, + Filter timeFilter, + Filter valueFilter, + boolean ascending) { + this.sourceId = sourceId; + this.operatorContext = context; + this.seriesScanUtil = + new AlignedSeriesScanUtil( + seriesPath, + new HashSet<>(seriesPath.getMeasurementList()), + context.getInstanceContext(), + timeFilter, + valueFilter, + ascending); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() { + if (hasCachedTsBlock || hasNext()) { + hasCachedTsBlock = false; + return tsBlock; + } + throw new IllegalStateException("no next batch"); + } + + @Override + public boolean hasNext() { + + try { + if (hasCachedTsBlock) { + return true; + } + + /* + * consume page data firstly + */ + if (readPageData()) { + hasCachedTsBlock = true; + return true; + } + + /* + * consume chunk data secondly + */ + if (readChunkData()) { + hasCachedTsBlock = true; + return true; + } + + /* + * consume next file 
finally + */ + while (seriesScanUtil.hasNextFile()) { + if (readChunkData()) { + hasCachedTsBlock = true; + return true; + } + } + return hasCachedTsBlock; + } catch (IOException e) { + throw new RuntimeException("Error happened while scanning the file", e); + } + } + + @Override + public boolean isFinished() { + return finished || (finished = !hasNext()); + } + + private boolean readChunkData() throws IOException { + while (seriesScanUtil.hasNextChunk()) { + if (readPageData()) { + return true; + } + } + return false; + } + + private boolean readPageData() throws IOException { + while (seriesScanUtil.hasNextPage()) { + tsBlock = seriesScanUtil.nextPage(); + if (!isEmpty(tsBlock)) { + return true; + } + } + return false; + } + + private boolean isEmpty(TsBlock tsBlock) { + return tsBlock == null || tsBlock.isEmpty(); + } + + @Override + public PlanNodeId getSourceId() { + return sourceId; + } + + @Override + public void initQueryDataSource(QueryDataSource dataSource) { + seriesScanUtil.initQueryDataSource(dataSource); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java index 2d13cd3c1d..05ad3e6c84 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java @@ -30,7 +30,9 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader; import org.apache.iotdb.db.utils.FileLoaderUtils; import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.IPointReader; import 
org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import java.io.IOException; @@ -45,12 +47,11 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { public AlignedSeriesScanUtil( PartialPath seriesPath, Set<String> allSensors, - TSDataType dataType, FragmentInstanceContext context, Filter timeFilter, Filter valueFilter, boolean ascending) { - super(seriesPath, allSensors, dataType, context, timeFilter, valueFilter, ascending); + super(seriesPath, allSensors, TSDataType.VECTOR, context, timeFilter, valueFilter, ascending); dataTypes = ((AlignedPath) seriesPath) .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); @@ -82,4 +83,9 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { protected List<TSDataType> getTsDataTypeList() { return dataTypes; } + + @Override + protected IPointReader getPointReader(TsBlock tsBlock) { + return tsBlock.getTsBlockAlignedRowIterator(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index 8503b06806..a7686d00b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java @@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter; import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader; import org.apache.iotdb.tsfile.read.reader.IPageReader; +import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import java.io.IOException; @@ -712,9 +713,8 @@ public class SeriesScanUtil { // current timeValuePair is overlapped with firstPageReader, add it to merged reader // and update endTime to the max end time mergeReader.addReader( - firstPageReader - 
.getAllSatisfiedPageData(orderUtils.getAscending()) - .getTsBlockSingleColumnIterator(), + getPointReader( + firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())), firstPageReader.version, orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()), context); @@ -739,9 +739,7 @@ public class SeriesScanUtil { timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) { VersionPageReader pageReader = seqPageReaders.remove(0); mergeReader.addReader( - pageReader - .getAllSatisfiedPageData(orderUtils.getAscending()) - .getTsBlockSingleColumnIterator(), + getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), pageReader.version, orderUtils.getOverlapCheckTime(pageReader.getStatistics()), context); @@ -922,9 +920,7 @@ public class SeriesScanUtil { private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException { mergeReader.addReader( - pageReader - .getAllSatisfiedPageData(orderUtils.getAscending()) - .getTsBlockSingleColumnIterator(), + getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), pageReader.version, orderUtils.getOverlapCheckTime(pageReader.getStatistics()), context); @@ -1076,7 +1072,11 @@ public class SeriesScanUtil { return Collections.singletonList(dataType); } - protected Filter getAnyFilter() { + protected IPointReader getPointReader(TsBlock tsBlock) { + return tsBlock.getTsBlockSingleColumnIterator(); + } + + private Filter getAnyFilter() { return timeFilter != null ? 
timeFilter : valueFilter; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 9f127c883c..6fd6b513ef 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.plan.planner; import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; @@ -49,6 +50,7 @@ import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchOperator; import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaMergeOperator; import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesCountOperator; import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOperator; +import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator; import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator; @@ -77,6 +79,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode; import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; @@ -182,6 +185,32 @@ public class LocalExecutionPlanner { return seriesScanOperator; } + @Override + public Operator visitAlignedSeriesScan( + AlignedSeriesScanNode node, LocalExecutionPlanContext context) { + AlignedPath seriesPath = node.getAlignedPath(); + boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC; + OperatorContext operatorContext = + context.instanceContext.addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + AlignedSeriesScanOperator.class.getSimpleName()); + + AlignedSeriesScanOperator seriesScanOperator = + new AlignedSeriesScanOperator( + node.getPlanNodeId(), + seriesPath, + operatorContext, + node.getTimeFilter(), + node.getValueFilter(), + ascending); + + context.addSourceOperator(seriesScanOperator); + context.addPath(seriesPath); + + return seriesScanOperator; + } + @Override public Operator visitSchemaScan(SchemaScanNode node, LocalExecutionPlanContext context) { if (node instanceof TimeSeriesSchemaScanNode) { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java new file mode 100644 index 0000000000..6cfac9abed --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.execution.operator; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.metadata.path.AlignedPath; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; +import org.apache.iotdb.db.mpp.common.PlanFragmentId; +import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator; +import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn; +import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn; +import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn; +import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn; +import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; +import org.apache.iotdb.tsfile.read.common.block.column.LongColumn; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class AlignedSeriesScanOperatorTest { + + private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.AlignedSeriesScanOperatorTest"; + private final List<MeasurementSchema> measurementSchemas = new ArrayList<>(); + + private final List<TsFileResource> seqResources = new ArrayList<>(); + private final List<TsFileResource> unSeqResources = new ArrayList<>(); + + private static final double DELTA = 0.000001; + + @Before + public void setUp() throws MetadataException, IOException, WriteProcessException { + AlignedSeriesTestUtil.setUp( + measurementSchemas, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG); + } + + @After + public void tearDown() throws IOException { + AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources); + } + + 
@Test + public void batchTest1() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + AlignedPath alignedPath = + new AlignedPath( + SERIES_SCAN_OPERATOR_TEST_SG + ".device0", + measurementSchemas.stream() + .map(MeasurementSchema::getMeasurementId) + .collect(Collectors.toList()), + measurementSchemas.stream() + .map(m -> (IMeasurementSchema) m) + .collect(Collectors.toList())); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName()); + + AlignedSeriesScanOperator seriesScanOperator = + new AlignedSeriesScanOperator( + planNodeId, + alignedPath, + fragmentInstanceContext.getOperatorContexts().get(0), + null, + null, + true); + seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + int count = 0; + while (seriesScanOperator.hasNext()) { + TsBlock tsBlock = seriesScanOperator.next(); + assertEquals(6, tsBlock.getValueColumnCount()); + assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn); + assertTrue(tsBlock.getColumn(1) instanceof IntColumn); + assertTrue(tsBlock.getColumn(2) instanceof LongColumn); + assertTrue(tsBlock.getColumn(3) instanceof FloatColumn); + assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn); + assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn); + + assertEquals(20, tsBlock.getPositionCount()); + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long expectedTime = i + 20L * count; + assertEquals(expectedTime, 
tsBlock.getTimeByIndex(i)); + int delta = 0; + if (expectedTime < 200) { + delta = 20000; + } else if (expectedTime < 260 + || (expectedTime >= 300 && expectedTime < 380) + || expectedTime >= 400) { + delta = 10000; + } + assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA); + assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA); + assertEquals( + String.valueOf(delta + expectedTime), tsBlock.getColumn(5).getBinary(i).toString()); + } + count++; + } + assertEquals(25, count); + } catch (IllegalPathException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void batchTest2() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + AlignedPath alignedPath1 = + new AlignedPath( + SERIES_SCAN_OPERATOR_TEST_SG + ".device0", + measurementSchemas.stream() + .map(MeasurementSchema::getMeasurementId) + .collect(Collectors.toList()), + measurementSchemas.stream() + .map(m -> (IMeasurementSchema) m) + .collect(Collectors.toList())); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId1, AlignedSeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId2 = new PlanNodeId("2"); + 
fragmentInstanceContext.addOperatorContext( + 2, planNodeId2, AlignedSeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId3 = new PlanNodeId("3"); + fragmentInstanceContext.addOperatorContext( + 3, planNodeId3, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId4 = new PlanNodeId("4"); + fragmentInstanceContext.addOperatorContext( + 4, planNodeId4, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId5 = new PlanNodeId("5"); + fragmentInstanceContext.addOperatorContext( + 5, planNodeId5, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId6 = new PlanNodeId("6"); + fragmentInstanceContext.addOperatorContext( + 6, planNodeId6, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId7 = new PlanNodeId("7"); + fragmentInstanceContext.addOperatorContext( + 7, planNodeId7, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId8 = new PlanNodeId("8"); + fragmentInstanceContext.addOperatorContext( + 8, planNodeId8, SeriesScanOperator.class.getSimpleName()); + fragmentInstanceContext.addOperatorContext( + 9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName()); + AlignedSeriesScanOperator seriesScanOperator1 = + new AlignedSeriesScanOperator( + planNodeId1, + alignedPath1, + fragmentInstanceContext.getOperatorContexts().get(0), + null, + null, + true); + seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + AlignedPath alignedPath2 = + new AlignedPath( + SERIES_SCAN_OPERATOR_TEST_SG + ".device1", + measurementSchemas.stream() + .map(MeasurementSchema::getMeasurementId) + .collect(Collectors.toList()), + measurementSchemas.stream() + .map(m -> (IMeasurementSchema) m) + .collect(Collectors.toList())); + AlignedSeriesScanOperator seriesScanOperator2 = + new AlignedSeriesScanOperator( + planNodeId2, + alignedPath2, + fragmentInstanceContext.getOperatorContexts().get(1), + null, + null, + true); + seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources)); + + Set<String> allSensors = new HashSet<>(); + allSensors.add("sensor0"); + allSensors.add("sensor1"); + allSensors.add("sensor2"); + allSensors.add("sensor3"); + allSensors.add("sensor4"); + allSensors.add("sensor5"); + + MeasurementPath measurementPath3 = + new MeasurementPath( + SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.BOOLEAN); + SeriesScanOperator seriesScanOperator3 = + new SeriesScanOperator( + planNodeId3, + measurementPath3, + allSensors, + TSDataType.BOOLEAN, + fragmentInstanceContext.getOperatorContexts().get(2), + null, + null, + true); + seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + MeasurementPath measurementPath4 = + new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32); + SeriesScanOperator seriesScanOperator4 = + new SeriesScanOperator( + planNodeId4, + measurementPath4, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(3), + null, + null, + true); + seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + MeasurementPath measurementPath5 = + new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor2", TSDataType.INT64); + SeriesScanOperator seriesScanOperator5 = + new SeriesScanOperator( + planNodeId5, + measurementPath5, + allSensors, + TSDataType.INT64, + fragmentInstanceContext.getOperatorContexts().get(4), + null, + null, + true); + seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + MeasurementPath measurementPath6 = + new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor3", TSDataType.FLOAT); + SeriesScanOperator seriesScanOperator6 = + new SeriesScanOperator( + planNodeId6, + measurementPath6, + allSensors, + TSDataType.FLOAT, + fragmentInstanceContext.getOperatorContexts().get(5), + null, + null, + true); + 
seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + MeasurementPath measurementPath7 = + new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor4", TSDataType.DOUBLE); + SeriesScanOperator seriesScanOperator7 = + new SeriesScanOperator( + planNodeId7, + measurementPath7, + allSensors, + TSDataType.DOUBLE, + fragmentInstanceContext.getOperatorContexts().get(6), + null, + null, + true); + seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + MeasurementPath measurementPath8 = + new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor5", TSDataType.DOUBLE); + SeriesScanOperator seriesScanOperator8 = + new SeriesScanOperator( + planNodeId8, + measurementPath8, + allSensors, + TSDataType.TEXT, + fragmentInstanceContext.getOperatorContexts().get(7), + null, + null, + true); + seriesScanOperator8.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + TimeJoinOperator timeJoinOperator = + new TimeJoinOperator( + fragmentInstanceContext.getOperatorContexts().get(8), + Arrays.asList( + seriesScanOperator1, + seriesScanOperator2, + seriesScanOperator3, + seriesScanOperator4, + seriesScanOperator5, + seriesScanOperator6, + seriesScanOperator7, + seriesScanOperator8), + OrderBy.TIMESTAMP_ASC, + Arrays.asList( + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT), + Arrays.asList( + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(0, 1), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(0, 2), new AscTimeComparator()), + new SingleColumnMerger(new 
InputLocation(0, 3), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(0, 4), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(0, 5), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 1), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 2), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 3), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 4), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 5), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(2, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(3, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(4, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(5, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(6, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(7, 0), new AscTimeComparator())), + new AscTimeComparator()); + int count = 0; + while (timeJoinOperator.hasNext()) { + TsBlock tsBlock = timeJoinOperator.next(); + assertEquals(18, tsBlock.getValueColumnCount()); + assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn); + assertTrue(tsBlock.getColumn(1) instanceof IntColumn); + assertTrue(tsBlock.getColumn(2) instanceof LongColumn); + assertTrue(tsBlock.getColumn(3) instanceof FloatColumn); + assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn); + assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn); + assertTrue(tsBlock.getColumn(6) instanceof BooleanColumn); + assertTrue(tsBlock.getColumn(7) instanceof IntColumn); + assertTrue(tsBlock.getColumn(8) instanceof LongColumn); + assertTrue(tsBlock.getColumn(9) instanceof FloatColumn); + assertTrue(tsBlock.getColumn(10) instanceof DoubleColumn); + assertTrue(tsBlock.getColumn(11) instanceof 
BinaryColumn); + assertTrue(tsBlock.getColumn(12) instanceof BooleanColumn); + assertTrue(tsBlock.getColumn(13) instanceof IntColumn); + assertTrue(tsBlock.getColumn(14) instanceof LongColumn); + assertTrue(tsBlock.getColumn(15) instanceof FloatColumn); + assertTrue(tsBlock.getColumn(16) instanceof DoubleColumn); + assertTrue(tsBlock.getColumn(17) instanceof BinaryColumn); + + assertEquals(20, tsBlock.getPositionCount()); + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long expectedTime = i + 20L * count; + assertEquals(expectedTime, tsBlock.getTimeByIndex(i)); + int delta = 0; + if (expectedTime < 200) { + delta = 20000; + } else if (expectedTime < 260 + || (expectedTime >= 300 && expectedTime < 380) + || expectedTime >= 400) { + delta = 10000; + } + assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(0).getBoolean(i)); + assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(6).getBoolean(i)); + assertEquals((delta + expectedTime) % 2 == 0, tsBlock.getColumn(12).getBoolean(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(1).getInt(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(7).getInt(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(13).getInt(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(2).getLong(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(8).getLong(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(14).getLong(i)); + assertEquals(delta + expectedTime, tsBlock.getColumn(3).getFloat(i), DELTA); + assertEquals(delta + expectedTime, tsBlock.getColumn(9).getFloat(i), DELTA); + assertEquals(delta + expectedTime, tsBlock.getColumn(15).getFloat(i), DELTA); + assertEquals(delta + expectedTime, tsBlock.getColumn(4).getDouble(i), DELTA); + assertEquals(delta + expectedTime, tsBlock.getColumn(10).getDouble(i), DELTA); + assertEquals(delta + expectedTime, tsBlock.getColumn(16).getDouble(i), DELTA); + assertEquals( + String.valueOf(delta + expectedTime), 
tsBlock.getColumn(5).getBinary(i).toString()); + assertEquals( + String.valueOf(delta + expectedTime), tsBlock.getColumn(11).getBinary(i).toString()); + assertEquals( + String.valueOf(delta + expectedTime), tsBlock.getColumn(17).getBinary(i).toString()); + } + count++; + } + assertEquals(25, count); + } catch (IllegalPathException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java new file mode 100644 index 0000000000..32ea855b57 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesTestUtil.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.mpp.execution.operator; + +import org.apache.iotdb.db.constant.TestConstant; +import org.apache.iotdb.db.engine.cache.ChunkCache; +import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.TsFileWriter; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.junit.Assert; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; + +/** + * This util contains 5 seqFiles and 6 unseqFiles by default. 
+ * + * <p>Sequence time range of data: [0, 99], [100, 199], [200, 299], [300, 399], [400, 499] + * + * <p>UnSequence time range of data: [0, 19], [100, 139], [200, 259], [300, 379], [400, 499], [0, + * 199] + * + * <p>d0 and d1 are aligned, d2 is nonAligned + */ +public class AlignedSeriesTestUtil { + + public static void setUp( + List<MeasurementSchema> measurementSchemas, + List<TsFileResource> seqResources, + List<TsFileResource> unseqResources, + String sgName) + throws MetadataException, IOException, WriteProcessException { + IoTDB.configManager.init(); + prepareSeries(measurementSchemas, sgName); + prepareFiles(seqResources, unseqResources, measurementSchemas, sgName); + } + + public static void tearDown( + List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException { + removeFiles(seqResources, unseqResources); + seqResources.clear(); + unseqResources.clear(); + ChunkCache.getInstance().clear(); + TimeSeriesMetadataCache.getInstance().clear(); + IoTDB.configManager.clear(); + EnvironmentUtils.cleanAllDir(); + } + + private static void prepareFiles( + List<TsFileResource> seqResources, + List<TsFileResource> unseqResources, + List<MeasurementSchema> measurementSchemas, + String sgName) + throws IOException, WriteProcessException { + int seqFileNum = 5; + long ptNum = 100; + for (int i = 0; i < seqFileNum; i++) { + File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i)); + TsFileResource tsFileResource = new TsFileResource(file); + tsFileResource.setStatus(TsFileResourceStatus.CLOSED); + tsFileResource.setMinPlanIndex(i); + tsFileResource.setMaxPlanIndex(i); + tsFileResource.setVersion(i); + seqResources.add(tsFileResource); + prepareFile(sgName, tsFileResource, i * ptNum, ptNum, 0, measurementSchemas); + } + int unseqFileNum = 5; + for (int i = 0; i < unseqFileNum; i++) { + File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum)); + TsFileResource tsFileResource = new 
TsFileResource(file); + tsFileResource.setStatus(TsFileResourceStatus.CLOSED); + tsFileResource.setMinPlanIndex(i + seqFileNum); + tsFileResource.setMaxPlanIndex(i + seqFileNum); + tsFileResource.setVersion(i + seqFileNum); + unseqResources.add(tsFileResource); + prepareFile( + sgName, + tsFileResource, + i * ptNum, + ptNum * (i + 1) / unseqFileNum, + 10000, + measurementSchemas); + } + + File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum)); + TsFileResource tsFileResource = new TsFileResource(file); + tsFileResource.setStatus(TsFileResourceStatus.CLOSED); + tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum); + tsFileResource.setMaxPlanIndex(seqFileNum + unseqFileNum); + tsFileResource.setVersion(seqFileNum + unseqFileNum); + unseqResources.add(tsFileResource); + prepareFile(sgName, tsFileResource, 0, ptNum * 2, 20000, measurementSchemas); + } + + private static void prepareFile( + String sgName, + TsFileResource tsFileResource, + long timeOffset, + long ptNum, + long valueOffset, + List<MeasurementSchema> measurementSchemas) + throws IOException, WriteProcessException { + File file = tsFileResource.getTsFile(); + if (!file.getParentFile().exists()) { + Assert.assertTrue(file.getParentFile().mkdirs()); + } + TsFileWriter fileWriter = new TsFileWriter(file); + + String device0 = sgName + PATH_SEPARATOR + "device0"; + String device1 = sgName + PATH_SEPARATOR + "device1"; + String device2 = sgName + PATH_SEPARATOR + "device2"; + + fileWriter.registerAlignedTimeseries(new Path(device0), measurementSchemas); + fileWriter.registerAlignedTimeseries(new Path(device1), measurementSchemas); + fileWriter.registerTimeseries(new Path(device2), measurementSchemas); + for (long i = timeOffset; i < timeOffset + ptNum; i++) { + + TSRecord record = new TSRecord(i, device0); + int index = 0; + for (MeasurementSchema measurementSchema : measurementSchemas) { + record.addTuple( + DataPoint.getDataPoint( + measurementSchema.getType(), + 
measurementSchema.getMeasurementId(), + index == 0 + ? String.valueOf((i + valueOffset) % 2 == 0) + : String.valueOf((i + valueOffset)))); + index++; + } + fileWriter.writeAligned(record); + tsFileResource.updateStartTime(device0, i); + tsFileResource.updateEndTime(device0, i); + + record.deviceId = device1; + fileWriter.writeAligned(record); + tsFileResource.updateStartTime(device1, i); + tsFileResource.updateEndTime(device1, i); + + record.deviceId = device2; + fileWriter.write(record); + tsFileResource.updateStartTime(device2, i); + tsFileResource.updateEndTime(device2, i); + + long flushInterval = 20; + if ((i + 1) % flushInterval == 0) { + fileWriter.flushAllChunkGroups(); + } + } + fileWriter.close(); + } + + private static void prepareSeries(List<MeasurementSchema> measurementSchemas, String sgName) + throws MetadataException { + + measurementSchemas.add( + new MeasurementSchema( + "sensor0", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY)); + measurementSchemas.add( + new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY)); + measurementSchemas.add( + new MeasurementSchema( + "sensor2", TSDataType.INT64, TSEncoding.TS_2DIFF, CompressionType.SNAPPY)); + measurementSchemas.add( + new MeasurementSchema( + "sensor3", TSDataType.FLOAT, TSEncoding.GORILLA, CompressionType.SNAPPY)); + measurementSchemas.add( + new MeasurementSchema( + "sensor4", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)); + measurementSchemas.add( + new MeasurementSchema( + "sensor5", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.SNAPPY)); + + IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName)); + IoTDB.schemaProcessor.createAlignedTimeSeries( + new PartialPath(sgName + PATH_SEPARATOR + "device0"), + measurementSchemas.stream() + .map(MeasurementSchema::getMeasurementId) + .collect(Collectors.toList()), + measurementSchemas.stream().map(MeasurementSchema::getType).collect(Collectors.toList()), + 
measurementSchemas.stream() + .map(MeasurementSchema::getEncodingType) + .collect(Collectors.toList()), + measurementSchemas.stream() + .map(MeasurementSchema::getCompressor) + .collect(Collectors.toList())); + IoTDB.schemaProcessor.createAlignedTimeSeries( + new PartialPath(sgName + PATH_SEPARATOR + "device1"), + measurementSchemas.stream() + .map(MeasurementSchema::getMeasurementId) + .collect(Collectors.toList()), + measurementSchemas.stream().map(MeasurementSchema::getType).collect(Collectors.toList()), + measurementSchemas.stream() + .map(MeasurementSchema::getEncodingType) + .collect(Collectors.toList()), + measurementSchemas.stream() + .map(MeasurementSchema::getCompressor) + .collect(Collectors.toList())); + for (MeasurementSchema measurementSchema : measurementSchemas) { + IoTDB.schemaProcessor.createTimeseries( + new PartialPath( + sgName + + PATH_SEPARATOR + + "device2" + + PATH_SEPARATOR + + measurementSchema.getMeasurementId()), + measurementSchema.getType(), + measurementSchema.getEncodingType(), + measurementSchema.getCompressor(), + Collections.emptyMap()); + } + } + + private static void removeFiles( + List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException { + for (TsFileResource tsFileResource : seqResources) { + tsFileResource.remove(); + } + for (TsFileResource tsFileResource : unseqResources) { + tsFileResource.remove(); + } + + FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java index 2c916057a0..e1fbefa10f 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java @@ -82,7 +82,7 @@ public class TimeJoinOperatorTest { } @Test - public void batchTest() { + 
public void batchTest1() { ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java index 8034bfb189..c19d4f2c06 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java @@ -233,14 +233,14 @@ public class TsBlock { } /** Only used for the batch data of vector time series. */ - public IBatchDataIterator getTsBlockIterator(int subIndex) { - return new AlignedTsBlockIterator(0, subIndex); + public TsBlockAlignedRowIterator getTsBlockAlignedRowIterator() { + return new TsBlockAlignedRowIterator(0); } public class TsBlockSingleColumnIterator implements IPointReader, IBatchDataIterator { - protected int rowIndex; - protected int columnIndex; + private int rowIndex; + private final int columnIndex; public TsBlockSingleColumnIterator(int rowIndex) { this.rowIndex = rowIndex; @@ -357,52 +357,94 @@ public class TsBlock { } } - private class AlignedTsBlockIterator extends TsBlockSingleColumnIterator { + private class TsBlockAlignedRowIterator implements IPointReader, IBatchDataIterator { - private final int subIndex; + private int rowIndex; - private AlignedTsBlockIterator(int index, int subIndex) { - super(index); - this.subIndex = subIndex; + public TsBlockAlignedRowIterator(int rowIndex) { + this.rowIndex = rowIndex; } @Override public boolean hasNext() { - while (super.hasNext() && currentValue() == null) { - super.next(); - } - return super.hasNext(); + return rowIndex < positionCount; } @Override public boolean hasNext(long minBound, long maxBound) { - while (super.hasNext() && currentValue() == null) { + while (hasNext()) { if (currentTime() < minBound || currentTime() >= maxBound) { break; } - super.next(); + next(); } - 
return super.hasNext(); + return hasNext(); } @Override - public Object currentValue() { - TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(rowIndex); - return v == null ? null : v.getValue(); + public void next() { + rowIndex++; } @Override - public int totalLength() { - // aligned timeseries' BatchData length() may return the length of time column - // we need traverse to VectorBatchDataIterator calculate the actual value column's length - int cnt = 0; - int indexSave = rowIndex; - while (hasNext()) { - cnt++; - next(); + public long currentTime() { + return timeColumn.getLong(rowIndex); + } + + @Override + public TsPrimitiveType[] currentValue() { + TsPrimitiveType[] tsPrimitiveTypes = new TsPrimitiveType[valueColumns.length]; + for (int i = 0; i < valueColumns.length; i++) { + tsPrimitiveTypes[i] = valueColumns[i].getTsPrimitiveType(rowIndex); } - rowIndex = indexSave; - return cnt; + return tsPrimitiveTypes; + } + + @Override + public void reset() { + rowIndex = 0; + } + + @Override + public int totalLength() { + return positionCount; + } + + @Override + public boolean hasNextTimeValuePair() { + return hasNext(); + } + + @Override + public TimeValuePair nextTimeValuePair() { + TimeValuePair res = currentTimeValuePair(); + next(); + return res; + } + + @Override + public TimeValuePair currentTimeValuePair() { + return new TimeValuePair( + timeColumn.getLong(rowIndex), new TsPrimitiveType.TsVector(currentValue())); + } + + @Override + public void close() {} + + public long getEndTime() { + return TsBlock.this.getEndTime(); + } + + public long getStartTime() { + return TsBlock.this.getStartTime(); + } + + public int getRowIndex() { + return rowIndex; + } + + public void setRowIndex(int rowIndex) { + this.rowIndex = rowIndex; } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java index c61a57971e..4c81358741 100644 --- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java @@ -63,19 +63,19 @@ public abstract class DataPoint { try { switch (dataType) { case INT32: - dataPoint = new IntDataPoint(measurementId, Integer.valueOf(value)); + dataPoint = new IntDataPoint(measurementId, Integer.parseInt(value)); break; case INT64: - dataPoint = new LongDataPoint(measurementId, Long.valueOf(value)); + dataPoint = new LongDataPoint(measurementId, Long.parseLong(value)); break; case FLOAT: - dataPoint = new FloatDataPoint(measurementId, Float.valueOf(value)); + dataPoint = new FloatDataPoint(measurementId, Float.parseFloat(value)); break; case DOUBLE: - dataPoint = new DoubleDataPoint(measurementId, Double.valueOf(value)); + dataPoint = new DoubleDataPoint(measurementId, Double.parseDouble(value)); break; case BOOLEAN: - dataPoint = new BooleanDataPoint(measurementId, Boolean.valueOf(value)); + dataPoint = new BooleanDataPoint(measurementId, Boolean.parseBoolean(value)); break; case TEXT: dataPoint = new StringDataPoint(measurementId, new Binary(value));
