This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/sonar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 52e900afe94477a796ae43085d45a18db753d67d Author: JackieTien97 <[email protected]> AuthorDate: Tue Jun 20 18:02:10 2023 +0800 server/src/main/java/org/apache/iotdb/db/mpp/execution/operator done except for SeriesScanUtil --- .../db/mpp/execution/operator/AggregationUtil.java | 56 +++++++------- .../iotdb/db/mpp/execution/operator/Operator.java | 27 +++++-- .../db/mpp/execution/operator/OperatorContext.java | 2 +- .../AbstractSeriesAggregationScanOperator.java | 14 +++- .../AlignedSeriesAggregationScanOperator.java | 1 + .../operator/source/AlignedSeriesScanOperator.java | 53 ++++++------- .../operator/source/AlignedSeriesScanUtil.java | 87 ++++++++++++---------- .../operator/source/DataSourceOperator.java | 1 + .../operator/source/ExchangeOperator.java | 1 + .../operator/source/LastCacheScanOperator.java | 1 + .../source/SeriesAggregationScanOperator.java | 2 + .../operator/source/SeriesScanOperator.java | 6 +- .../execution/operator/source/SeriesScanUtil.java | 1 + .../operator/source/ShowQueriesOperator.java | 15 +--- .../execution/operator/source/SourceOperator.java | 1 + 15 files changed, 150 insertions(+), 118 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java index 0eade1b82fe..b0901cd5c5d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java @@ -105,31 +105,7 @@ public class AggregationUtil { inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending); } - // Get the row which need to be processed by aggregator - IWindow curWindow = new TimeWindow(curTimeRange); - TimeColumn timeColumn = inputTsBlock.getTimeColumn(); - int lastIndexToProcess = 0; - for (int i = 0; i < inputTsBlock.getPositionCount(); i++) { - if (!curWindow.satisfy(timeColumn, i)) { - break; - } - lastIndexToProcess = i; - } - - for (Aggregator aggregator : aggregators) { - // current agg method has been calculated - if (aggregator.hasFinalResult()) { - continue; - } - - aggregator.processTsBlock(inputTsBlock, null, lastIndexToProcess); - } - int lastReadRowIndex = lastIndexToProcess + 1; - if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { - inputTsBlock = null; - } else { - inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex); - } + inputTsBlock = process(inputTsBlock, curTimeRange, aggregators); } // judge whether the calculation finished @@ -142,6 +118,34 @@ public class AggregationUtil { isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound, inputTsBlock); } + private static TsBlock process( + TsBlock inputTsBlock, TimeRange curTimeRange, List<Aggregator> aggregators) { + // Get the row which need to be processed by aggregator + IWindow curWindow = new TimeWindow(curTimeRange); + TimeColumn timeColumn = inputTsBlock.getTimeColumn(); + int lastIndexToProcess = 0; + for (int i = 0; i < inputTsBlock.getPositionCount(); i++) { + if (!curWindow.satisfy(timeColumn, i)) { + break; + } + lastIndexToProcess = i; + } + + for (Aggregator aggregator : aggregators) { + // current agg method has been calculated + if (aggregator.hasFinalResult()) { + continue; + } + aggregator.processTsBlock(inputTsBlock, null, lastIndexToProcess); + } + int lastReadRowIndex = lastIndexToProcess + 1; + if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { + return null; + } else { + return inputTsBlock.subTsBlock(lastReadRowIndex); + } + } + /** Append a row of aggregation results to the result tsBlock. */ public static void appendAggregationResult( TsBlockBuilder tsBlockBuilder, List<? extends Aggregator> aggregators, long outputTime) { @@ -161,7 +165,7 @@ public class AggregationUtil { tsBlockBuilder.declarePosition(); } - /** @return whether the tsBlock contains the data of the current time window */ + /** return whether the tsBlock contains the data of the current time window. */ public static boolean satisfiedTimeRange( TsBlock tsBlock, TimeRange curTimeRange, boolean ascending) { if (tsBlock == null || tsBlock.isEmpty()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java index d762220da39..970fe037a3c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -34,6 +35,7 @@ public interface Operator extends AutoCloseable { * Returns a future that will be completed when the operator becomes unblocked. If the operator is * not blocked, this method should return {@code NOT_BLOCKED}. */ + @SuppressWarnings("squid:S1452") default ListenableFuture<?> isBlocked() { return NOT_BLOCKED; } @@ -50,7 +52,12 @@ public interface Operator extends AutoCloseable { } } - /** Gets next tsBlock from this operator. If no data is currently available, return null. */ + /** + * Gets next tsBlock from this operator. If no data is currently available, return null. + * + * @throws Exception Error happened while fetching next batch + */ + @SuppressWarnings("squid:S112") TsBlock next() throws Exception; default boolean hasNextWithTimer() throws Exception { @@ -64,7 +71,12 @@ public interface Operator extends AutoCloseable { } } - /** @return true if the operator has more data, otherwise false */ + /** + * return true if the operator has more data, otherwise false. + * + * @throws Exception Error happened while judging whether there exists next batch + */ + @SuppressWarnings("squid:S112") boolean hasNext() throws Exception; /** This method will always be called before releasing the Operator reference. */ @@ -73,13 +85,16 @@ public interface Operator extends AutoCloseable { /** * Is this operator completely finished processing and no more output TsBlock will be produced. + * + * @throws Exception Error happened while judging whether operator is finished */ + @SuppressWarnings("squid:S112") boolean isFinished() throws Exception; /** * We should also consider the memory used by its children operator, so the calculation logic may * be like: long estimatedOfCurrentOperator = XXXXX; return max(estimatedOfCurrentOperator, - * child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....) + * child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....). * * <p>Each operator's MaxPeekMemory should also take retained size of each child operator into * account. @@ -89,12 +104,12 @@ public interface Operator extends AutoCloseable { */ long calculateMaxPeekMemory(); - /** @return estimated max memory footprint for returned TsBlock when calling operator.next() */ + /** return estimated max memory footprint for returned TsBlock when calling operator.next(). */ long calculateMaxReturnSize(); /** - * @return each operator's retained size(including all its children's retained size) after calling - * its next() method + * return each operator's retained size(including all its children's retained size) after calling + * its next() method. */ long calculateRetainedSizeAfterCallingNext(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java index c5c0db783d5..90d98bbb9cb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator; import org.apache.iotdb.commons.utils.TestOnly; @@ -81,7 +82,6 @@ public class OperatorContext { this.driverContext = driverContext; } - // TODO forbid get instance context from operator directly public FragmentInstanceContext getInstanceContext() { return driverContext.getFragmentInstanceContext(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java index 67b42a4319f..2983a0842f3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java @@ -65,8 +65,8 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData protected boolean finished = false; private final long cachedRawDataSize; - private final long maxReturnSize; + @SuppressWarnings("squid:S107") protected AbstractSeriesAggregationScanOperator( PlanNodeId sourceId, OperatorContext context, @@ -149,9 +149,13 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData @Override public boolean isFinished() throws Exception { - return finished || (finished = !hasNextWithTimer()); + if (!finished) { + finished = !hasNextWithTimer(); + } + return finished; } + @SuppressWarnings("squid:S112") protected void calculateNextAggregationResult() { try { if (calcFromCachedData()) { @@ -208,6 +212,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData } } + @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) protected boolean readAndCalcFromFile() throws IOException { while (seriesScanUtil.hasNextFile()) { if (canUseCurrentFileStatistics()) { @@ -246,6 +251,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData return false; } + @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) protected boolean readAndCalcFromChunk() throws IOException { while (seriesScanUtil.hasNextChunk()) { if (canUseCurrentChunkStatistics()) { @@ -284,6 +290,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData return false; } + @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) protected boolean readAndCalcFromPage() throws IOException { while (seriesScanUtil.hasNextPage()) { if (canUseCurrentPageStatistics()) { @@ -328,6 +335,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData return false; } + @SuppressWarnings({"squid:S3740"}) protected boolean canUseCurrentFileStatistics() throws IOException { Statistics fileStatistics = seriesScanUtil.currentFileTimeStatistics(); return !seriesScanUtil.isFileOverlapped() @@ -335,6 +343,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData && !seriesScanUtil.currentFileModified(); } + @SuppressWarnings({"squid:S3740"}) protected boolean canUseCurrentChunkStatistics() throws IOException { Statistics chunkStatistics = seriesScanUtil.currentChunkTimeStatistics(); return !seriesScanUtil.isChunkOverlapped() @@ -342,6 +351,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData && !seriesScanUtil.currentChunkModified(); } + @SuppressWarnings({"squid:S3740"}) protected boolean canUseCurrentPageStatistics() throws IOException { Statistics currentPageStatistics = seriesScanUtil.currentPageTimeStatistics(); if (currentPageStatistics == null) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java index 0f9e0af85f3..99f107aa804 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java @@ -33,6 +33,7 @@ import java.util.List; /** This operator is responsible to do the aggregation calculation especially for aligned series. */ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregationScanOperator { + @SuppressWarnings("squid:S107") public AlignedSeriesAggregationScanOperator( PlanNodeId sourceId, AlignedPath seriesPath, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java index ebc407ea971..18337b16d0d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.commons.path.AlignedPath; @@ -71,6 +72,7 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { return checkTsBlockSizeAndGetResult(); } + @SuppressWarnings("squid:S112") @Override public boolean hasNext() throws Exception { if (retainedTsBlock != null) { @@ -85,26 +87,13 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { // here use do-while to promise doing this at least once do { /* - * consume page data firstly - */ - if (readPageData()) { - continue; - } - - /* - * consume chunk data secondly - */ - if (readChunkData()) { - continue; - } - - /* - * consume next file finally + * 1. consume page data firstly + * 2. consume chunk data secondly + * 3. consume next file finally */ - if (readFileData()) { - continue; + if (!readPageData() && !readChunkData() && !readFileData()) { + break; } - break; } while (System.nanoTime() - start < maxRuntime && !builder.isFull()); @@ -178,21 +167,25 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount(); columnIndex < columnSize; columnIndex++) { - ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex); - Column column = tsBlock.getColumn(columnIndex); - if (column.mayHaveNull()) { - for (int i = 0; i < size; i++) { - if (column.isNull(i)) { - columnBuilder.appendNull(); - } else { - columnBuilder.write(column, i); - } - } - } else { - for (int i = 0; i < size; i++) { + appendOneColumn(columnIndex, tsBlock, size); + } + } + + private void appendOneColumn(int columnIndex, TsBlock tsBlock, int size) { + ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex); + Column column = tsBlock.getColumn(columnIndex); + if (column.mayHaveNull()) { + for (int i = 0; i < size; i++) { + if (column.isNull(i)) { + columnBuilder.appendNull(); + } else { columnBuilder.write(column, i); } } + } else { + for (int i = 0; i < size; i++) { + columnBuilder.write(column, i); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java index ebfe5f87b11..883dadfd5f4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.commons.path.AlignedPath; @@ -135,63 +136,71 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { && !firstTimeSeriesMetadata.isModified()) { Filter queryFilter = scanOptions.getQueryFilter(); if (queryFilter != null) { - // TODO accept valueStatisticsList to filter if (!queryFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) { skipCurrentFile(); } } else { - // For aligned series, When we only query some measurements under an aligned device, if the - // values of these queried measurements at a timestamp are all null, the timestamp will not - // be selected. - // NOTE: if we change the query semantic in the future for aligned series, we need to remove - // this check here. - long rowCount = - ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics().getCount(); - for (Statistics statistics : - ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getValueStatisticsList()) { - if (statistics == null || statistics.hasNullValue(rowCount)) { - return; - } - } - // When the number of points in all value chunk groups is the same as that in the time chunk - // group, it means that there is no null value, and all timestamps will be selected. - if (paginationController.hasCurOffset(rowCount)) { - skipCurrentFile(); - paginationController.consumeOffset(rowCount); - } + skipOffsetByTimeSeriesMetadata(); } } } + @SuppressWarnings("squid:S3740") + private void skipOffsetByTimeSeriesMetadata() { + // For aligned series, When we only query some measurements under an aligned device, if the + // values of these queried measurements at a timestamp are all null, the timestamp will not + // be selected. + // NOTE: if we change the query semantic in the future for aligned series, we need to remove + // this check here. + long rowCount = + ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics().getCount(); + for (Statistics statistics : + ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getValueStatisticsList()) { + if (statistics == null || statistics.hasNullValue(rowCount)) { + return; + } + } + // When the number of points in all value chunk groups is the same as that in the time chunk + // group, it means that there is no null value, and all timestamps will be selected. + if (paginationController.hasCurOffset(rowCount)) { + skipCurrentFile(); + paginationController.consumeOffset(rowCount); + } + } + @Override protected void filterFirstChunkMetadata() throws IOException { if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) { Filter queryFilter = scanOptions.getQueryFilter(); if (queryFilter != null) { - // TODO accept valueStatisticsList to filter if (!queryFilter.satisfy(firstChunkMetadata.getStatistics())) { skipCurrentChunk(); } } else { - // For aligned series, When we only query some measurements under an aligned device, if the - // values of these queried measurements at a timestamp are all null, the timestamp will not - // be selected. - // NOTE: if we change the query semantic in the future for aligned series, we need to remove - // this check here. - long rowCount = firstChunkMetadata.getStatistics().getCount(); - for (Statistics statistics : - ((AlignedChunkMetadata) firstChunkMetadata).getValueStatisticsList()) { - if (statistics == null || statistics.hasNullValue(rowCount)) { - return; - } - } - // When the number of points in all value chunks is the same as that in the time chunk, it - // means that there is no null value, and all timestamps will be selected. - if (paginationController.hasCurOffset(rowCount)) { - skipCurrentChunk(); - paginationController.consumeOffset(rowCount); - } + skipOffsetByChunkMetadata(); + } + } + } + + @SuppressWarnings("squid:S3740") + private void skipOffsetByChunkMetadata() { + // For aligned series, When we only query some measurements under an aligned device, if the + // values of these queried measurements at a timestamp are all null, the timestamp will not + // be selected. + // NOTE: if we change the query semantic in the future for aligned series, we need to remove + // this check here. + long rowCount = firstChunkMetadata.getStatistics().getCount(); + for (Statistics statistics : + ((AlignedChunkMetadata) firstChunkMetadata).getValueStatisticsList()) { + if (statistics == null || statistics.hasNullValue(rowCount)) { + return; } } + // When the number of points in all value chunks is the same as that in the time chunk, it + // means that there is no null value, and all timestamps will be selected. + if (paginationController.hasCurOffset(rowCount)) { + skipCurrentChunk(); + paginationController.consumeOffset(rowCount); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/DataSourceOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/DataSourceOperator.java index 665f45a7446..3d5dbe8e9ef 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/DataSourceOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/DataSourceOperator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java index de23d93fb8b..152317363a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java index eb975d34693..e6256a23d31 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java index 8120f6e497b..e1474b51c60 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.commons.path.PartialPath; @@ -39,6 +40,7 @@ import java.util.List; */ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScanOperator { + @SuppressWarnings("squid:S107") public SeriesAggregationScanOperator( PlanNodeId sourceId, PartialPath seriesPath, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java index 8719bed2e35..4270d59f627 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.commons.path.PartialPath; @@ -86,12 +87,12 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { @Override public SourceOperator createOperator(DriverContext driverContext) { checkState(!closed, "Factory is already closed"); - OperatorContext operatorContext = - driverContext.addOperatorContext(operatorId, sourceId, getOperatorType()); SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); scanOptionsBuilder.withAllSensors(allSensors); scanOptionsBuilder.withGlobalTimeFilter(timeFilter); scanOptionsBuilder.withQueryFilter(valueFilter); + OperatorContext operatorContext = + driverContext.addOperatorContext(operatorId, sourceId, getOperatorType()); return new SeriesScanOperator( operatorContext, sourceId, @@ -134,6 +135,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { return checkTsBlockSizeAndGetResult(); } + @SuppressWarnings("squid:S112") @Override public boolean hasNext() throws Exception { if (retainedTsBlock != null) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index 27b111654ba..728d88ee23a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.commons.path.PartialPath; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java index 690daa70fe5..55d293f4347 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory; @@ -31,8 +32,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.iotdb.tsfile.utils.Binary; -import com.google.common.util.concurrent.ListenableFuture; - import java.util.List; import java.util.concurrent.TimeUnit; @@ -105,14 +104,6 @@ public class ShowQueriesOperator implements SourceOperator { return sourceId; } - @Override - public ListenableFuture<?> isBlocked() { - return NOT_BLOCKED; - } - - @Override - public void close() throws Exception {} - private TsBlock buildTsBlock() { List<TSDataType> outputDataTypes = DatasetHeaderFactory.getShowQueriesHeader().getRespDataTypes(); @@ -124,14 +115,14 @@ public class ShowQueriesOperator implements SourceOperator { ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders(); long currTime = System.currentTimeMillis(); String[] splits = queryExecutions.get(0).getQueryId().split("_"); - int DataNodeId = Integer.parseInt(splits[splits.length - 1]); + int dataNodeId = Integer.parseInt(splits[splits.length - 1]); for (IQueryExecution queryExecution : queryExecutions) { timeColumnBuilder.writeLong( TimestampPrecisionUtils.convertToCurrPrecision( queryExecution.getStartExecutionTime(), TimeUnit.MILLISECONDS)); columnBuilders[0].writeBinary(Binary.valueOf(queryExecution.getQueryId())); - columnBuilders[1].writeInt(DataNodeId); + columnBuilders[1].writeInt(dataNodeId); columnBuilders[2].writeFloat( (float) (currTime - queryExecution.getStartExecutionTime()) / 1000); columnBuilders[3].writeBinary( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SourceOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SourceOperator.java index 4489cad90e6..4ab6a56c86b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SourceOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SourceOperator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.db.mpp.execution.operator.Operator;
