This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch max_by
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 53a4291559802656db78479a0d40b6a7c0720ea0
Merge: 955edb759d0 c52da2bdbd0
Author: lancelly <[email protected]>
AuthorDate: Fri Jan 26 19:04:26 2024 +0800

    Using Binary as Intermediate

 example/client-cpp-example/README.md               |   2 +-
 .../it/cluster/IoTDBClusterRestartIT.java          |  70 ++
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  |   9 +
 .../maxby/IoTDBMaxByAlignedSeriesIT.java           |  69 +-
 .../db/it/aggregation/maxby/IoTDBMaxByIT.java      | 177 +++--
 .../apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java  |  46 --
 .../apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java | 320 +++++++++
 .../src/assembly/resources/tools/collect-info.bat  | 267 ++++++++
 .../tools/{collection-info.sh => collect-info.sh}  | 212 +++---
 .../assembly/resources/tools/collection-info.bat   | 275 --------
 iotdb-client/client-cpp/README.md                  |  20 +-
 .../src/assembly/resources/conf/confignode-env.bat |   2 +-
 .../resources/conf/iotdb-confignode.properties     |   5 -
 ...register-confignode.sh => daemon-confignode.sh} |  24 +-
 .../heartbeat/DataNodeHeartbeatHandler.java        |   2 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  17 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |   8 -
 .../iotdb/confignode/manager/ConfigManager.java    |  87 ++-
 .../manager/consensus/ConsensusManager.java        |   2 -
 .../confignode/manager/load/cache/LoadCache.java   |   2 +-
 .../manager/load/cache/node/BaseNodeCache.java     |   2 +-
 .../load/cache/node/ConfigNodeHeartbeatCache.java  |   2 +-
 .../load/cache/node/DataNodeHeartbeatCache.java    |   2 +-
 .../load/cache/node/NodeHeartbeatSample.java       |   6 +-
 .../manager/load/cache/node/NodeStatistics.java    |   2 +-
 .../manager/load/cache/region/RegionCache.java     |   5 +-
 .../load/cache/region/RegionHeartbeatSample.java   |   2 +-
 .../manager/load/service/HeartbeatService.java     |   4 +-
 .../pipe/coordinator/runtime/PipeMetaSyncer.java   |   3 +
 .../iotdb/confignode/persistence/AuthorInfo.java   |   5 -
 .../persistence/executor/ConfigPlanExecutor.java   |   9 +-
 .../partition/DatabasePartitionTable.java          |  11 +
 .../persistence/partition/PartitionInfo.java       |  19 +
 .../persistence/partition/RegionGroup.java         |  15 +
 .../procedure/env/ConfigNodeProcedureEnv.java      |   2 +-
 .../iotdb/confignode/service/ConfigNode.java       |  14 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   2 +-
 .../router/priority/GreedyPriorityTest.java        |   6 +-
 .../priority/LeaderPriorityBalancerTest.java       |   6 +-
 .../manager/load/cache/LoadCacheTest.java          |   2 +-
 .../manager/load/cache/NodeCacheTest.java          |   6 +-
 .../manager/load/cache/RegionGroupCacheTest.java   |   6 +-
 .../manager/load/cache/RegionRouteCacheTest.java   |   2 +-
 .../iotdb/consensus/config/IoTConsensusConfig.java |  16 -
 .../apache/iotdb/consensus/config/RatisConfig.java |  30 -
 .../exception/RatisReadUnavailableException.java   |  13 +-
 .../iot/client/IoTConsensusClientPool.java         |   2 -
 .../consensus/iot/logdispatcher/LogDispatcher.java |  24 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |  20 +-
 .../apache/iotdb/consensus/ratis/utils/Utils.java  |   6 +-
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   4 +-
 .../iotdb/consensus/ratis/RecoverReadTest.java     |   6 +-
 .../src/assembly/resources/conf/datanode-env.bat   |   2 +-
 .../resources/conf/iotdb-datanode.properties       |   7 +-
 .../{register-datanode.sh => daemon-datanode.sh}   |  24 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  53 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  44 +-
 .../db/consensus/DataRegionConsensusImpl.java      |   2 -
 .../db/consensus/SchemaRegionConsensusImpl.java    |   1 -
 .../agent/runtime/PipePeriodicalJobExecutor.java   |   5 +
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   9 +-
 .../db/pipe/agent/task/PipeTaskDataNodeAgent.java  | 118 +++-
 .../thrift/async/IoTDBThriftAsyncConnector.java    |  38 +-
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |  41 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  13 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  11 +
 .../tablet/TabletInsertionDataContainer.java       | 152 +++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  19 +-
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |  34 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  54 +-
 .../PipeRealtimeDataRegionHybridExtractor.java     |   9 +-
 .../iotdb/db/pipe/metric/PipeExtractorMetrics.java |   7 +-
 .../resource/tsfile/PipeTsFileResourceManager.java |  33 +-
 .../pipe/resource/wal/PipeWALResourceManager.java  |  41 +-
 .../iotdb/db/pipe/task/PipeDataNodeTask.java       |  30 +
 .../db/pipe/task/subtask/PipeDataNodeSubtask.java  | 105 +--
 .../subtask/connector/PipeConnectorSubtask.java    | 166 ++---
 .../subtask/processor/PipeProcessorSubtask.java    |  14 +-
 .../protocol/client/DataNodeClientPoolFactory.java |   2 -
 .../db/queryengine/common/MPPQueryContext.java     |  10 +-
 .../common/header/ColumnHeaderConstant.java        |   1 +
 .../execution/aggregation/MaxByAccumulator.java    | 202 ++++--
 .../queryengine/execution/driver/DataDriver.java   |   1 +
 .../execution/driver/DataDriverContext.java        |  16 +-
 .../fragment/FragmentInstanceContext.java          |   4 +
 .../execution/load/LoadTsFileManager.java          | 148 +++--
 .../execution/operator/AggregationUtil.java        |   6 +
 .../iotdb/db/queryengine/plan/Coordinator.java     |   5 +-
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |   6 +-
 .../queryengine/plan/execution/QueryExecution.java |   3 +
 .../execution/config/metadata/ShowRegionTask.java  |   7 +-
 .../plan/planner/LocalExecutionPlanner.java        |   7 +-
 .../plan/planner/LogicalPlanBuilder.java           |  41 +-
 .../plan/planner/OperatorTreeGenerator.java        |   4 +-
 .../plan/planner/distribution/SourceRewriter.java  |   9 +-
 .../plan/planner/plan/LogicalQueryPlan.java        |   6 +-
 .../plan/planner/plan/PlanFragment.java            |   5 +
 .../plan/parameter/AggregationDescriptor.java      |  18 +-
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  30 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   8 +-
 .../mtree/impl/mem/mnode/info/LogicalViewInfo.java |   2 +-
 .../mtree/impl/pbtree/flush/Scheduler.java         | 150 ++---
 .../impl/pbtree/memory/ReleaseFlushMonitor.java    |   4 +-
 .../mtree/impl/pbtree/schemafile/RecordUtils.java  |  55 +-
 .../impl/pbtree/schemafile/WrappedSegment.java     |  13 +-
 .../schemafile/pagemgr/BTreePageManager.java       |   8 +-
 .../pbtree/schemafile/pagemgr/PageIOChannel.java   | 184 ++++++
 .../schemafile/pagemgr/PageIndexSortBuckets.java   | 129 ++++
 .../pbtree/schemafile/pagemgr/PageManager.java     | 464 +------------
 .../impl/pbtree/schemafile/pagemgr/PagePool.java   | 152 +++++
 .../schemafile/pagemgr/SchemaPageContext.java      | 109 ++++
 .../java/org/apache/iotdb/db/service/DataNode.java |  24 +-
 .../db/service/metrics/DataNodeMetricsHelper.java  |   2 +-
 .../iotdb/db/service/metrics/WritingMetrics.java   |  16 +-
 .../iotdb/db/storageengine/StorageEngine.java      |  19 +-
 .../db/storageengine/dataregion/DataRegion.java    |  18 +-
 .../impl/ReadChunkCompactionPerformer.java         |  19 +-
 .../execute/task/CompactionTaskSummary.java        |  44 +-
 .../execute/task/InnerSpaceCompactionTask.java     |   2 +
 .../task/InsertionCrossSpaceCompactionTask.java    |   8 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   |   4 +
 .../execute/utils/executor/ModifiedStatus.java}    |  15 +-
 .../fast/AlignedSeriesCompactionExecutor.java      |   1 +
 .../fast/NonAlignedSeriesCompactionExecutor.java   |   1 +
 .../executor/fast/SeriesCompactionExecutor.java    |   6 +-
 .../ReadChunkAlignedSeriesCompactionExecutor.java  | 466 ++++++++++++++
 .../readchunk/SingleSeriesCompactionExecutor.java  |   4 +-
 .../executor/readchunk/loader/ChunkLoader.java     | 104 +++
 .../readchunk/loader/InstantChunkLoader.java       | 105 +++
 .../readchunk/loader/InstantPageLoader.java        | 103 +++
 .../executor/readchunk/loader/PageLoader.java      | 107 +++
 .../execute/utils/reader/PointPriorityReader.java  |   2 +-
 .../compaction/io/CompactionTsFileReader.java      |  18 +
 .../compaction/schedule/CompactionScheduler.java   |   2 +-
 .../compaction/schedule/CompactionTaskManager.java |   3 +-
 .../compaction/schedule/CompactionWorker.java      |   3 +
 .../impl/SizeTieredCompactionSelector.java         |  41 +-
 .../InsertionCrossCompactionTaskResource.java      |  25 +
 .../dataregion/memtable/AbstractMemTable.java      |  18 +
 .../dataregion/memtable/IMemTable.java             |   2 +
 .../dataregion/memtable/TsFileProcessor.java       |   7 +
 .../dataregion/tsfile/TsFileResource.java          |  26 +-
 .../dataregion/utils/TsFileResourceUtils.java      |  75 +--
 .../dataregion/wal/buffer/WALBuffer.java           |  59 +-
 .../wal/checkpoint/CheckpointManager.java          |  50 +-
 .../dataregion/wal/checkpoint/MemTableInfo.java    |  22 +-
 .../dataregion/wal/io/WALByteBufReader.java        |  26 +-
 .../dataregion/wal/io/WALMetaData.java             |  76 ++-
 .../storageengine/dataregion/wal/node/WALNode.java | 213 +++---
 .../dataregion/wal/recover/WALNodeRecoverTask.java |  41 +-
 .../wal/recover/file/TsFilePlanRedoer.java         |   2 +
 .../db/tools/schema/PBTreeFileSketchTool.java      |   2 +-
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |  29 +-
 .../iotdb/db/utils/constant/SqlConstant.java       |   2 -
 .../metadata/mtree/schemafile/SchemaFileTest.java  |   2 +-
 .../schemaRegion/SchemaStatisticsTest.java         |   2 +-
 .../pipe/event/PipeTabletInsertionEventTest.java   |  93 ++-
 .../execution/aggregation/AccumulatorTest.java     |  17 +-
 .../plan/planner/PipelineBuilderTest.java          |  55 ++
 .../storageengine/dataregion/DataRegionTest.java   |  10 +-
 .../compaction/CompactionValidationTest.java       | 103 +++
 .../FastInnerCompactionPerformerTest.java          | 108 ++++
 .../compaction/ReadChunkInnerCompactionTest.java   | 144 +++++
 .../InsertionCrossSpaceCompactionSelectorTest.java |   4 +-
 .../cross/InsertionCrossSpaceCompactionTest.java   |   6 +-
 ...nkCompactionPerformerWithAlignedSeriesTest.java | 715 +++++++++++++++++++++
 .../compaction/utils/CompactionCheckerUtils.java   |   8 +-
 .../compaction/utils/CompactionTestFileWriter.java |   8 +-
 .../dataregion/wal/node/WALEntryHandlerTest.java   |  13 +-
 .../wal/node/WalDeleteOutdatedNewTest.java         | 585 +++++++++++++++++
 .../wal/recover/WALRecoverWriterTest.java          |   9 +-
 .../resources/conf/iotdb-cluster.properties        |   5 +-
 .../resources/conf/iotdb-common.properties         |  49 +-
 .../sbin/{clean-all.bat => destroy-all.bat}        |  22 +-
 .../sbin/{clean-all.sh => destroy-all.sh}          |  59 +-
 ...clean-confignode.bat => destroy-confignode.bat} |  34 +-
 .../{clean-confignode.sh => destroy-confignode.sh} |   7 +-
 .../{clean-datanode.bat => destroy-datanode.bat}   |  14 +-
 .../{clean-datanode.sh => destroy-datanode.sh}     |   7 +-
 .../src/assembly/resources/sbin/start-all.sh       |   9 +-
 .../src/assembly/resources/sbin/stop-all.sh        |  15 +-
 .../commons/auth/authorizer/BasicAuthorizer.java   |   1 -
 .../iotdb/commons/auth/role/BasicRoleManager.java  |   6 +
 .../commons/auth/role/LocalFileRoleAccessor.java   |  12 +-
 .../commons/auth/role/LocalFileRoleManager.java    |   6 +
 .../iotdb/commons/auth/user/BasicUserManager.java  |   1 +
 .../commons/auth/user/LocalFileUserAccessor.java   |  23 +-
 .../commons/auth/user/LocalFileUserManager.java    |   5 +
 .../iotdb/commons/client/ClientPoolFactory.java    |  29 +-
 .../client/property/ClientPoolProperty.java        |  35 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  48 +-
 .../iotdb/commons/conf/CommonDescriptor.java       |  33 +-
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  90 ++-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  20 +-
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |   4 +
 .../commons/pipe/task/subtask/PipeSubtask.java     |  15 +-
 .../schema/view/viewExpression/ViewExpression.java |  18 +
 .../service/metric/JvmGcMonitorMetrics.java        |  10 +-
 .../service/metric}/cpu/CpuUsageMetrics.java       |  23 +-
 .../org/apache/iotdb/commons/utils/FileUtils.java  |   8 +-
 .../iotdb/commons/client/ClientManagerTest.java    |  47 +-
 .../file/metadata/AlignedTimeSeriesMetadata.java   |   1 +
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |   3 +-
 .../tsfile/read/reader/page/AlignedPageReader.java |   7 +-
 .../thrift-commons/src/main/thrift/common.thrift   |   2 -
 .../src/main/thrift/confignode.thrift              |   1 +
 pom.xml                                            |   2 +-
 207 files changed, 6660 insertions(+), 2416 deletions(-)

diff --cc integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java
index a92eab034bb,00000000000..648b9e7b388
mode 100644,000000..100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByAlignedSeriesIT.java
@@@ -1,62 -1,0 +1,63 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.it.aggregation.maxby;
 +
 +import org.apache.iotdb.it.env.EnvFactory;
++
 +import org.junit.BeforeClass;
 +
 +import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
 +
- public class IoTDBMaxByAlignedSeriesIT extends IoTDBMaxByIT{
-     protected static final String[] ALIGNED_DATASET =
-             new String[] {
-                     // x input
-                     "CREATE ALIGNED TIMESERIES root.db.d1(x1 INT32, x2 INT64, 
x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)",
-                     // y input
-                     "CREATE ALIGNED TIMESERIES root.db.d1(y1 INT32, y2 INT64, 
y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)",
-                     "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) 
values(1, 1, 1, 1, 1, true, \"1\")",
-                     "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) 
values(2, 2, 2, 2, 2, false, \"2\")",
-                     "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) 
values(3, 3, 3, 3, 3, false, \"3\")",
-                     "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) 
values(2, 2, 2, 2, 2, true, \"4\")",
-                     "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) 
values(3, 3, 3, 3, 3, false, \"3\")",
-                     "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) 
values(4, 4, 4, 4, 4, false, \"4\")",
-                     "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) 
values(8, 3, 3, 3, 3, false, \"3\")",
-                     "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) 
values(8, 8, 8, 8, 8, false, \"4\")",
-                     "flush",
-                     // For Align By Device
-                     "CREATE ALIGNED TIMESERIES root.db.d2(x1 INT32, x2 INT64, 
x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)",
-                     "CREATE ALIGNED TIMESERIES root.db.d2(y1 INT32, y2 INT64, 
y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)",
-                     "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) 
values(1, 1, 1, 1, 1, true, \"1\")",
-                     "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) 
values(2, 2, 2, 2, 2, false, \"2\")",
-                     "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) 
values(3, 3, 3, 3, 3, false, \"3\")",
-                     "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) 
values(2, 2, 2, 2, 2, true, \"4\")",
-                     "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) 
values(3, 3, 3, 3, 3, false, \"3\")",
-                     "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) 
values(4, 4, 4, 4, 4, false, \"4\")",
-                     "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) 
values(8, 3, 3, 3, 3, false, \"3\")",
-                     "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) 
values(8, 8, 8, 8, 8, false, \"4\")",
-             };
++public class IoTDBMaxByAlignedSeriesIT extends IoTDBMaxByIT {
++  protected static final String[] ALIGNED_DATASET =
++      new String[] {
++        // x input
++        "CREATE ALIGNED TIMESERIES root.db.d1(x1 INT32, x2 INT64, x3 FLOAT, 
x4 DOUBLE, x5 BOOLEAN, x6 TEXT)",
++        // y input
++        "CREATE ALIGNED TIMESERIES root.db.d1(y1 INT32, y2 INT64, y3 FLOAT, 
y4 DOUBLE, y5 BOOLEAN, y6 TEXT)",
++        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 
1, 1, true, \"1\")",
++        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 
2, 2, false, \"2\")",
++        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 
3, 3, false, \"3\")",
++        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 
2, 2, true, \"4\")",
++        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 
3, 3, false, \"3\")",
++        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 
4, 4, false, \"4\")",
++        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 
3, 3, false, \"3\")",
++        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 
8, 8, false, \"4\")",
++        "flush",
++        // For Align By Device
++        "CREATE ALIGNED TIMESERIES root.db.d2(x1 INT32, x2 INT64, x3 FLOAT, 
x4 DOUBLE, x5 BOOLEAN, x6 TEXT)",
++        "CREATE ALIGNED TIMESERIES root.db.d2(y1 INT32, y2 INT64, y3 FLOAT, 
y4 DOUBLE, y5 BOOLEAN, y6 TEXT)",
++        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 
1, 1, true, \"1\")",
++        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 
2, 2, false, \"2\")",
++        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 
3, 3, false, \"3\")",
++        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 
2, 2, true, \"4\")",
++        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 
3, 3, false, \"3\")",
++        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 
4, 4, false, \"4\")",
++        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 
3, 3, false, \"3\")",
++        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 
8, 8, false, \"4\")",
++      };
 +
-     @BeforeClass
-     public static void setUp() throws Exception {
-         
EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000);
-         EnvFactory.getEnv().initClusterEnvironment();
-         prepareData(ALIGNED_DATASET);
-     }
++  @BeforeClass
++  public static void setUp() throws Exception {
++    
EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000);
++    EnvFactory.getEnv().initClusterEnvironment();
++    prepareData(ALIGNED_DATASET);
++  }
 +}
diff --cc integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
index 891d5cd96e6,00000000000..1cde3f239b8
mode 100644,000000..100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
@@@ -1,362 -1,0 +1,411 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.it.aggregation.maxby;
 +
 +import org.apache.iotdb.it.env.EnvFactory;
 +import org.apache.iotdb.it.framework.IoTDBTestRunner;
 +import org.apache.iotdb.itbase.category.ClusterIT;
 +import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 +
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +import org.junit.runner.RunWith;
 +
 +import java.sql.Connection;
 +import java.sql.ResultSet;
 +import java.sql.Statement;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
 +import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
 +import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
 +import static org.apache.iotdb.db.utils.constant.TestConstant.maxBy;
 +import static org.apache.iotdb.itbase.constant.TestConstant.DEVICE;
 +import static org.junit.Assert.fail;
 +
 +@RunWith(IoTDBTestRunner.class)
 +@Category({LocalStandaloneIT.class, ClusterIT.class})
 +public class IoTDBMaxByIT {
 +  protected static final String[] NON_ALIGNED_DATASET =
 +      new String[] {
 +        "CREATE DATABASE root.db",
 +        // x input
 +        "CREATE TIMESERIES root.db.d1.x1 WITH DATATYPE=INT32, ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.x2 WITH DATATYPE=INT64, ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.x3 WITH DATATYPE=FLOAT, ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.x4 WITH DATATYPE=DOUBLE, 
ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.x5 WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.x6 WITH DATATYPE=TEXT, ENCODING=PLAIN",
 +        // y input
 +        "CREATE TIMESERIES root.db.d1.y1 WITH DATATYPE=INT32, ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.y2 WITH DATATYPE=INT64, ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.y3 WITH DATATYPE=FLOAT, ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.y4 WITH DATATYPE=DOUBLE, 
ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.y5 WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN",
 +        "CREATE TIMESERIES root.db.d1.y6 WITH DATATYPE=TEXT, ENCODING=PLAIN",
 +        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 
1, 1, true, \"1\")",
 +        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 
2, 2, false, \"2\")",
 +        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 
3, 3, false, \"3\")",
 +        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 
2, 2, true, \"4\")",
 +        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 
3, 3, false, \"3\")",
 +        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 
4, 4, false, \"4\")",
-               "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 
3, 3, 3, 3, false, \"3\")",
-               "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 
8, 8, 8, 8, false, \"4\")",
++        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 
3, 3, false, \"3\")",
++        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 
8, 8, false, \"4\")",
 +        "flush",
 +
-               // For Align By Device
-               "CREATE TIMESERIES root.db.d2.x1 WITH DATATYPE=INT32, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.x2 WITH DATATYPE=INT64, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.x3 WITH DATATYPE=FLOAT, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.x4 WITH DATATYPE=DOUBLE, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.x5 WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.x6 WITH DATATYPE=TEXT, 
ENCODING=PLAIN",
-               // y input
-               "CREATE TIMESERIES root.db.d2.y1 WITH DATATYPE=INT32, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.y2 WITH DATATYPE=INT64, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.y3 WITH DATATYPE=FLOAT, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.y4 WITH DATATYPE=DOUBLE, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.y5 WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN",
-               "CREATE TIMESERIES root.db.d2.y6 WITH DATATYPE=TEXT, 
ENCODING=PLAIN",
-               "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 
1, 1, 1, 1, true, \"1\")",
-               "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 
2, 2, 2, 2, false, \"2\")",
-               "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 
3, 3, 3, 3, false, \"3\")",
-               "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 
4, 4, 4, 4, true, \"1\")",
-               "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 
2, 2, 2, 2, true, \"4\")",
-               "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 
3, 3, 3, 3, false, \"3\")",
-               "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 
1, 1, 1, 1, false, \"1\")",
-               "flush"
++        // For Align By Device
++        "CREATE TIMESERIES root.db.d2.x1 WITH DATATYPE=INT32, ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.x2 WITH DATATYPE=INT64, ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.x3 WITH DATATYPE=FLOAT, ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.x4 WITH DATATYPE=DOUBLE, 
ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.x5 WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.x6 WITH DATATYPE=TEXT, ENCODING=PLAIN",
++        // y input
++        "CREATE TIMESERIES root.db.d2.y1 WITH DATATYPE=INT32, ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.y2 WITH DATATYPE=INT64, ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.y3 WITH DATATYPE=FLOAT, ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.y4 WITH DATATYPE=DOUBLE, 
ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.y5 WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN",
++        "CREATE TIMESERIES root.db.d2.y6 WITH DATATYPE=TEXT, ENCODING=PLAIN",
++        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 
1, 1, true, \"1\")",
++        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 
2, 2, false, \"2\")",
++        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 
3, 3, false, \"3\")",
++        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 4, 4, 
4, 4, true, \"1\")",
++        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 
2, 2, true, \"4\")",
++        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 
3, 3, false, \"3\")",
++        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 
1, 1, false, \"1\")",
++        "flush"
 +      };
 +
 +  protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data 
type in MaxBy:";
 +
 +  @BeforeClass
 +  public static void setUp() throws Exception {
 +    
EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000);
 +    EnvFactory.getEnv().initClusterEnvironment();
 +    prepareData(NON_ALIGNED_DATASET);
 +  }
 +
 +  @AfterClass
 +  public static void tearDown() throws Exception {
 +    EnvFactory.getEnv().cleanClusterEnvironment();
 +  }
 +
 +  @Test
 +  public void testMaxByWithUnsupportedYInputTypes() {
 +    try (Connection connection = EnvFactory.getEnv().getConnection();
 +        Statement statement = connection.createStatement()) {
 +      try {
 +        try (ResultSet resultSet =
 +            statement.executeQuery("SELECT max_by(x1, y5) FROM root.db.d1")) {
 +          resultSet.next();
 +          fail();
 +        }
 +      } catch (Exception e) {
 +        Assert.assertTrue(e.getMessage(), 
e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE));
 +      }
 +      try {
 +        try (ResultSet resultSet =
 +            statement.executeQuery("SELECT max_by(x1, y6) FROM root.db.d1")) {
 +          resultSet.next();
 +          fail();
 +        }
 +      } catch (Exception e) {
 +        Assert.assertTrue(e.getMessage(), 
e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE));
 +      }
 +      try {
 +        try (ResultSet resultSet =
 +            statement.executeQuery("SELECT max_by(x5, y5) FROM root.db.d1")) {
 +          resultSet.next();
 +          fail();
 +        }
 +      } catch (Exception e) {
 +        Assert.assertTrue(e.getMessage(), 
e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE));
 +      }
 +      try {
 +        try (ResultSet resultSet =
 +            statement.executeQuery("SELECT max_by(x5, y6) FROM root.db.d1")) {
 +          resultSet.next();
 +          fail();
 +        }
 +      } catch (Exception e) {
 +        Assert.assertTrue(e.getMessage(), 
e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE));
 +      }
 +      try {
 +        try (ResultSet resultSet =
 +            statement.executeQuery("SELECT max_by(x6, y5) FROM root.db.d1")) {
 +          resultSet.next();
 +          fail();
 +        }
 +      } catch (Exception e) {
 +        Assert.assertTrue(e.getMessage(), 
e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE));
 +      }
 +      try {
 +        try (ResultSet resultSet =
 +            statement.executeQuery("SELECT max_by(x6, y6) FROM root.db.d1")) {
 +          resultSet.next();
 +          fail();
 +        }
 +      } catch (Exception e) {
 +        Assert.assertTrue(e.getMessage(), 
e.getMessage().contains(UNSUPPORTED_TYPE_MESSAGE));
 +      }
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testMaxByWithDifferentXAndYInputTypes() {
 +    try (Connection connection = EnvFactory.getEnv().getConnection();
 +        Statement statement = connection.createStatement()) {
 +      Map<String, String[]> expectedHeaders =
 +          generateExpectedHeadersForMaxByTest(
-               "root.db.d1", new String[] {"x1", "x2", "x3", "x4", "x5", 
"x6"}, new String[] {"y1", "y2", "y3", "y4"});
++              "root.db.d1",
++              new String[] {"x1", "x2", "x3", "x4", "x5", "x6"},
++              new String[] {"y1", "y2", "y3", "y4"});
 +      String[] retArray = new String[] {"3,3,3.0,3.0,false,3,"};
 +      for (Map.Entry<String, String[]> expectedHeader : 
expectedHeaders.entrySet()) {
 +        String y = expectedHeader.getKey();
 +        resultSetEqualTest(
 +            String.format(
 +                "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 where time <= 3",
 +                y, y, y, y, y, y),
 +            expectedHeader.getValue(),
 +            retArray);
 +      }
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testMaxByWithDifferentXAndYInputTypesAndNullXValue() {
 +    try (Connection connection = EnvFactory.getEnv().getConnection();
 +        Statement statement = connection.createStatement()) {
 +      Map<String, String[]> expectedHeaders =
 +          generateExpectedHeadersForMaxByTest(
-               "root.db.d1", new String[] {"x1", "x2", "x3", "x4", "x5", 
"x6"},new String[] {"y1", "y2", "y3", "y4"});
++              "root.db.d1",
++              new String[] {"x1", "x2", "x3", "x4", "x5", "x6"},
++              new String[] {"y1", "y2", "y3", "y4"});
 +      String[] retArray = new String[] {"null,null,null,null,null,null,"};
 +      for (Map.Entry<String, String[]> expectedHeader : 
expectedHeaders.entrySet()) {
 +        String y = expectedHeader.getKey();
 +        resultSetEqualTest(
 +            String.format(
 +                "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 where time <= 4",
 +                y, y, y, y, y, y),
 +            expectedHeader.getValue(),
 +            retArray);
 +      }
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testMaxByWithDifferentYInputTypesAndXAsTime() {
 +    try (Connection connection = EnvFactory.getEnv().getConnection();
 +        Statement statement = connection.createStatement()) {
 +      Map<String, String[]> expectedHeaders =
 +          generateExpectedHeadersForMaxByTest(
-               "root.db.d1", new String[] {"Time", "Time", "Time", "Time", 
"Time", "Time"},new String[] {"y1", "y2", "y3", "y4"});
++              "root.db.d1",
++              new String[] {"Time", "Time", "Time", "Time", "Time", "Time"},
++              new String[] {"y1", "y2", "y3", "y4"});
 +      String[] retArray = new String[] {"3,3,3,3,3,3,"};
 +      for (Map.Entry<String, String[]> expectedHeader : 
expectedHeaders.entrySet()) {
 +        String y = expectedHeader.getKey();
 +        resultSetEqualTest(
 +            String.format(
 +                "select 
max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s)
 from root.db.d1 where time <= 3",
 +                y, y, y, y, y, y),
 +            expectedHeader.getValue(),
 +            retArray);
 +      }
 +      String[] retArray1 = new String[] {"4,4,4,4,4,4,"};
 +      for (Map.Entry<String, String[]> expectedHeader : 
expectedHeaders.entrySet()) {
 +        String y = expectedHeader.getKey();
 +        resultSetEqualTest(
-                 String.format(
-                         "select 
max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s)
 from root.db.d1 where time <= 4",
-                         y, y, y, y, y, y),
-                 expectedHeader.getValue(),
-                 retArray1);
++            String.format(
++                "select 
max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s)
 from root.db.d1 where time <= 4",
++                y, y, y, y, y, y),
++            expectedHeader.getValue(),
++            retArray1);
 +      }
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testMaxByWithExpression() {
 +    try (Connection connection = EnvFactory.getEnv().getConnection();
-          Statement statement = connection.createStatement()) {
-       String[] expectedHeader = new String[]{"max_by(root.db.d1.x1 + 1 - 3, 
-cos(sin(root.db.d1.y2 / 10)))","max_by(root.db.d1.x2 * 2 / 3, 
-cos(sin(root.db.d1.y2 / 10)))","max_by(floor(root.db.d1.x3), 
-cos(sin(root.db.d1.y2 / 10)))","max_by(ceil(root.db.d1.x4), 
-cos(sin(root.db.d1.y2 / 10)))","max_by(root.db.d1.x5, -cos(sin(root.db.d1.y2 / 
10)))","max_by(REPLACE(root.db.d1.x6, '3', '4'), -cos(sin(root.db.d1.y2 / 
10)))",};
++        Statement statement = connection.createStatement()) {
++      String[] expectedHeader =
++          new String[] {
++            "max_by(root.db.d1.x1 + 1 - 3, -cos(sin(root.db.d1.y2 / 10)))",
++            "max_by(root.db.d1.x2 * 2 / 3, -cos(sin(root.db.d1.y2 / 10)))",
++            "max_by(floor(root.db.d1.x3), -cos(sin(root.db.d1.y2 / 10)))",
++            "max_by(ceil(root.db.d1.x4), -cos(sin(root.db.d1.y2 / 10)))",
++            "max_by(root.db.d1.x5, -cos(sin(root.db.d1.y2 / 10)))",
++            "max_by(REPLACE(root.db.d1.x6, '3', '4'), -cos(sin(root.db.d1.y2 
/ 10)))",
++          };
 +      String[] retArray = new String[] {"1.0,2.0,3.0,3.0,false,4,"};
 +      String y = "-cos(sin(y2 / 10))";
 +      resultSetEqualTest(
-               String.format(
-                       "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 
3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, 
'3', '4'),%s) from root.db.d1 where time <= 3",
-                       y, y, y, y, y, y),
-               expectedHeader,
-               retArray);
++          String.format(
++              "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 
3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, 
'3', '4'),%s) from root.db.d1 where time <= 3",
++              y, y, y, y, y, y),
++          expectedHeader,
++          retArray);
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testMaxByWithAlignByDevice() {
 +    try (Connection connection = EnvFactory.getEnv().getConnection();
-          Statement statement = connection.createStatement()) {
-       String[] expectedHeader = new String[]{DEVICE, "max_by(x1 + 1 - 3, 
-cos(sin(y2 / 10)))","max_by(x2 * 2 / 3, -cos(sin(y2 / 
10)))","max_by(floor(x3), -cos(sin(y2 / 10)))","max_by(ceil(x4), -cos(sin(y2 / 
10)))","max_by(x5, -cos(sin(y2 / 10)))","max_by(REPLACE(x6, '3', '4'), 
-cos(sin(y2 / 10)))",};
-       String[] retArray = new String[] 
{"root.db.d1,1.0,2.0,3.0,3.0,false,4,",  "root.db.d2,1.0,2.0,3.0,3.0,false,4,"};
++        Statement statement = connection.createStatement()) {
++      String[] expectedHeader =
++          new String[] {
++            DEVICE,
++            "max_by(x1 + 1 - 3, -cos(sin(y2 / 10)))",
++            "max_by(x2 * 2 / 3, -cos(sin(y2 / 10)))",
++            "max_by(floor(x3), -cos(sin(y2 / 10)))",
++            "max_by(ceil(x4), -cos(sin(y2 / 10)))",
++            "max_by(x5, -cos(sin(y2 / 10)))",
++            "max_by(REPLACE(x6, '3', '4'), -cos(sin(y2 / 10)))",
++          };
++      String[] retArray =
++          new String[] {
++            "root.db.d1,1.0,2.0,3.0,3.0,false,4,", 
"root.db.d2,1.0,2.0,3.0,3.0,false,4,"
++          };
 +      String y = "-cos(sin(y2 / 10))";
 +      resultSetEqualTest(
-               String.format(
-                       "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 
3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, 
'3', '4'),%s) from root.db.** where time <= 3 align by device",
-                       y, y, y, y, y, y),
-               expectedHeader,
-               retArray);
++          String.format(
++              "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 
3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, 
'3', '4'),%s) from root.db.** where time <= 3 align by device",
++              y, y, y, y, y, y),
++          expectedHeader,
++          retArray);
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testMaxByWithGroupBy() {
 +    try (Connection connection = EnvFactory.getEnv().getConnection();
-          Statement statement = connection.createStatement()) {
-       String[] expectedHeader = new String[]{TIMESTAMP_STR, 
"max_by(root.db.d1.x1, root.db.d1.y2)","max_by(root.db.d1.x2, 
root.db.d1.y2)","max_by(root.db.d1.x3, root.db.d1.y2)","max_by(root.db.d1.x4, 
root.db.d1.y2)","max_by(root.db.d1.x5, root.db.d1.y2)","max_by(root.db.d1.x6, 
root.db.d1.y2)",};
-       String[] retArray = new String[] {"0,3,3,3.0,3.0,false,3,",  
"4,null,null,null,null,null,null,", "8,3,3,3.0,3.0,false,3,"};
++        Statement statement = connection.createStatement()) {
++      String[] expectedHeader =
++          new String[] {
++            TIMESTAMP_STR,
++            "max_by(root.db.d1.x1, root.db.d1.y2)",
++            "max_by(root.db.d1.x2, root.db.d1.y2)",
++            "max_by(root.db.d1.x3, root.db.d1.y2)",
++            "max_by(root.db.d1.x4, root.db.d1.y2)",
++            "max_by(root.db.d1.x5, root.db.d1.y2)",
++            "max_by(root.db.d1.x6, root.db.d1.y2)",
++          };
++      String[] retArray =
++          new String[] {
++            "0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", 
"8,3,3,3.0,3.0,false,3,"
++          };
 +      String y = "y2";
 +      resultSetEqualTest(
-               String.format(
-                       "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms)",
-                       y, y, y, y, y, y),
-               expectedHeader,
-               retArray);
++          String.format(
++              "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms)",
++              y, y, y, y, y, y),
++          expectedHeader,
++          retArray);
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testMaxByWithSlidingWindow() {
 +    try (Connection connection = EnvFactory.getEnv().getConnection();
-          Statement statement = connection.createStatement()) {
-       String[] expectedHeader = new String[]{TIMESTAMP_STR, 
"max_by(root.db.d1.x1, root.db.d1.y2)","max_by(root.db.d1.x2, 
root.db.d1.y2)","max_by(root.db.d1.x3, root.db.d1.y2)","max_by(root.db.d1.x4, 
root.db.d1.y2)","max_by(root.db.d1.x5, root.db.d1.y2)","max_by(root.db.d1.x6, 
root.db.d1.y2)",};
-       String[] retArray = new String[] {"0,3,3,3.0,3.0,false,3,",  
"4,null,null,null,null,null,null,", "8,3,3,3.0,3.0,false,3,"};
++        Statement statement = connection.createStatement()) {
++      String[] expectedHeader =
++          new String[] {
++            TIMESTAMP_STR,
++            "max_by(root.db.d1.x1, root.db.d1.y2)",
++            "max_by(root.db.d1.x2, root.db.d1.y2)",
++            "max_by(root.db.d1.x3, root.db.d1.y2)",
++            "max_by(root.db.d1.x4, root.db.d1.y2)",
++            "max_by(root.db.d1.x5, root.db.d1.y2)",
++            "max_by(root.db.d1.x6, root.db.d1.y2)",
++          };
++      String[] retArray =
++          new String[] {
++            "0,3,3,3.0,3.0,false,3,", "4,null,null,null,null,null,null,", 
"8,3,3,3.0,3.0,false,3,"
++          };
 +      String y = "y2";
 +      resultSetEqualTest(
-               String.format(
-                       "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms)",
-                       y, y, y, y, y, y),
-               expectedHeader,
-               retArray);
++          String.format(
++              "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms)",
++              y, y, y, y, y, y),
++          expectedHeader,
++          retArray);
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
- 
 +  // test max_by different types of x
 +  // test max_by time
 +  // test max_by with expression
 +  // test max_by align by device
 +  // test max_by group by time
 +  // test max_by sliding window
 +  // test max_by aligned series
 +  // test max_by multi data region
 +  // test max_by group by level
 +  // test max_by having
 +
 +  /** @return yInput -> expectedHeader */
 +  private Map<String, String[]> generateExpectedHeadersForMaxByTest(
 +      String device, String[] xInput, String[] yInput) {
 +    Map<String, String[]> res = new HashMap<>();
 +    Arrays.stream(yInput)
 +        .forEach(
 +            y -> {
 +              res.put(
 +                  y,
 +                  Arrays.stream(xInput)
 +                      .map(x -> maxBy("Time".equals(x) ? x : device + "." + 
x, device + "." + y))
 +                      .toArray(String[]::new));
 +            });
 +    return res;
 +  }
 +}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
index 21add5ae17e,00000000000..47092e43962
mode 100644,000000..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
@@@ -1,347 -1,0 +1,407 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.aggregation;
 +
 +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
++import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 +import org.apache.iotdb.tsfile.read.common.block.column.Column;
 +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
++import org.apache.iotdb.tsfile.utils.Binary;
 +import org.apache.iotdb.tsfile.utils.BitMap;
++import org.apache.iotdb.tsfile.utils.BytesUtils;
 +import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 +
++import java.io.ByteArrayOutputStream;
++import java.io.DataOutputStream;
++import java.io.IOException;
++import java.util.Collections;
++
 +import static com.google.common.base.Preconditions.checkArgument;
 +
 +/** max_by(x,y) returns the value of x associated with the maximum value of y 
over all input values. */
 +public class MaxByAccumulator implements Accumulator {
 +
 +  private final TSDataType xDataType;
 +
 +  private final TSDataType yDataType;
 +
 +  private final TsPrimitiveType yMaxValue;
 +
 +  private final TsPrimitiveType xResult;
 +
 +  private boolean xNull = true;
 +
 +  private boolean initResult;
 +
 +  private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data 
type in MaxBy: %s";
 +
 +  public MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) {
 +    this.xDataType = xDataType;
 +    this.yDataType = yDataType;
 +    this.xResult = TsPrimitiveType.getByType(xDataType);
 +    this.yMaxValue = TsPrimitiveType.getByType(yDataType);
 +  }
 +
 +  // Column should be like: | Time | x | y |
 +  @Override
 +  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
++    checkArgument(column.length == 3, "Length of input Column[] for MaxBy 
should be 3");
 +    switch (yDataType) {
 +      case INT32:
 +        addIntInput(column, bitMap, lastIndex);
 +        return;
 +      case INT64:
 +        addLongInput(column, bitMap, lastIndex);
 +        return;
 +      case FLOAT:
 +        addFloatInput(column, bitMap, lastIndex);
 +        return;
 +      case DOUBLE:
 +        addDoubleInput(column, bitMap, lastIndex);
 +        return;
 +      case TEXT:
 +      case BOOLEAN:
 +      default:
 +        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
 +    }
 +  }
 +
 +  // partialResult should be like: | partialMaxByBinary |
 +  @Override
 +  public void addIntermediate(Column[] partialResult) {
-     checkArgument(partialResult.length == 2, "partialResult of MaxBy should 
be 2");
++    checkArgument(partialResult.length == 1, "partialResult of MaxBy should 
be 1");
 +    // Return if the partial result is null.
 +    if (partialResult[0].isNull(0)) {
 +      return;
 +    }
-     switch (yDataType) {
-       case INT32:
-         updateIntResult(partialResult[1].getInt(0), partialResult[0], 0);
-         break;
-       case INT64:
-         updateLongResult(partialResult[1].getLong(0), partialResult[0], 0);
-         break;
-       case FLOAT:
-         updateFloatResult(partialResult[1].getFloat(0), partialResult[0], 0);
-         break;
-       case DOUBLE:
-         updateDoubleResult(partialResult[1].getDouble(0), partialResult[0], 
0);
-         break;
-       case TEXT:
-       case BOOLEAN:
-       default:
-         throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
-     }
++    byte[] bytes = partialResult[0].getBinary(0).getValues();
++    updateFromBytesIntermediateInput(bytes);
 +  }
 +
 +  @Override
 +  public void addStatistics(Statistics statistics) {
 +    throw new UnsupportedOperationException(getClass().getName());
 +  }
 +
 +  // finalResult should be single column, like: | finalXValue |
 +  @Override
 +  public void setFinal(Column finalResult) {
 +    if (finalResult.isNull(0)) {
 +      return;
 +    }
 +    initResult = true;
-     if (finalResult.isNull(0)) {
-       xNull = true;
-       return;
-     }
-     switch (xDataType) {
-       case INT32:
-         xResult.setInt(finalResult.getInt(0));
-         break;
-       case INT64:
-         xResult.setLong(finalResult.getLong(0));
-         break;
-       case FLOAT:
-         xResult.setFloat(finalResult.getFloat(0));
-         break;
-       case DOUBLE:
-         xResult.setDouble(finalResult.getDouble(0));
-         break;
-       case TEXT:
-         xResult.setBinary(finalResult.getBinary(0));
-         break;
-       case BOOLEAN:
-         xResult.setBoolean(finalResult.getBoolean(0));
-         break;
-       default:
-         throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
xDataType));
-     }
++    updateX(finalResult, 0);
 +  }
 +
-   // columnBuilders should be like | xColumnBuilder | yColumnBuilder |
++  // columnBuilders should be like | TextIntermediateColumnBuilder |
 +  @Override
 +  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
-     checkArgument(columnBuilders.length == 2, "partialResult of MaxValue 
should be 2");
++    checkArgument(columnBuilders.length == 1, "partialResult of MaxValue 
should be 1");
 +    if (!initResult) {
 +      columnBuilders[0].appendNull();
-       columnBuilders[1].appendNull();
 +      return;
 +    }
-     switch (yDataType) {
-       case INT32:
-         writeX(columnBuilders[0]);
-         columnBuilders[1].writeInt(yMaxValue.getInt());
-         break;
-       case INT64:
-         writeX(columnBuilders[0]);
-         columnBuilders[1].writeLong(yMaxValue.getLong());
-         break;
-       case FLOAT:
-         writeX(columnBuilders[0]);
-         columnBuilders[1].writeFloat(yMaxValue.getFloat());
-         break;
-       case DOUBLE:
-         writeX(columnBuilders[0]);
-         columnBuilders[1].writeDouble(yMaxValue.getDouble());
-         break;
-       case TEXT:
-       case BOOLEAN:
-       default:
-         throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
-     }
++    columnBuilders[0].writeBinary(new Binary(serialize()));
 +  }
 +
 +  @Override
 +  public void outputFinal(ColumnBuilder columnBuilder) {
 +    if (!initResult) {
 +      columnBuilder.appendNull();
 +      return;
 +    }
 +    writeX(columnBuilder);
 +  }
 +
 +  @Override
 +  public void reset() {
 +    initResult = false;
 +    xNull = true;
 +    this.xResult.reset();
 +    this.yMaxValue.reset();
 +  }
 +
 +  @Override
 +  public boolean hasFinalResult() {
 +    return false;
 +  }
 +
 +  @Override
 +  public TSDataType[] getIntermediateType() {
-     return new TSDataType[] {xDataType, yDataType};
++    return new TSDataType[] {TSDataType.TEXT};
 +  }
 +
 +  @Override
 +  public TSDataType getFinalType() {
 +    return xDataType;
 +  }
 +
 +  private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
 +    for (int i = 0; i <= lastIndex; i++) {
 +      if (bitMap != null && !bitMap.isMarked(i)) {
 +        continue;
 +      }
 +      if (!column[2].isNull(i)) {
 +        updateIntResult(column[2].getInt(i), column[1], i);
 +      }
 +    }
 +  }
 +
 +  private void updateIntResult(int yMaxVal, Column xColumn, int xIndex) {
 +    if (!initResult || yMaxVal > yMaxValue.getInt()) {
 +      initResult = true;
 +      yMaxValue.setInt(yMaxVal);
 +      updateX(xColumn, xIndex);
 +    }
 +  }
 +
 +  private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
 +    for (int i = 0; i <= lastIndex; i++) {
 +      if (bitMap != null && !bitMap.isMarked(i)) {
 +        continue;
 +      }
 +      if (!column[2].isNull(i)) {
 +        updateLongResult(column[2].getLong(i), column[1], i);
 +      }
 +    }
 +  }
 +
 +  private void updateLongResult(long yMaxVal, Column xColumn, int xIndex) {
 +    if (!initResult || yMaxVal > yMaxValue.getLong()) {
 +      initResult = true;
 +      yMaxValue.setLong(yMaxVal);
 +      updateX(xColumn, xIndex);
 +    }
 +  }
 +
 +  private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
 +    for (int i = 0; i <= lastIndex; i++) {
 +      if (bitMap != null && !bitMap.isMarked(i)) {
 +        continue;
 +      }
 +      if (!column[2].isNull(i)) {
 +        updateFloatResult(column[2].getFloat(i), column[1], i);
 +      }
 +    }
 +  }
 +
 +  private void updateFloatResult(float yMaxVal, Column xColumn, int xIndex) {
 +    if (!initResult || yMaxVal > yMaxValue.getFloat()) {
 +      initResult = true;
 +      yMaxValue.setFloat(yMaxVal);
 +      updateX(xColumn, xIndex);
 +    }
 +  }
 +
 +  private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
 +    for (int i = 0; i <= lastIndex; i++) {
 +      if (bitMap != null && !bitMap.isMarked(i)) {
 +        continue;
 +      }
 +      if (!column[2].isNull(i)) {
 +        updateDoubleResult(column[2].getDouble(i), column[1], i);
 +      }
 +    }
 +  }
 +
 +  private void updateDoubleResult(double yMaxVal, Column xColumn, int xIndex) 
{
 +    if (!initResult || yMaxVal > yMaxValue.getDouble()) {
 +      initResult = true;
 +      yMaxValue.setDouble(yMaxVal);
 +      updateX(xColumn, xIndex);
 +    }
 +  }
 +
 +  private void writeX(ColumnBuilder columnBuilder) {
 +    if (xNull) {
 +      columnBuilder.appendNull();
 +      return;
 +    }
 +    switch (xDataType) {
 +      case INT32:
 +        columnBuilder.writeInt(xResult.getInt());
 +        break;
 +      case INT64:
 +        columnBuilder.writeLong(xResult.getLong());
 +        break;
 +      case FLOAT:
 +        columnBuilder.writeFloat(xResult.getFloat());
 +        break;
 +      case DOUBLE:
 +        columnBuilder.writeDouble(xResult.getDouble());
 +        break;
 +      case TEXT:
 +        columnBuilder.writeBinary(xResult.getBinary());
 +        break;
 +      case BOOLEAN:
 +        columnBuilder.writeBoolean(xResult.getBoolean());
 +        break;
 +      default:
 +        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
xDataType));
 +    }
 +  }
 +
 +  private void updateX(Column xColumn, int xIndex) {
 +    if (xColumn.isNull(xIndex)) {
 +      xNull = true;
 +    } else {
 +      xNull = false;
 +      switch (xDataType) {
 +        case INT32:
 +          xResult.setInt(xColumn.getInt(xIndex));
 +          break;
 +        case INT64:
 +          xResult.setLong(xColumn.getLong(xIndex));
 +          break;
 +        case FLOAT:
 +          xResult.setFloat(xColumn.getFloat(xIndex));
 +          break;
 +        case DOUBLE:
 +          xResult.setDouble(xColumn.getDouble(xIndex));
 +          break;
 +        case TEXT:
 +          xResult.setBinary(xColumn.getBinary(xIndex));
 +          break;
 +        case BOOLEAN:
 +          xResult.setBoolean(xColumn.getBoolean(xIndex));
 +          break;
 +        default:
 +          throw new UnSupportedDataTypeException(
 +              String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
 +      }
 +    }
 +  }
++
++  private byte[] serialize() {
++    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
++    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
++    try {
++      writeIntermediateToStream(yDataType, yMaxValue, dataOutputStream);
++      dataOutputStream.writeBoolean(xNull);
++      if (!xNull) {
++        writeIntermediateToStream(xDataType, xResult, dataOutputStream);
++      }
++    } catch (IOException e) {
++      throw new UnsupportedOperationException(
++          "Failed to serialize intermediate result for MaxByAccumulator.", e);
++    }
++    return byteArrayOutputStream.toByteArray();
++  }
++
++  private void writeIntermediateToStream(
++      TSDataType dataType, TsPrimitiveType value, DataOutputStream 
dataOutputStream)
++      throws IOException {
++    switch (dataType) {
++      case INT32:
++        dataOutputStream.writeInt(value.getInt());
++        break;
++      case INT64:
++        dataOutputStream.writeLong(value.getLong());
++        break;
++      case FLOAT:
++        dataOutputStream.writeFloat(value.getFloat());
++        break;
++      case DOUBLE:
++        dataOutputStream.writeDouble(value.getDouble());
++        break;
++      case TEXT:
++        dataOutputStream.writeBytes(value.getBinary().toString());
++        break;
++      case BOOLEAN:
++        dataOutputStream.writeBoolean(value.getBoolean());
++        break;
++      default:
++        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, dataType));
++    }
++  }
++
++  private void updateFromBytesIntermediateInput(byte[] bytes) {
++    int offset = 0;
++    // Use Column to store x value
++    TsBlockBuilder builder = new 
TsBlockBuilder(Collections.singletonList(xDataType));
++    ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
++    switch (yDataType) {
++      case INT32:
++        int intMaxVal = BytesUtils.bytesToInt(bytes);
++        offset += Integer.BYTES;
++        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
++        updateIntResult(intMaxVal, columnBuilder.build(), 0);
++        break;
++      case INT64:
++        long longMaxVal = BytesUtils.bytesToLong(bytes);
++        offset += Long.BYTES;
++        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
++        updateLongResult(longMaxVal, columnBuilder.build(), 0);
++        break;
++      case FLOAT:
++        float floatMaxVal = BytesUtils.bytesToFloat(bytes);
++        offset += Float.BYTES;
++        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
++        updateFloatResult(floatMaxVal, columnBuilder.build(), 0);
++        break;
++      case DOUBLE:
++        double doubleMaxVal = BytesUtils.bytesToDouble(bytes);
++        offset += Long.BYTES;
++        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
++        updateDoubleResult(doubleMaxVal, columnBuilder.build(), 0);
++        break;
++      case TEXT:
++      case BOOLEAN:
++      default:
++        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
++    }
++  }
++
++  private void readXFromBytesIntermediateInput(
++      byte[] bytes, int offset, ColumnBuilder columnBuilder) {
++    boolean isXNull = BytesUtils.bytesToBool(bytes, offset);
++    offset += 1;
++    if (isXNull) {
++      columnBuilder.appendNull();
++    } else {
++      switch (xDataType) {
++        case INT32:
++          columnBuilder.writeInt(BytesUtils.bytesToInt(bytes, offset));
++          break;
++        case INT64:
++          columnBuilder.writeLong(BytesUtils.bytesToLongFromOffset(bytes, 8, 
offset));
++          break;
++        case FLOAT:
++          columnBuilder.writeFloat(BytesUtils.bytesToFloat(bytes, offset));
++          break;
++        case DOUBLE:
++          columnBuilder.writeDouble(BytesUtils.bytesToDouble(bytes, offset));
++          break;
++        case TEXT:
++          columnBuilder.writeBinary(
++              new Binary(BytesUtils.subBytes(bytes, offset, bytes.length - 
offset)));
++          break;
++        case BOOLEAN:
++          columnBuilder.writeBoolean(BytesUtils.bytesToBool(bytes, offset));
++          break;
++        default:
++          throw new UnSupportedDataTypeException(
++              String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
++      }
++    }
++  }
 +}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 4d013814ace,15fb330bab2..68dff31c74f
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@@ -698,46 -697,27 +698,35 @@@ public class LogicalPlanBuilder 
  
    public static void updateTypeProviderByPartialAggregation(
      AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
-     List<TAggregationType> splitAggregations =
+     List<String> partialAggregationsNames =
          SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
-     for (TAggregationType aggregation : splitAggregations) {
-       String functionName = aggregation.toString().toLowerCase();
-       TSDataType aggregationType = SchemaUtils.getAggregationType(functionName);
-       String inputExpressionStr =
-           getExpressionStringThatDeterminesReturnType(aggregationDescriptor);
 -    String inputExpressionStr =
 -        aggregationDescriptor.getInputExpressions().get(0).getExpressionString();
++    String inputExpressionStr = getInputExpressionString(aggregationDescriptor);
+     for (String partialAggregationName : partialAggregationsNames) {
+       TSDataType aggregationType = SchemaUtils.getAggregationType(partialAggregationName);
        typeProvider.setType(
-           String.format("%s(%s)", functionName, aggregationDescriptor.getParametersString()),
+           String.format("%s(%s)", partialAggregationName, inputExpressionStr),
            aggregationType == null ? typeProvider.getType(inputExpressionStr) : aggregationType);
      }
    }
  
-   /**
-    * For aggregate functions that accept single input, we return the first input Expression. For
-    * aggregate functions that can accept multiple inputs, if the type of intermediate results is
-    * determined by the input Expression, we return the corresponding Expression used to determine
-    * the type of intermediate results.
-    */
-   private static String getExpressionStringThatDeterminesReturnType(
-       AggregationDescriptor aggregationDescriptor) {
-     switch (aggregationDescriptor.getAggregationType()) {
-       case MAX_BY_Y_INPUT:
-         return aggregationDescriptor.getInputExpressions().get(1).getExpressionString();
-       case MAX_BY_X_INPUT:
-       default:
-         return aggregationDescriptor.getInputExpressions().get(0).getExpressionString();
++  private static String getInputExpressionString(AggregationDescriptor aggregationDescriptor) {
++    // We just process first input Expression of Count_IF
++    if (TAggregationType.COUNT_IF.equals(aggregationDescriptor.getAggregationType())) {
++      return aggregationDescriptor.getInputExpressions().get(0).getExpressionString();
++    } else {
++      return aggregationDescriptor.getParametersString();
 +    }
 +  }
 +
    public static void updateTypeProviderByPartialAggregation(
      CrossSeriesAggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
-     List<TAggregationType> splitAggregations =
+     List<String> partialAggregationsNames =
          SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
      PartialPath path = ((TimeSeriesOperand) aggregationDescriptor.getOutputExpression()).getPath();
-     for (TAggregationType aggregationType : splitAggregations) {
-       String functionName = aggregationType.toString().toLowerCase();
+     for (String partialAggregationName : partialAggregationsNames) {
        typeProvider.setType(
-           String.format("%s(%s)", functionName, path.getFullPath()),
-           SchemaUtils.getSeriesTypeByPath(path, functionName));
+           String.format("%s(%s)", partialAggregationName, path.getFullPath()),
+           SchemaUtils.getSeriesTypeByPath(path, partialAggregationName));
      }
    }
  
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index 3c223cd54ee,3b32baef3fa..5192f5a0186
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@@ -34,8 -34,9 +34,10 @@@ import java.util.Iterator
  import java.util.List;
  import java.util.Map;
  import java.util.Objects;
 +import java.util.stream.Collectors;
  
+ import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.addPartialSuffix;
+ 
  public class AggregationDescriptor {
  
    // aggregation function type
@@@ -152,28 -148,23 +154,26 @@@
            outputAggregationNames.add(SqlConstant.MIN_TIME);
            break;
          case STDDEV:
-           outputAggregationNames.add(SqlConstant.STDDEV);
+           outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV));
            break;
          case STDDEV_POP:
-           outputAggregationNames.add(SqlConstant.STDDEV_POP);
+           outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV_POP));
            break;
          case STDDEV_SAMP:
-           outputAggregationNames.add(SqlConstant.STDDEV_SAMP);
+           outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV_SAMP));
            break;
          case VARIANCE:
-           outputAggregationNames.add(SqlConstant.VARIANCE);
+           outputAggregationNames.add(addPartialSuffix(SqlConstant.VARIANCE));
            break;
          case VAR_POP:
-           outputAggregationNames.add(SqlConstant.VAR_POP);
+           outputAggregationNames.add(addPartialSuffix(SqlConstant.VAR_POP));
            break;
          case VAR_SAMP:
-           outputAggregationNames.add(SqlConstant.VAR_SAMP);
+           outputAggregationNames.add(addPartialSuffix(SqlConstant.VAR_SAMP));
            break;
 +        case MAX_BY:
-           // max_by(x, y) takes x and y as the input
-           outputAggregationNames.add(SqlConstant.MAX_BY_INPUT_X);
-           outputAggregationNames.add(SqlConstant.MAX_BY_INPUT_Y);
++          outputAggregationNames.add(addPartialSuffix(SqlConstant.MAX_BY));
 +          break;
          default:
            outputAggregationNames.add(aggregationFuncName);
        }
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 4e6e9619669,ddddec125d1..ec88324163e
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@@ -136,9 -138,9 +138,10 @@@ public class SchemaUtils 
        case SqlConstant.MIN_VALUE:
        case SqlConstant.MAX_VALUE:
        case SqlConstant.MODE:
 +      case SqlConstant.MAX_BY:
-       default:
          return null;
+       default:
+         return TSDataType.TEXT;
      }
    }
  
@@@ -186,30 -187,28 +189,30 @@@
      }
    }
  
-   public static List<TAggregationType> splitPartialAggregation(TAggregationType aggregationType) {
+   public static List<String> splitPartialAggregation(TAggregationType aggregationType) {
      switch (aggregationType) {
        case FIRST_VALUE:
-         return Collections.singletonList(TAggregationType.MIN_TIME);
+         return Collections.singletonList(SqlConstant.MIN_TIME);
        case LAST_VALUE:
-         return Collections.singletonList(TAggregationType.MAX_TIME);
+         return Collections.singletonList(SqlConstant.MAX_TIME);
        case STDDEV:
-         return Collections.singletonList(TAggregationType.STDDEV);
+         return Collections.singletonList(addPartialSuffix(SqlConstant.STDDEV));
        case STDDEV_POP:
-         return Collections.singletonList(TAggregationType.STDDEV_POP);
+         return Collections.singletonList(addPartialSuffix(SqlConstant.STDDEV_POP));
        case STDDEV_SAMP:
-         return Collections.singletonList(TAggregationType.STDDEV_SAMP);
+         return Collections.singletonList(addPartialSuffix(SqlConstant.STDDEV_SAMP));
        case VARIANCE:
-         return Collections.singletonList(TAggregationType.VARIANCE);
+         return Collections.singletonList(addPartialSuffix(SqlConstant.VARIANCE));
        case VAR_POP:
-         return Collections.singletonList(TAggregationType.VAR_POP);
+         return Collections.singletonList(addPartialSuffix(SqlConstant.VAR_POP));
        case VAR_SAMP:
-         return Collections.singletonList(TAggregationType.VAR_SAMP);
+         return Collections.singletonList(addPartialSuffix(SqlConstant.VAR_SAMP));
 +      case MAX_BY:
-         return Arrays.asList(TAggregationType.MAX_BY_X_INPUT, TAggregationType.MAX_BY_Y_INPUT);
++        return Collections.singletonList(addPartialSuffix(SqlConstant.MAX_BY));
        case AVG:
-         return Arrays.asList(TAggregationType.COUNT, TAggregationType.SUM);
+         return Arrays.asList(SqlConstant.COUNT, SqlConstant.SUM);
        case TIME_DURATION:
-         return Arrays.asList(TAggregationType.MAX_TIME, TAggregationType.MIN_TIME);
+         return Arrays.asList(SqlConstant.MAX_TIME, SqlConstant.MIN_TIME);
        case SUM:
        case MIN_VALUE:
        case MAX_VALUE:
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
index f3e13d1cac1,75d8a04aac1..686476ff6a0
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
@@@ -47,9 -47,6 +47,7 @@@ public class SqlConstant 
    public static final String MAX_TIME = "max_time";
    public static final String MAX_VALUE = "max_value";
    public static final String MIN_VALUE = "min_value";
 +  public static final String MAX_BY = "max_by";
-   public static final String MAX_BY_INPUT_X = "max_by_input_x";
-   public static final String MAX_BY_INPUT_Y = "max_by_input_y";
    public static final String EXTREME = "extreme";
    public static final String FIRST_VALUE = "first_value";
    public static final String LAST_VALUE = "last_value";
diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
index 55fe3edeead,941a39dcb53..310eb019789
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
@@@ -876,44 -864,4 +876,37 @@@ public class AccumulatorTest 
      varSampAccumulator.outputFinal(finalResult);
      Assert.assertEquals(841.6666666666666, finalResult.build().getDouble(0), 0.001);
    }
 +
 +  @Test
 +  public void maxByAccumulatorTest() {
 +    Accumulator maxByAccumulator =
 +        AccumulatorFactory.createAccumulator(
 +            TAggregationType.MAX_BY,
 +            Arrays.asList(TSDataType.INT32, TSDataType.DOUBLE),
 +            Collections.emptyList(),
 +            Collections.emptyMap(),
 +            true);
-     Assert.assertEquals(TSDataType.INT32, maxByAccumulator.getIntermediateType()[0]);
-     Assert.assertEquals(TSDataType.DOUBLE, maxByAccumulator.getIntermediateType()[1]);
++    Assert.assertEquals(TSDataType.TEXT, maxByAccumulator.getIntermediateType()[0]);
 +    Assert.assertEquals(TSDataType.INT32, maxByAccumulator.getFinalType());
 +    // Returns null if there's no data
-     ColumnBuilder[] intermediateResult = new ColumnBuilder[2];
-     intermediateResult[0] = new IntColumnBuilder(null, 1);
-     intermediateResult[1] = new DoubleColumnBuilder(null, 1);
++    ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
++    intermediateResult[0] = new BinaryColumnBuilder(null, 1);
 +    maxByAccumulator.outputIntermediate(intermediateResult);
 +    Assert.assertTrue(intermediateResult[0].build().isNull(0));
-     Assert.assertTrue(intermediateResult[1].build().isNull(0));
 +    ColumnBuilder finalResult = new IntColumnBuilder(null, 1);
 +    maxByAccumulator.outputFinal(finalResult);
 +    Assert.assertTrue(finalResult.build().isNull(0));
 +
 +    Column[] timeAndValueColumn = getTimeAndTwoValueColumns(1, 0);
 +    maxByAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
 +    Assert.assertFalse(maxByAccumulator.hasFinalResult());
-     intermediateResult[0] = new IntColumnBuilder(null, 1);
-     intermediateResult[1] = new DoubleColumnBuilder(null, 1);
++    intermediateResult[0] = new BinaryColumnBuilder(null, 1);
 +    maxByAccumulator.outputIntermediate(intermediateResult);
-     Assert.assertEquals(-99, intermediateResult[0].build().getInt(0));
-     Assert.assertEquals(99d, intermediateResult[1].build().getDouble(0), 0.001);
 +
 +    // add intermediate result as input
-     maxByAccumulator.addIntermediate(
-         new Column[] {intermediateResult[0].build(), intermediateResult[1].build()});
++    maxByAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
 +    finalResult = new IntColumnBuilder(null, 1);
 +    maxByAccumulator.outputFinal(finalResult);
 +    Assert.assertEquals(-99, finalResult.build().getInt(0));
 +  }
  }
diff --cc iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index 21a69cf689e,dbdb8bd406f..9ec7b42d0ce
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@@ -196,8 -196,5 +196,6 @@@ enum TAggregationType 
    STDDEV_SAMP,
    VARIANCE,
    VAR_POP,
 -  VAR_SAMP
 +  VAR_SAMP,
 +  MAX_BY,
-   MAX_BY_X_INPUT,
-   MAX_BY_Y_INPUT
  }

Reply via email to