This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch max_by in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 53a4291559802656db78479a0d40b6a7c0720ea0 Merge: 955edb759d0 c52da2bdbd0 Author: lancelly <[email protected]> AuthorDate: Fri Jan 26 19:04:26 2024 +0800 Using Binary as Intermediate example/client-cpp-example/README.md | 2 +- .../it/cluster/IoTDBClusterRestartIT.java | 70 ++ .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 9 + .../maxby/IoTDBMaxByAlignedSeriesIT.java | 69 +- .../db/it/aggregation/maxby/IoTDBMaxByIT.java | 177 +++-- .../apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java | 46 -- .../apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java | 320 +++++++++ .../src/assembly/resources/tools/collect-info.bat | 267 ++++++++ .../tools/{collection-info.sh => collect-info.sh} | 212 +++--- .../assembly/resources/tools/collection-info.bat | 275 -------- iotdb-client/client-cpp/README.md | 20 +- .../src/assembly/resources/conf/confignode-env.bat | 2 +- .../resources/conf/iotdb-confignode.properties | 5 - ...register-confignode.sh => daemon-confignode.sh} | 24 +- .../heartbeat/DataNodeHeartbeatHandler.java | 2 +- .../iotdb/confignode/conf/ConfigNodeConfig.java | 17 +- .../confignode/conf/ConfigNodeDescriptor.java | 8 - .../iotdb/confignode/manager/ConfigManager.java | 87 ++- .../manager/consensus/ConsensusManager.java | 2 - .../confignode/manager/load/cache/LoadCache.java | 2 +- .../manager/load/cache/node/BaseNodeCache.java | 2 +- .../load/cache/node/ConfigNodeHeartbeatCache.java | 2 +- .../load/cache/node/DataNodeHeartbeatCache.java | 2 +- .../load/cache/node/NodeHeartbeatSample.java | 6 +- .../manager/load/cache/node/NodeStatistics.java | 2 +- .../manager/load/cache/region/RegionCache.java | 5 +- .../load/cache/region/RegionHeartbeatSample.java | 2 +- .../manager/load/service/HeartbeatService.java | 4 +- .../pipe/coordinator/runtime/PipeMetaSyncer.java | 3 + .../iotdb/confignode/persistence/AuthorInfo.java | 5 - .../persistence/executor/ConfigPlanExecutor.java | 9 +- .../partition/DatabasePartitionTable.java | 11 + .../persistence/partition/PartitionInfo.java | 19 + .../persistence/partition/RegionGroup.java | 15 + .../procedure/env/ConfigNodeProcedureEnv.java | 2 +- .../iotdb/confignode/service/ConfigNode.java | 14 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 2 +- .../router/priority/GreedyPriorityTest.java | 6 +- .../priority/LeaderPriorityBalancerTest.java | 6 +- .../manager/load/cache/LoadCacheTest.java | 2 +- .../manager/load/cache/NodeCacheTest.java | 6 +- .../manager/load/cache/RegionGroupCacheTest.java | 6 +- .../manager/load/cache/RegionRouteCacheTest.java | 2 +- .../iotdb/consensus/config/IoTConsensusConfig.java | 16 - .../apache/iotdb/consensus/config/RatisConfig.java | 30 - .../exception/RatisReadUnavailableException.java | 13 +- .../iot/client/IoTConsensusClientPool.java | 2 - .../consensus/iot/logdispatcher/LogDispatcher.java | 24 +- .../iotdb/consensus/ratis/RatisConsensus.java | 20 +- .../apache/iotdb/consensus/ratis/utils/Utils.java | 6 +- .../iotdb/consensus/ratis/RatisConsensusTest.java | 4 +- .../iotdb/consensus/ratis/RecoverReadTest.java | 6 +- .../src/assembly/resources/conf/datanode-env.bat | 2 +- .../resources/conf/iotdb-datanode.properties | 7 +- .../{register-datanode.sh => daemon-datanode.sh} | 24 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 53 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 44 +- .../db/consensus/DataRegionConsensusImpl.java | 2 - .../db/consensus/SchemaRegionConsensusImpl.java | 1 - .../agent/runtime/PipePeriodicalJobExecutor.java | 5 + .../db/pipe/agent/runtime/PipeRuntimeAgent.java | 9 +- .../db/pipe/agent/task/PipeTaskDataNodeAgent.java | 118 +++- .../thrift/async/IoTDBThriftAsyncConnector.java | 38 +- .../apache/iotdb/db/pipe/event/EnrichedEvent.java | 41 +- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 13 +- .../common/tablet/PipeRawTabletInsertionEvent.java | 11 + .../tablet/TabletInsertionDataContainer.java | 152 +++-- .../common/tsfile/PipeTsFileInsertionEvent.java | 19 +- .../pipe/extractor/IoTDBDataRegionExtractor.java | 34 +- .../PipeHistoricalDataRegionTsFileExtractor.java | 54 +- .../PipeRealtimeDataRegionHybridExtractor.java | 9 +- .../iotdb/db/pipe/metric/PipeExtractorMetrics.java | 7 +- .../resource/tsfile/PipeTsFileResourceManager.java | 33 +- .../pipe/resource/wal/PipeWALResourceManager.java | 41 +- .../iotdb/db/pipe/task/PipeDataNodeTask.java | 30 + .../db/pipe/task/subtask/PipeDataNodeSubtask.java | 105 +-- .../subtask/connector/PipeConnectorSubtask.java | 166 ++--- .../subtask/processor/PipeProcessorSubtask.java | 14 +- .../protocol/client/DataNodeClientPoolFactory.java | 2 - .../db/queryengine/common/MPPQueryContext.java | 10 +- .../common/header/ColumnHeaderConstant.java | 1 + .../execution/aggregation/MaxByAccumulator.java | 202 ++++-- .../queryengine/execution/driver/DataDriver.java | 1 + .../execution/driver/DataDriverContext.java | 16 +- .../fragment/FragmentInstanceContext.java | 4 + .../execution/load/LoadTsFileManager.java | 148 +++-- .../execution/operator/AggregationUtil.java | 6 + .../iotdb/db/queryengine/plan/Coordinator.java | 5 +- .../plan/analyze/schema/ClusterSchemaFetcher.java | 6 +- .../queryengine/plan/execution/QueryExecution.java | 3 + .../execution/config/metadata/ShowRegionTask.java | 7 +- .../plan/planner/LocalExecutionPlanner.java | 7 +- .../plan/planner/LogicalPlanBuilder.java | 41 +- .../plan/planner/OperatorTreeGenerator.java | 4 +- .../plan/planner/distribution/SourceRewriter.java | 9 +- .../plan/planner/plan/LogicalQueryPlan.java | 6 +- .../plan/planner/plan/PlanFragment.java | 5 + .../plan/parameter/AggregationDescriptor.java | 18 +- .../scheduler/FragmentInstanceDispatcherImpl.java | 30 +- .../plan/scheduler/load/LoadTsFileScheduler.java | 8 +- .../mtree/impl/mem/mnode/info/LogicalViewInfo.java | 2 +- .../mtree/impl/pbtree/flush/Scheduler.java | 150 ++--- .../impl/pbtree/memory/ReleaseFlushMonitor.java | 4 +- .../mtree/impl/pbtree/schemafile/RecordUtils.java | 55 +- .../impl/pbtree/schemafile/WrappedSegment.java | 13 +- .../schemafile/pagemgr/BTreePageManager.java | 8 +- .../pbtree/schemafile/pagemgr/PageIOChannel.java | 184 ++++++ .../schemafile/pagemgr/PageIndexSortBuckets.java | 129 ++++ .../pbtree/schemafile/pagemgr/PageManager.java | 464 +------------ .../impl/pbtree/schemafile/pagemgr/PagePool.java | 152 +++++ .../schemafile/pagemgr/SchemaPageContext.java | 109 ++++ .../java/org/apache/iotdb/db/service/DataNode.java | 24 +- .../db/service/metrics/DataNodeMetricsHelper.java | 2 +- .../iotdb/db/service/metrics/WritingMetrics.java | 16 +- .../iotdb/db/storageengine/StorageEngine.java | 19 +- .../db/storageengine/dataregion/DataRegion.java | 18 +- .../impl/ReadChunkCompactionPerformer.java | 19 +- .../execute/task/CompactionTaskSummary.java | 44 +- .../execute/task/InnerSpaceCompactionTask.java | 2 + .../task/InsertionCrossSpaceCompactionTask.java | 8 +- .../execute/utils/MultiTsFileDeviceIterator.java | 4 + .../execute/utils/executor/ModifiedStatus.java} | 15 +- .../fast/AlignedSeriesCompactionExecutor.java | 1 + .../fast/NonAlignedSeriesCompactionExecutor.java | 1 + .../executor/fast/SeriesCompactionExecutor.java | 6 +- .../ReadChunkAlignedSeriesCompactionExecutor.java | 466 ++++++++++++++ .../readchunk/SingleSeriesCompactionExecutor.java | 4 +- .../executor/readchunk/loader/ChunkLoader.java | 104 +++ .../readchunk/loader/InstantChunkLoader.java | 105 +++ .../readchunk/loader/InstantPageLoader.java | 103 +++ .../executor/readchunk/loader/PageLoader.java | 107 +++ .../execute/utils/reader/PointPriorityReader.java | 2 +- .../compaction/io/CompactionTsFileReader.java | 18 + .../compaction/schedule/CompactionScheduler.java | 2 +- .../compaction/schedule/CompactionTaskManager.java | 3 +- .../compaction/schedule/CompactionWorker.java | 3 + .../impl/SizeTieredCompactionSelector.java | 41 +- .../InsertionCrossCompactionTaskResource.java | 25 + .../dataregion/memtable/AbstractMemTable.java | 18 + .../dataregion/memtable/IMemTable.java | 2 + .../dataregion/memtable/TsFileProcessor.java | 7 + .../dataregion/tsfile/TsFileResource.java | 26 +- .../dataregion/utils/TsFileResourceUtils.java | 75 +-- .../dataregion/wal/buffer/WALBuffer.java | 59 +- .../wal/checkpoint/CheckpointManager.java | 50 +- .../dataregion/wal/checkpoint/MemTableInfo.java | 22 +- .../dataregion/wal/io/WALByteBufReader.java | 26 +- .../dataregion/wal/io/WALMetaData.java | 76 ++- .../storageengine/dataregion/wal/node/WALNode.java | 213 +++--- .../dataregion/wal/recover/WALNodeRecoverTask.java | 41 +- .../wal/recover/file/TsFilePlanRedoer.java | 2 + .../db/tools/schema/PBTreeFileSketchTool.java | 2 +- .../org/apache/iotdb/db/utils/SchemaUtils.java | 29 +- .../iotdb/db/utils/constant/SqlConstant.java | 2 - .../metadata/mtree/schemafile/SchemaFileTest.java | 2 +- .../schemaRegion/SchemaStatisticsTest.java | 2 +- .../pipe/event/PipeTabletInsertionEventTest.java | 93 ++- .../execution/aggregation/AccumulatorTest.java | 17 +- .../plan/planner/PipelineBuilderTest.java | 55 ++ .../storageengine/dataregion/DataRegionTest.java | 10 +- .../compaction/CompactionValidationTest.java | 103 +++ .../FastInnerCompactionPerformerTest.java | 108 ++++ .../compaction/ReadChunkInnerCompactionTest.java | 144 +++++ .../InsertionCrossSpaceCompactionSelectorTest.java | 4 +- .../cross/InsertionCrossSpaceCompactionTest.java | 6 +- ...nkCompactionPerformerWithAlignedSeriesTest.java | 715 +++++++++++++++++++++ .../compaction/utils/CompactionCheckerUtils.java | 8 +- .../compaction/utils/CompactionTestFileWriter.java | 8 +- .../dataregion/wal/node/WALEntryHandlerTest.java | 13 +- .../wal/node/WalDeleteOutdatedNewTest.java | 585 +++++++++++++++++ .../wal/recover/WALRecoverWriterTest.java | 9 +- .../resources/conf/iotdb-cluster.properties | 5 +- .../resources/conf/iotdb-common.properties | 49 +- .../sbin/{clean-all.bat => destroy-all.bat} | 22 +- .../sbin/{clean-all.sh => destroy-all.sh} | 59 +- ...clean-confignode.bat => destroy-confignode.bat} | 34 +- .../{clean-confignode.sh => destroy-confignode.sh} | 7 +- .../{clean-datanode.bat => destroy-datanode.bat} | 14 +- .../{clean-datanode.sh => destroy-datanode.sh} | 7 +- .../src/assembly/resources/sbin/start-all.sh | 9 +- .../src/assembly/resources/sbin/stop-all.sh | 15 +- .../commons/auth/authorizer/BasicAuthorizer.java | 1 - .../iotdb/commons/auth/role/BasicRoleManager.java | 6 + .../commons/auth/role/LocalFileRoleAccessor.java | 12 +- .../commons/auth/role/LocalFileRoleManager.java | 6 + .../iotdb/commons/auth/user/BasicUserManager.java | 1 + .../commons/auth/user/LocalFileUserAccessor.java | 23 +- .../commons/auth/user/LocalFileUserManager.java | 5 + .../iotdb/commons/client/ClientPoolFactory.java | 29 +- .../client/property/ClientPoolProperty.java | 35 +- .../apache/iotdb/commons/conf/CommonConfig.java | 48 +- .../iotdb/commons/conf/CommonDescriptor.java | 33 +- .../commons/pipe/agent/task/PipeTaskAgent.java | 90 ++- .../iotdb/commons/pipe/config/PipeConfig.java | 20 +- .../commons/pipe/task/meta/PipeMetaKeeper.java | 4 + .../commons/pipe/task/subtask/PipeSubtask.java | 15 +- .../schema/view/viewExpression/ViewExpression.java | 18 + .../service/metric/JvmGcMonitorMetrics.java | 10 +- .../service/metric}/cpu/CpuUsageMetrics.java | 23 +- .../org/apache/iotdb/commons/utils/FileUtils.java | 8 +- .../iotdb/commons/client/ClientManagerTest.java | 47 +- .../file/metadata/AlignedTimeSeriesMetadata.java | 1 + .../iotdb/tsfile/read/TsFileSequenceReader.java | 3 +- .../tsfile/read/reader/page/AlignedPageReader.java | 7 +- .../thrift-commons/src/main/thrift/common.thrift | 2 - .../src/main/thrift/confignode.thrift | 1 + pom.xml | 2 +- 207 files changed, 6660 insertions(+), 2416 deletions(-) diff --cc integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java index a92eab034bb,00000000000..648b9e7b388 mode 100644,000000..100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java @@@ -1,62 -1,0 +1,63 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.aggregation.maxby; + +import org.apache.iotdb.it.env.EnvFactory; ++ +import org.junit.BeforeClass; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareData; + - public class IoTDBMaxByAlignedSeriesIT extends IoTDBMaxByIT{ - protected static final String[] ALIGNED_DATASET = - new String[] { - // x input - "CREATE ALIGNED TIMESERIES root.db.d1(x1 INT32, x2 INT64, x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)", - // y input - "CREATE ALIGNED TIMESERIES root.db.d1(y1 INT32, y2 INT64, y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)", - "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", - "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", - "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", - "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")", - "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", - "flush", - // For Align By Device - "CREATE ALIGNED TIMESERIES root.db.d2(x1 INT32, x2 INT64, x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)", - "CREATE ALIGNED TIMESERIES root.db.d2(y1 INT32, y2 INT64, y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", - }; ++public class IoTDBMaxByAlignedSeriesIT extends IoTDBMaxByIT { ++ protected static final String[] ALIGNED_DATASET = ++ new String[] { ++ // x input ++ "CREATE ALIGNED TIMESERIES root.db.d1(x1 INT32, x2 INT64, x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)", ++ // y input ++ "CREATE ALIGNED TIMESERIES root.db.d1(y1 INT32, y2 INT64, y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)", ++ "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", ++ "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", ++ "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", ++ "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", ++ "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", ++ "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")", ++ "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", ++ "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", ++ "flush", ++ // For Align By Device ++ "CREATE ALIGNED TIMESERIES root.db.d2(x1 INT32, x2 INT64, x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)", ++ "CREATE ALIGNED TIMESERIES root.db.d2(y1 INT32, y2 INT64, y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)", ++ "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", ++ "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", ++ "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", ++ "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", ++ "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", ++ "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")", ++ "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", ++ "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", ++ }; + - @BeforeClass - public static void setUp() throws Exception { - EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000); - EnvFactory.getEnv().initClusterEnvironment(); - prepareData(ALIGNED_DATASET); - } ++ @BeforeClass ++ public static void setUp() throws Exception { ++ EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000); ++ EnvFactory.getEnv().initClusterEnvironment(); ++ prepareData(ALIGNED_DATASET); ++ } +} diff --cc integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java index 891d5cd96e6,00000000000..1cde3f239b8 mode 100644,000000..100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java @@@ -1,362 -1,0 +1,411 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.aggregation.maxby; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareData; +import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest; +import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR; +import static org.apache.iotdb.db.utils.constant.TestConstant.maxBy; +import static org.apache.iotdb.itbase.constant.TestConstant.DEVICE; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBMaxByIT { + protected static final String[] NON_ALIGNED_DATASET = + new String[] { + "CREATE DATABASE root.db", + // x input + "CREATE TIMESERIES root.db.d1.x1 WITH DATATYPE=INT32, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.x2 WITH DATATYPE=INT64, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.x3 WITH DATATYPE=FLOAT, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.x4 WITH DATATYPE=DOUBLE, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.x5 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.x6 WITH DATATYPE=TEXT, ENCODING=PLAIN", + // y input + "CREATE TIMESERIES root.db.d1.y1 WITH DATATYPE=INT32, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.y2 WITH DATATYPE=INT64, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.y3 WITH DATATYPE=FLOAT, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.y4 WITH DATATYPE=DOUBLE, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.y5 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", + "CREATE TIMESERIES root.db.d1.y6 WITH DATATYPE=TEXT, ENCODING=PLAIN", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")", - "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", ++ "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", ++ "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", + "flush", + - // For Align By Device - "CREATE TIMESERIES root.db.d2.x1 WITH DATATYPE=INT32, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.x2 WITH DATATYPE=INT64, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.x3 WITH DATATYPE=FLOAT, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.x4 WITH DATATYPE=DOUBLE, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.x5 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.x6 WITH DATATYPE=TEXT, ENCODING=PLAIN", - // y input - "CREATE TIMESERIES root.db.d2.y1 WITH DATATYPE=INT32, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.y2 WITH DATATYPE=INT64, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.y3 WITH DATATYPE=FLOAT, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.y4 WITH DATATYPE=DOUBLE, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.y5 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", - "CREATE TIMESERIES root.db.d2.y6 WITH DATATYPE=TEXT, ENCODING=PLAIN", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 4, 4, 4, 4, true, \"1\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"1\")", - "flush" ++ // For Align By Device ++ "CREATE TIMESERIES root.db.d2.x1 WITH DATATYPE=INT32, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.x2 WITH DATATYPE=INT64, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.x3 WITH DATATYPE=FLOAT, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.x4 WITH DATATYPE=DOUBLE, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.x5 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.x6 WITH DATATYPE=TEXT, ENCODING=PLAIN", ++ // y input ++ "CREATE TIMESERIES root.db.d2.y1 WITH DATATYPE=INT32, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.y2 WITH DATATYPE=INT64, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.y3 WITH DATATYPE=FLOAT, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.y4 WITH DATATYPE=DOUBLE, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.y5 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", ++ "CREATE TIMESERIES root.db.d2.y6 WITH DATATYPE=TEXT, ENCODING=PLAIN", ++ "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", ++ "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", ++ "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", ++ "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 4, 4, 4, 4, true, \"1\")", ++ "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", ++ "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", ++ "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"1\")", ++ "flush" + }; + + protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy:"; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000); + EnvFactory.getEnv().initClusterEnvironment(); + prepareData(NON_ALIGNED_DATASET); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testMaxByWithUnsupportedYInputTypes() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + try { + try (ResultSet resultSet = + statement.executeQuery("SELECT max_by(x1, y5) FROM root.db.d1")) { + resultSet.next(); + fail(); + } + } catch (Exception e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE)); + } + try { + try (ResultSet resultSet = + statement.executeQuery("SELECT max_by(x1, y6) FROM root.db.d1")) { + resultSet.next(); + fail(); + } + } catch (Exception e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE)); + } + try { + try (ResultSet resultSet = + statement.executeQuery("SELECT max_by(x5, y5) FROM root.db.d1")) { + resultSet.next(); + fail(); + } + } catch (Exception e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE)); + } + try { + try (ResultSet resultSet = + statement.executeQuery("SELECT max_by(x5, y6) FROM root.db.d1")) { + resultSet.next(); + fail(); + } + } catch (Exception e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE)); + } + try { + try (ResultSet resultSet = + statement.executeQuery("SELECT max_by(x6, y5) FROM root.db.d1")) { + resultSet.next(); + fail(); + } + } catch (Exception e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE)); + } + try { + try (ResultSet resultSet = + statement.executeQuery("SELECT max_by(x6, y6) FROM root.db.d1")) { + resultSet.next(); + fail(); + } + } catch (Exception e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE)); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxByWithDifferentXAndYInputTypes() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + Map<String, String[]> expectedHeaders = + generateExpectedHeadersForMaxByTest( - "root.db.d1", new String[] {"x1", "x2", "x3", "x4", "x5", "x6"}, new String[] {"y1", "y2", "y3", "y4"}); ++ "root.db.d1", ++ new String[] {"x1", "x2", "x3", "x4", "x5", "x6"}, ++ new String[] {"y1", "y2", "y3", "y4"}); + String[] retArray = new String[] {"3,3,3.0,3.0,false,3,"}; + for (Map.Entry<String, String[]> expectedHeader : expectedHeaders.entrySet()) { + String y = expectedHeader.getKey(); + resultSetEqualTest( + String.format( + "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 3", + y, y, y, y, y, y), + expectedHeader.getValue(), + retArray); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxByWithDifferentXAndYInputTypesAndNullXValue() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + Map<String, String[]> expectedHeaders = + generateExpectedHeadersForMaxByTest( - "root.db.d1", new String[] {"x1", "x2", "x3", "x4", "x5", "x6"},new String[] {"y1", "y2", "y3", "y4"}); ++ "root.db.d1", ++ new String[] {"x1", "x2", "x3", "x4", "x5", "x6"}, ++ new String[] {"y1", "y2", "y3", "y4"}); + String[] retArray = new String[] {"null,null,null,null,null,null,"}; + for (Map.Entry<String, String[]> expectedHeader : expectedHeaders.entrySet()) { + String y = expectedHeader.getKey(); + resultSetEqualTest( + String.format( + "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 4", + y, y, y, y, y, y), + expectedHeader.getValue(), + retArray); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxByWithDifferentYInputTypesAndXAsTime() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + Map<String, String[]> expectedHeaders = + generateExpectedHeadersForMaxByTest( - "root.db.d1", new String[] {"Time", "Time", "Time", "Time", "Time", "Time"},new String[] {"y1", "y2", "y3", "y4"}); ++ "root.db.d1", ++ new String[] {"Time", "Time", "Time", "Time", "Time", "Time"}, ++ new String[] {"y1", "y2", "y3", "y4"}); + String[] retArray = new String[] {"3,3,3,3,3,3,"}; + for (Map.Entry<String, String[]> expectedHeader : expectedHeaders.entrySet()) { + String y = expectedHeader.getKey(); + resultSetEqualTest( + String.format( + "select max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s) from root.db.d1 where time <= 3", + y, y, y, y, y, y), + expectedHeader.getValue(), + retArray); + } + String[] retArray1 = new String[] {"4,4,4,4,4,4,"}; + for (Map.Entry<String, String[]> expectedHeader : expectedHeaders.entrySet()) { + String y = expectedHeader.getKey(); + resultSetEqualTest( - String.format( - "select max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s) from root.db.d1 where time <= 4", - y, y, y, y, y, y), - expectedHeader.getValue(), - retArray1); ++ String.format( ++ "select max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s) from root.db.d1 where time <= 4", ++ y, y, y, y, y, y), ++ expectedHeader.getValue(), ++ retArray1); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxByWithExpression() { + try (Connection connection = EnvFactory.getEnv().getConnection(); - Statement statement = connection.createStatement()) { - String[] expectedHeader = new String[]{"max_by(root.db.d1.x1 + 1 - 3, -cos(sin(root.db.d1.y2 / 10)))","max_by(root.db.d1.x2 * 2 / 3, -cos(sin(root.db.d1.y2 / 10)))","max_by(floor(root.db.d1.x3), -cos(sin(root.db.d1.y2 / 10)))","max_by(ceil(root.db.d1.x4), -cos(sin(root.db.d1.y2 / 10)))","max_by(root.db.d1.x5, -cos(sin(root.db.d1.y2 / 10)))","max_by(REPLACE(root.db.d1.x6, '3', '4'), -cos(sin(root.db.d1.y2 / 10)))",}; ++ Statement statement = connection.createStatement()) { ++ String[] expectedHeader = ++ new String[] { ++ "max_by(root.db.d1.x1 + 1 - 3, -cos(sin(root.db.d1.y2 / 10)))", ++ "max_by(root.db.d1.x2 * 2 / 3, -cos(sin(root.db.d1.y2 / 10)))", ++ "max_by(floor(root.db.d1.x3), -cos(sin(root.db.d1.y2 / 10)))", ++ "max_by(ceil(root.db.d1.x4), -cos(sin(root.db.d1.y2 / 10)))", ++ "max_by(root.db.d1.x5, -cos(sin(root.db.d1.y2 / 10)))", ++ "max_by(REPLACE(root.db.d1.x6, '3', '4'), -cos(sin(root.db.d1.y2 / 10)))", ++ }; + String[] retArray = new String[] {"1.0,2.0,3.0,3.0,false,4,"}; + String y = "-cos(sin(y2 / 10))"; + resultSetEqualTest( - String.format( - "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, '3', '4'),%s) from root.db.d1 where time <= 3", - y, y, y, y, y, y), - expectedHeader, - retArray); ++ String.format( ++ "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, '3', '4'),%s) from root.db.d1 where time <= 3", ++ y, y, y, y, y, y), ++ expectedHeader, ++ retArray); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxByWithAlignByDevice() { + try (Connection connection = EnvFactory.getEnv().getConnection(); - Statement statement = connection.createStatement()) { - String[] expectedHeader = new String[]{DEVICE, "max_by(x1 + 1 - 3, -cos(sin(y2 / 10)))","max_by(x2 * 2 / 3, -cos(sin(y2 / 10)))","max_by(floor(x3), -cos(sin(y2 / 10)))","max_by(ceil(x4), -cos(sin(y2 / 10)))","max_by(x5, -cos(sin(y2 / 10)))","max_by(REPLACE(x6, '3', '4'), -cos(sin(y2 / 10)))",}; - String[] retArray = new String[] {"root.db.d1,1.0,2.0,3.0,3.0,false,4,", "root.db.d2,1.0,2.0,3.0,3.0,false,4,"}; ++ Statement statement = connection.createStatement()) { ++ String[] expectedHeader = ++ new String[] { ++ DEVICE, ++ "max_by(x1 + 1 - 3, -cos(sin(y2 / 10)))", ++ "max_by(x2 * 2 / 3, -cos(sin(y2 / 10)))", ++ "max_by(floor(x3), -cos(sin(y2 / 10)))", ++ "max_by(ceil(x4), -cos(sin(y2 / 10)))", ++ "max_by(x5, -cos(sin(y2 / 10)))", ++ "max_by(REPLACE(x6, '3', '4'), -cos(sin(y2 / 10)))", ++ }; ++ String[] retArray = ++ new String[] { ++ "root.db.d1,1.0,2.0,3.0,3.0,false,4,", "root.db.d2,1.0,2.0,3.0,3.0,false,4," ++ }; + String y = "-cos(sin(y2 / 10))"; + resultSetEqualTest( - String.format( - "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, '3', '4'),%s) from root.db.** where time <= 3 align by device", - y, y, y, y, y, y), - expectedHeader, - retArray); ++ String.format( ++ "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, '3', '4'),%s) from root.db.** where time <= 3 align by device", ++ y, y, y, y, y, y), ++ expectedHeader, ++ retArray); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxByWithGroupBy() { + try (Connection connection = EnvFactory.getEnv().getConnection(); - Statement statement = connection.createStatement()) { - String[] expectedHeader = new String[]{TIMESTAMP_STR, "max_by(root.db.d1.x1, root.db.d1.y2)","max_by(root.db.d1.x2, root.db.d1.y2)","max_by(root.db.d1.x3, root.db.d1.y2)","max_by(root.db.d1.x4, root.db.d1.y2)","max_by(root.db.d1.x5, root.db.d1.y2)","max_by(root.db.d1.x6, root.db.d1.y2)",}; - String[] retArray = new String[] {"0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", "8,3,3,3.0,3.0,false,3,"}; ++ Statement statement = connection.createStatement()) { ++ String[] expectedHeader = ++ new String[] { ++ TIMESTAMP_STR, ++ "max_by(root.db.d1.x1, root.db.d1.y2)", ++ "max_by(root.db.d1.x2, root.db.d1.y2)", ++ "max_by(root.db.d1.x3, root.db.d1.y2)", ++ "max_by(root.db.d1.x4, root.db.d1.y2)", ++ "max_by(root.db.d1.x5, root.db.d1.y2)", ++ "max_by(root.db.d1.x6, root.db.d1.y2)", ++ }; ++ String[] retArray = ++ new String[] { ++ "0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", "8,3,3,3.0,3.0,false,3," ++ }; + String y = "y2"; + resultSetEqualTest( - String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms)", - y, y, y, y, y, y), - expectedHeader, - retArray); ++ String.format( ++ "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms)", ++ y, y, y, y, y, y), ++ expectedHeader, ++ retArray); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxByWithSlidingWindow() { + try (Connection connection = EnvFactory.getEnv().getConnection(); - Statement statement = connection.createStatement()) { - String[] expectedHeader = new String[]{TIMESTAMP_STR, "max_by(root.db.d1.x1, root.db.d1.y2)","max_by(root.db.d1.x2, root.db.d1.y2)","max_by(root.db.d1.x3, root.db.d1.y2)","max_by(root.db.d1.x4, root.db.d1.y2)","max_by(root.db.d1.x5, root.db.d1.y2)","max_by(root.db.d1.x6, root.db.d1.y2)",}; - String[] retArray = new String[] {"0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", "8,3,3,3.0,3.0,false,3,"}; ++ Statement statement = connection.createStatement()) { ++ String[] expectedHeader = ++ new String[] { ++ TIMESTAMP_STR, ++ "max_by(root.db.d1.x1, root.db.d1.y2)", ++ "max_by(root.db.d1.x2, root.db.d1.y2)", ++ "max_by(root.db.d1.x3, root.db.d1.y2)", ++ "max_by(root.db.d1.x4, root.db.d1.y2)", ++ "max_by(root.db.d1.x5, root.db.d1.y2)", ++ "max_by(root.db.d1.x6, root.db.d1.y2)", ++ }; ++ String[] retArray = ++ new String[] { ++ "0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", "8,3,3,3.0,3.0,false,3," ++ }; + String y = "y2"; + resultSetEqualTest( - String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms)", - y, y, y, y, y, y), - expectedHeader, - retArray); ++ String.format( ++ "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms)", ++ y, y, y, y, y, y), ++ expectedHeader, ++ retArray); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + - + // test max_by different types of x + // test max_by time + // test max_by with expression + // test max_by align by device + // test max_by group by time + // test max_by sliding window + // test max_by aligned series + // test max_by multi data region + // test max_by group by level + // test max_by having + + /** @return yInput -> expectedHeader */ + private Map<String, String[]> generateExpectedHeadersForMaxByTest( + String device, String[] xInput, String[] yInput) { + Map<String, String[]> res = new HashMap<>(); + Arrays.stream(yInput) + .forEach( + y -> { + res.put( + y, + Arrays.stream(xInput) + .map(x -> maxBy("Time".equals(x) ? x : device + "." + x, device + "." + y)) + .toArray(String[]::new)); + }); + return res; + } +} diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java index 21add5ae17e,00000000000..47092e43962 mode 100644,000000..100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java @@@ -1,347 -1,0 +1,407 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.aggregation; + +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; ++import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; ++import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.BitMap; ++import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + ++import java.io.ByteArrayOutputStream; ++import java.io.DataOutputStream; ++import java.io.IOException; ++import java.util.Collections; ++ +import static com.google.common.base.Preconditions.checkArgument; + +/** max(x,y) returns the value of x associated with the maximum value of y over all input values. */ +public class MaxByAccumulator implements Accumulator { + + private final TSDataType xDataType; + + private final TSDataType yDataType; + + private final TsPrimitiveType yMaxValue; + + private final TsPrimitiveType xResult; + + private boolean xNull = true; + + private boolean initResult; + + private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy: %s"; + + public MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) { + this.xDataType = xDataType; + this.yDataType = yDataType; + this.xResult = TsPrimitiveType.getByType(xDataType); + this.yMaxValue = TsPrimitiveType.getByType(yDataType); + } + + // Column should be like: | Time | x | y | + @Override + public void addInput(Column[] column, BitMap bitMap, int lastIndex) { ++ checkArgument(column.length == 3, "Length of input Column[] for MaxBy should be 3"); + switch (yDataType) { + case INT32: + addIntInput(column, bitMap, lastIndex); + return; + case INT64: + addLongInput(column, bitMap, lastIndex); + return; + case FLOAT: + addFloatInput(column, bitMap, lastIndex); + return; + case DOUBLE: + addDoubleInput(column, bitMap, lastIndex); + return; + case TEXT: + case BOOLEAN: + default: + throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType)); + } + } + + // partialResult should be like: | partialMaxByBinary | + @Override + public void addIntermediate(Column[] partialResult) { - checkArgument(partialResult.length == 2, "partialResult of MaxBy should be 2"); ++ checkArgument(partialResult.length == 1, "partialResult of MaxBy should be 1"); + // Return if y is null. + if (partialResult[0].isNull(0)) { + return; + } - switch (yDataType) { - case INT32: - updateIntResult(partialResult[1].getInt(0), partialResult[0], 0); - break; - case INT64: - updateLongResult(partialResult[1].getLong(0), partialResult[0], 0); - break; - case FLOAT: - updateFloatResult(partialResult[1].getFloat(0), partialResult[0], 0); - break; - case DOUBLE: - updateDoubleResult(partialResult[1].getDouble(0), partialResult[0], 0); - break; - case TEXT: - case BOOLEAN: - default: - throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType)); - } ++ byte[] bytes = partialResult[0].getBinary(0).getValues(); ++ updateFromBytesIntermediateInput(bytes); + } + + @Override + public void addStatistics(Statistics statistics) { + throw new UnsupportedOperationException(getClass().getName()); + } + + // finalResult should be single column, like: | finalXValue | + @Override + public void setFinal(Column finalResult) { + if (finalResult.isNull(0)) { + return; + } + initResult = true; - if (finalResult.isNull(0)) { - xNull = true; - return; - } - switch (xDataType) { - case INT32: - xResult.setInt(finalResult.getInt(0)); - break; - case INT64: - xResult.setLong(finalResult.getLong(0)); - break; - case FLOAT: - xResult.setFloat(finalResult.getFloat(0)); - break; - case DOUBLE: - xResult.setDouble(finalResult.getDouble(0)); - break; - case TEXT: - xResult.setBinary(finalResult.getBinary(0)); - break; - case BOOLEAN: - xResult.setBoolean(finalResult.getBoolean(0)); - break; - default: - throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType)); - } ++ updateX(finalResult, 0); + } + - // columnBuilders should be like | xColumnBuilder | yColumnBuilder | ++ // columnBuilders should be like | TextIntermediateColumnBuilder | + @Override + public void outputIntermediate(ColumnBuilder[] columnBuilders) { - checkArgument(columnBuilders.length == 2, "partialResult of MaxValue should be 2"); ++ checkArgument(columnBuilders.length == 1, "partialResult of MaxValue should be 1"); + if (!initResult) { + columnBuilders[0].appendNull(); - columnBuilders[1].appendNull(); + return; + } - switch (yDataType) { - case INT32: - writeX(columnBuilders[0]); - columnBuilders[1].writeInt(yMaxValue.getInt()); - break; - case INT64: - writeX(columnBuilders[0]); - columnBuilders[1].writeLong(yMaxValue.getLong()); - break; - case FLOAT: - writeX(columnBuilders[0]); - columnBuilders[1].writeFloat(yMaxValue.getFloat()); - break; - case DOUBLE: - writeX(columnBuilders[0]); - columnBuilders[1].writeDouble(yMaxValue.getDouble()); - break; - case TEXT: - case BOOLEAN: - default: - throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType)); - } ++ columnBuilders[0].writeBinary(new Binary(serialize())); + } + + @Override + public void outputFinal(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + return; + } + writeX(columnBuilder); + } + + @Override + public void reset() { + initResult = false; + xNull = true; + this.xResult.reset(); + this.yMaxValue.reset(); + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public TSDataType[] getIntermediateType() { - return new TSDataType[] {xDataType, yDataType}; ++ return new TSDataType[] {TSDataType.TEXT}; + } + + @Override + public TSDataType getFinalType() { + return xDataType; + } + + private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) { + for (int i = 0; i <= lastIndex; i++) { + if (bitMap != null && !bitMap.isMarked(i)) { + continue; + } + if (!column[2].isNull(i)) { + updateIntResult(column[2].getInt(i), column[1], i); + } + } + } + + private void updateIntResult(int yMaxVal, Column xColumn, int xIndex) { + if (!initResult || yMaxVal > yMaxValue.getInt()) { + initResult = true; + yMaxValue.setInt(yMaxVal); + updateX(xColumn, xIndex); + } + } + + private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) { + for (int i = 0; i <= lastIndex; i++) { + if (bitMap != null && !bitMap.isMarked(i)) { + continue; + } + if (!column[2].isNull(i)) { + updateLongResult(column[2].getLong(i), column[1], i); + } + } + } + + private void updateLongResult(long yMaxVal, Column xColumn, int xIndex) { + if (!initResult || yMaxVal > yMaxValue.getLong()) { + initResult = true; + yMaxValue.setLong(yMaxVal); + updateX(xColumn, xIndex); + } + } + + private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) { + for (int i = 0; i <= lastIndex; i++) { + if (bitMap != null && !bitMap.isMarked(i)) { + continue; + } + if (!column[2].isNull(i)) { + updateFloatResult(column[2].getFloat(i), column[1], i); + } + } + } + + private void updateFloatResult(float yMaxVal, Column xColumn, int xIndex) { + if (!initResult || yMaxVal > yMaxValue.getFloat()) { + initResult = true; + yMaxValue.setFloat(yMaxVal); + updateX(xColumn, xIndex); + } + } + + private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) { + for (int i = 0; i <= lastIndex; i++) { + if (bitMap != null && !bitMap.isMarked(i)) { + continue; + } + if (!column[2].isNull(i)) { + updateDoubleResult(column[2].getDouble(i), column[1], i); + } + } + } + + private void updateDoubleResult(double yMaxVal, Column xColumn, int xIndex) { + if (!initResult || yMaxVal > yMaxValue.getDouble()) { + initResult = true; + yMaxValue.setDouble(yMaxVal); + updateX(xColumn, xIndex); + } + } + + private void writeX(ColumnBuilder columnBuilder) { + if (xNull) { + columnBuilder.appendNull(); + return; + } + switch (xDataType) { + case INT32: + columnBuilder.writeInt(xResult.getInt()); + break; + case INT64: + columnBuilder.writeLong(xResult.getLong()); + break; + case FLOAT: + columnBuilder.writeFloat(xResult.getFloat()); + break; + case DOUBLE: + columnBuilder.writeDouble(xResult.getDouble()); + break; + case TEXT: + columnBuilder.writeBinary(xResult.getBinary()); + break; + case BOOLEAN: + columnBuilder.writeBoolean(xResult.getBoolean()); + break; + default: + throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType)); + } + } + + private void updateX(Column xColumn, int xIndex) { + if (xColumn.isNull(xIndex)) { + xNull = true; + } else { + xNull = false; + switch (xDataType) { + case INT32: + xResult.setInt(xColumn.getInt(xIndex)); + break; + case INT64: + xResult.setLong(xColumn.getLong(xIndex)); + break; + case FLOAT: + xResult.setFloat(xColumn.getFloat(xIndex)); + break; + case DOUBLE: + xResult.setDouble(xColumn.getDouble(xIndex)); + break; + case TEXT: + xResult.setBinary(xColumn.getBinary(xIndex)); + break; + case BOOLEAN: + xResult.setBoolean(xColumn.getBoolean(xIndex)); + break; + default: + throw new UnSupportedDataTypeException( + String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType)); + } + } + } ++ ++ private byte[] serialize() { ++ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ++ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); ++ try { ++ writeIntermediateToStream(yDataType, yMaxValue, dataOutputStream); ++ dataOutputStream.writeBoolean(xNull); ++ if (!xNull) { ++ writeIntermediateToStream(xDataType, xResult, dataOutputStream); ++ } ++ } catch (IOException e) { ++ throw new UnsupportedOperationException( ++ "Failed to serialize intermediate result for MaxByAccumulator.", e); ++ } ++ return byteArrayOutputStream.toByteArray(); ++ } ++ ++ private void writeIntermediateToStream( ++ TSDataType dataType, TsPrimitiveType value, DataOutputStream dataOutputStream) ++ throws IOException { ++ switch (dataType) { ++ case INT32: ++ dataOutputStream.writeInt(value.getInt()); ++ break; ++ case INT64: ++ dataOutputStream.writeLong(value.getLong()); ++ break; ++ case FLOAT: ++ dataOutputStream.writeFloat(value.getFloat()); ++ break; ++ case DOUBLE: ++ dataOutputStream.writeDouble(value.getDouble()); ++ break; ++ case TEXT: ++ dataOutputStream.writeBytes(value.getBinary().toString()); ++ break; ++ case BOOLEAN: ++ dataOutputStream.writeBoolean(value.getBoolean()); ++ break; ++ default: ++ throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, dataType)); ++ } ++ } ++ ++ private void updateFromBytesIntermediateInput(byte[] bytes) { ++ int offset = 0; ++ // Use Column to store x value ++ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(xDataType)); ++ ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0]; ++ switch (yDataType) { ++ case INT32: ++ int intMaxVal = BytesUtils.bytesToInt(bytes); ++ offset += Integer.BYTES; ++ readXFromBytesIntermediateInput(bytes, offset, columnBuilder); ++ updateIntResult(intMaxVal, columnBuilder.build(), 0); ++ break; ++ case INT64: ++ long longMaxVal = BytesUtils.bytesToLong(bytes); ++ offset += Long.BYTES; ++ readXFromBytesIntermediateInput(bytes, offset, columnBuilder); ++ updateLongResult(longMaxVal, columnBuilder.build(), 0); ++ break; ++ case FLOAT: ++ float floatMaxVal = BytesUtils.bytesToFloat(bytes); ++ offset += Float.BYTES; ++ readXFromBytesIntermediateInput(bytes, offset, columnBuilder); ++ updateFloatResult(floatMaxVal, columnBuilder.build(), 0); ++ break; ++ case DOUBLE: ++ double doubleMaxVal = BytesUtils.bytesToDouble(bytes); ++ offset += Long.BYTES; ++ readXFromBytesIntermediateInput(bytes, offset, columnBuilder); ++ updateDoubleResult(doubleMaxVal, columnBuilder.build(), 0); ++ break; ++ case TEXT: ++ case BOOLEAN: ++ default: ++ throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType)); ++ } ++ } ++ ++ private void readXFromBytesIntermediateInput( ++ byte[] bytes, int offset, ColumnBuilder columnBuilder) { ++ boolean isXNull = BytesUtils.bytesToBool(bytes, offset); ++ offset += 1; ++ if (isXNull) { ++ columnBuilder.appendNull(); ++ } else { ++ switch (xDataType) { ++ case INT32: ++ columnBuilder.writeInt(BytesUtils.bytesToInt(bytes, offset)); ++ break; ++ case INT64: ++ columnBuilder.writeLong(BytesUtils.bytesToLongFromOffset(bytes, 8, offset)); ++ break; ++ case FLOAT: ++ columnBuilder.writeFloat(BytesUtils.bytesToFloat(bytes, offset)); ++ break; ++ case DOUBLE: ++ columnBuilder.writeDouble(BytesUtils.bytesToDouble(bytes, offset)); ++ break; ++ case TEXT: ++ columnBuilder.writeBinary( ++ new Binary(BytesUtils.subBytes(bytes, offset, bytes.length - offset))); ++ break; ++ case BOOLEAN: ++ columnBuilder.writeBoolean(BytesUtils.bytesToBool(bytes, offset)); ++ break; ++ default: ++ throw new UnSupportedDataTypeException( ++ String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType)); ++ } ++ } ++ } +} diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 4d013814ace,15fb330bab2..68dff31c74f --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@@ -698,46 -697,27 +698,35 @@@ public class LogicalPlanBuilder public static void updateTypeProviderByPartialAggregation( AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) { - List<TAggregationType> splitAggregations = + List<String> partialAggregationsNames = SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType()); - for (TAggregationType aggregation : splitAggregations) { - String functionName = aggregation.toString().toLowerCase(); - TSDataType aggregationType = SchemaUtils.getAggregationType(functionName); - String inputExpressionStr = - getExpressionStringThatDeterminesReturnType(aggregationDescriptor); - String inputExpressionStr = - aggregationDescriptor.getInputExpressions().get(0).getExpressionString(); ++ String inputExpressionStr = getInputExpressionString(aggregationDescriptor); + for (String partialAggregationName : partialAggregationsNames) { + TSDataType aggregationType = SchemaUtils.getAggregationType(partialAggregationName); typeProvider.setType( - String.format("%s(%s)", functionName, aggregationDescriptor.getParametersString()), + String.format("%s(%s)", partialAggregationName, inputExpressionStr), aggregationType == null ? typeProvider.getType(inputExpressionStr) : aggregationType); } } - /** - * For aggregate functions that accept single input, we return the first input Expression. For - * aggregate functions that can accept multiple inputs, if the type of intermediate results is - * determined by the input Expression, we return the corresponding Expression used to determine - * the type of intermediate results. - */ - private static String getExpressionStringThatDeterminesReturnType( - AggregationDescriptor aggregationDescriptor) { - switch (aggregationDescriptor.getAggregationType()) { - case MAX_BY_Y_INPUT: - return aggregationDescriptor.getInputExpressions().get(1).getExpressionString(); - case MAX_BY_X_INPUT: - default: - return aggregationDescriptor.getInputExpressions().get(0).getExpressionString(); ++ private static String getInputExpressionString(AggregationDescriptor aggregationDescriptor) { ++ // We just process first input Expression of Count_IF ++ if (TAggregationType.COUNT_IF.equals(aggregationDescriptor.getAggregationType())) { ++ return aggregationDescriptor.getInputExpressions().get(0).getExpressionString(); ++ } else { ++ return aggregationDescriptor.getParametersString(); + } + } + public static void updateTypeProviderByPartialAggregation( CrossSeriesAggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) { - List<TAggregationType> splitAggregations = + List<String> partialAggregationsNames = SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType()); PartialPath path = ((TimeSeriesOperand) aggregationDescriptor.getOutputExpression()).getPath(); - for (TAggregationType aggregationType : splitAggregations) { - String functionName = aggregationType.toString().toLowerCase(); + for (String partialAggregationName : partialAggregationsNames) { typeProvider.setType( - String.format("%s(%s)", functionName, path.getFullPath()), - SchemaUtils.getSeriesTypeByPath(path, functionName)); + String.format("%s(%s)", partialAggregationName, path.getFullPath()), + SchemaUtils.getSeriesTypeByPath(path, partialAggregationName)); } } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java index 3c223cd54ee,3b32baef3fa..5192f5a0186 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java @@@ -34,8 -34,9 +34,10 @@@ import java.util.Iterator import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; + import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.addPartialSuffix; + public class AggregationDescriptor { // aggregation function type @@@ -152,28 -148,23 +154,26 @@@ outputAggregationNames.add(SqlConstant.MIN_TIME); break; case STDDEV: - outputAggregationNames.add(SqlConstant.STDDEV); + outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV)); break; case STDDEV_POP: - outputAggregationNames.add(SqlConstant.STDDEV_POP); + outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV_POP)); break; case STDDEV_SAMP: - outputAggregationNames.add(SqlConstant.STDDEV_SAMP); + outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV_SAMP)); break; case VARIANCE: - outputAggregationNames.add(SqlConstant.VARIANCE); + outputAggregationNames.add(addPartialSuffix(SqlConstant.VARIANCE)); break; case VAR_POP: - outputAggregationNames.add(SqlConstant.VAR_POP); + outputAggregationNames.add(addPartialSuffix(SqlConstant.VAR_POP)); break; case VAR_SAMP: - outputAggregationNames.add(SqlConstant.VAR_SAMP); + outputAggregationNames.add(addPartialSuffix(SqlConstant.VAR_SAMP)); break; + case MAX_BY: - // max_by(x, y) takes x and y as the input - outputAggregationNames.add(SqlConstant.MAX_BY_INPUT_X); - outputAggregationNames.add(SqlConstant.MAX_BY_INPUT_Y); ++ outputAggregationNames.add(addPartialSuffix(SqlConstant.MAX_BY)); + break; default: outputAggregationNames.add(aggregationFuncName); } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java index 4e6e9619669,ddddec125d1..ec88324163e --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java @@@ -136,9 -138,9 +138,10 @@@ public class SchemaUtils case SqlConstant.MIN_VALUE: case SqlConstant.MAX_VALUE: case SqlConstant.MODE: + case SqlConstant.MAX_BY: - default: return null; + default: + return TSDataType.TEXT; } } @@@ -186,30 -187,28 +189,30 @@@ } } - public static List<TAggregationType> splitPartialAggregation(TAggregationType aggregationType) { + public static List<String> splitPartialAggregation(TAggregationType aggregationType) { switch (aggregationType) { case FIRST_VALUE: - return Collections.singletonList(TAggregationType.MIN_TIME); + return Collections.singletonList(SqlConstant.MIN_TIME); case LAST_VALUE: - return Collections.singletonList(TAggregationType.MAX_TIME); + return Collections.singletonList(SqlConstant.MAX_TIME); case STDDEV: - return Collections.singletonList(TAggregationType.STDDEV); + return Collections.singletonList(addPartialSuffix(SqlConstant.STDDEV)); case STDDEV_POP: - return Collections.singletonList(TAggregationType.STDDEV_POP); + return Collections.singletonList(addPartialSuffix(SqlConstant.STDDEV_POP)); case STDDEV_SAMP: - return Collections.singletonList(TAggregationType.STDDEV_SAMP); + return Collections.singletonList(addPartialSuffix(SqlConstant.STDDEV_SAMP)); case VARIANCE: - return Collections.singletonList(TAggregationType.VARIANCE); + return Collections.singletonList(addPartialSuffix(SqlConstant.VARIANCE)); case VAR_POP: - return Collections.singletonList(TAggregationType.VAR_POP); + return Collections.singletonList(addPartialSuffix(SqlConstant.VAR_POP)); case VAR_SAMP: - return Collections.singletonList(TAggregationType.VAR_SAMP); + return Collections.singletonList(addPartialSuffix(SqlConstant.VAR_SAMP)); + case MAX_BY: - return Arrays.asList(TAggregationType.MAX_BY_X_INPUT, TAggregationType.MAX_BY_Y_INPUT); ++ return Collections.singletonList(addPartialSuffix(SqlConstant.MAX_BY)); case AVG: - return Arrays.asList(TAggregationType.COUNT, TAggregationType.SUM); + return Arrays.asList(SqlConstant.COUNT, SqlConstant.SUM); case TIME_DURATION: - return Arrays.asList(TAggregationType.MAX_TIME, TAggregationType.MIN_TIME); + return Arrays.asList(SqlConstant.MAX_TIME, SqlConstant.MIN_TIME); case SUM: case MIN_VALUE: case MAX_VALUE: diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java index f3e13d1cac1,75d8a04aac1..686476ff6a0 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java @@@ -47,9 -47,6 +47,7 @@@ public class SqlConstant public static final String MAX_TIME = "max_time"; public static final String MAX_VALUE = "max_value"; public static final String MIN_VALUE = "min_value"; + public static final String MAX_BY = "max_by"; - public static final String MAX_BY_INPUT_X = "max_by_input_x"; - public static final String MAX_BY_INPUT_Y = "max_by_input_y"; public static final String EXTREME = "extreme"; public static final String FIRST_VALUE = "first_value"; public static final String LAST_VALUE = "last_value"; diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java index 55fe3edeead,941a39dcb53..310eb019789 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java @@@ -876,44 -864,4 +876,37 @@@ public class AccumulatorTest varSampAccumulator.outputFinal(finalResult); Assert.assertEquals(841.6666666666666, finalResult.build().getDouble(0), 0.001); } + + @Test + public void maxByAccumulatorTest() { + Accumulator maxByAccumulator = + AccumulatorFactory.createAccumulator( + TAggregationType.MAX_BY, + Arrays.asList(TSDataType.INT32, TSDataType.DOUBLE), + Collections.emptyList(), + Collections.emptyMap(), + true); - Assert.assertEquals(TSDataType.INT32, maxByAccumulator.getIntermediateType()[0]); - Assert.assertEquals(TSDataType.DOUBLE, maxByAccumulator.getIntermediateType()[1]); ++ Assert.assertEquals(TSDataType.TEXT, maxByAccumulator.getIntermediateType()[0]); + Assert.assertEquals(TSDataType.INT32, maxByAccumulator.getFinalType()); + // Returns null if there's no data - ColumnBuilder[] intermediateResult = new ColumnBuilder[2]; - intermediateResult[0] = new IntColumnBuilder(null, 1); - intermediateResult[1] = new DoubleColumnBuilder(null, 1); ++ ColumnBuilder[] intermediateResult = new ColumnBuilder[1]; ++ intermediateResult[0] = new BinaryColumnBuilder(null, 1); + maxByAccumulator.outputIntermediate(intermediateResult); + Assert.assertTrue(intermediateResult[0].build().isNull(0)); - Assert.assertTrue(intermediateResult[1].build().isNull(0)); + ColumnBuilder finalResult = new IntColumnBuilder(null, 1); + maxByAccumulator.outputFinal(finalResult); + Assert.assertTrue(finalResult.build().isNull(0)); + + Column[] timeAndValueColumn = getTimeAndTwoValueColumns(1, 0); + maxByAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1); + Assert.assertFalse(maxByAccumulator.hasFinalResult()); - intermediateResult[0] = new IntColumnBuilder(null, 1); - intermediateResult[1] = new DoubleColumnBuilder(null, 1); ++ intermediateResult[0] = new BinaryColumnBuilder(null, 1); + maxByAccumulator.outputIntermediate(intermediateResult); - Assert.assertEquals(-99, intermediateResult[0].build().getInt(0)); - Assert.assertEquals(99d, intermediateResult[1].build().getDouble(0), 0.001); + + // add intermediate result as input - maxByAccumulator.addIntermediate( - new Column[] {intermediateResult[0].build(), intermediateResult[1].build()}); ++ maxByAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()}); + finalResult = new IntColumnBuilder(null, 1); + maxByAccumulator.outputFinal(finalResult); + Assert.assertEquals(-99, finalResult.build().getInt(0)); + } } diff --cc iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index 21a69cf689e,dbdb8bd406f..9ec7b42d0ce --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift @@@ -196,8 -196,5 +196,6 @@@ enum TAggregationType STDDEV_SAMP, VARIANCE, VAR_POP, - VAR_SAMP + VAR_SAMP, + MAX_BY, - MAX_BY_X_INPUT, - MAX_BY_Y_INPUT }
