This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 10b8be2c661 [IOTDB-5765] Support Order By Expression [BE Part] (#9746)
10b8be2c661 is described below
commit 10b8be2c661cc19e6d280720d5241fe365d91e89
Author: YangCaiyin <[email protected]>
AuthorDate: Mon May 8 08:29:26 2023 +0800
[IOTDB-5765] Support Order By Expression [BE Part] (#9746)
---
.../iotdb/it/env/cluster/MppCommonConfig.java | 12 +
.../it/env/cluster/MppSharedCommonConfig.java | 14 ++
.../iotdb/it/env/remote/RemoteCommonConfig.java | 10 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 4 +
.../iotdb/db/it/orderBy/IoTDBOrderBy2IT.java | 48 ++++
.../apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java | 6 +-
.../resources/conf/iotdb-common.properties | 4 +
.../resources/conf/iotdb-datanode.properties | 13 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 23 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 10 +
.../iotdb/db/mpp/execution/driver/Driver.java | 21 ++
.../db/mpp/execution/driver/DriverContext.java | 9 +
.../fragment/FragmentInstanceContext.java | 10 +
.../fragment/FragmentInstanceExecution.java | 17 ++
.../operator/process/MergeSortOperator.java | 9 +-
.../execution/operator/process/SortOperator.java | 258 ++++++++++++++++---
.../process/join/merge/MergeSortComparator.java | 45 ++--
...rtKeyComparator.java => SortKeyComparator.java} | 11 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 5 +-
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 14 +-
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 6 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 28 ++-
.../plan/planner/distribution/SourceRewriter.java | 4 +-
.../plan/statement/component/OrderByComponent.java | 6 +-
.../component/{SortKey.java => OrderByKey.java} | 2 +-
.../db/mpp/plan/statement/crud/QueryStatement.java | 2 +-
.../plan/statement/sys/ShowQueriesStatement.java | 4 +-
.../org/apache/iotdb/db/tools/DiskSpiller.java | 166 +++++++++++++
.../apache/iotdb/db/tools/FileSpillerReader.java | 143 +++++++++++
.../org/apache/iotdb/db/tools/MemoryReader.java | 56 +++++
.../apache/iotdb/db/tools/SortBufferManager.java | 73 ++++++
.../SortKey.java => tools/SortReader.java} | 25 +-
.../db/utils/datastructure/MergeSortHeap.java | 4 +-
.../iotdb/db/utils/datastructure/MergeSortKey.java | 25 +-
.../{MergeSortKey.java => SortKey.java} | 14 +-
.../execution/operator/MergeSortOperatorTest.java | 36 +--
.../mpp/execution/operator/OperatorMemoryTest.java | 14 +-
.../mpp/execution/operator/SortOperatorTest.java | 273 +++++++++++++++++++++
.../db/mpp/plan/optimization/TestPlanBuilder.java | 6 +-
.../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 16 +-
.../plan/node/process/DeviceViewNodeSerdeTest.java | 6 +-
.../plan/plan/node/process/SortNodeSerdeTest.java | 4 +-
42 files changed, 1292 insertions(+), 164 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index 6e3c8d365d4..3a6b0543371 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -346,4 +346,16 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
setProperty("quota_enable", String.valueOf(quotaEnable));
return this;
}
+
+ @Override
+ public CommonConfig setSortBufferSize(long sortBufferSize) {
+ setProperty("sort_buffer_size_in_bytes", String.valueOf(sortBufferSize));
+ return this;
+ }
+
+ @Override
+ public CommonConfig setMaxTsBlockSizeInByte(long maxTsBlockSizeInByte) {
+ setProperty("max_tsblock_size_in_bytes",
String.valueOf(maxTsBlockSizeInByte));
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index 199274140ee..3346acf5393 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -357,4 +357,18 @@ public class MppSharedCommonConfig implements CommonConfig
{
cnConfig.setQuotaEnable(quotaEnable);
return this;
}
+
+ @Override
+ public CommonConfig setSortBufferSize(long sortBufferSize) {
+ dnConfig.setSortBufferSize(sortBufferSize);
+ cnConfig.setSortBufferSize(sortBufferSize);
+ return this;
+ }
+
+ @Override
+ public CommonConfig setMaxTsBlockSizeInByte(long maxTsBlockSizeInByte) {
+ dnConfig.setMaxTsBlockSizeInByte(maxTsBlockSizeInByte);
+ cnConfig.setMaxTsBlockSizeInByte(maxTsBlockSizeInByte);
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index e6af4826a87..37f9c99340b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -256,4 +256,14 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setQuotaEnable(boolean quotaEnable) {
return this;
}
+
+ @Override
+ public CommonConfig setSortBufferSize(long sortBufferSize) {
+ return this;
+ }
+
+ @Override
+ public CommonConfig setMaxTsBlockSizeInByte(long maxTsBlockSizeInByte) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index bb39edba207..75c6fa8ec8b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -114,4 +114,8 @@ public interface CommonConfig {
CommonConfig setWriteMemoryProportion(String writeMemoryProportion);
CommonConfig setQuotaEnable(boolean quotaEnable);
+
+ CommonConfig setSortBufferSize(long sortBufferSize);
+
+ CommonConfig setMaxTsBlockSizeInByte(long maxTsBlockSizeInByte);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderBy2IT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderBy2IT.java
new file mode 100644
index 00000000000..131ea8bf3b5
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderBy2IT.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.orderBy;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBOrderBy2IT extends IoTDBOrderByIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(2048);
+
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setMaxTsBlockSizeInByte(200);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
index a39b965a182..70dac5729d3 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
@@ -99,6 +99,7 @@ public class IoTDBOrderByIT {
@BeforeClass
public static void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(1024
* 1024L);
EnvFactory.getEnv().initClusterEnvironment();
insertData();
}
@@ -201,7 +202,7 @@ public class IoTDBOrderByIT {
@Test
public void orderByTest2() {
- String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by
bigNum";
+ String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by
bigNum,time";
int[] ans = {13, 11, 10, 3, 1, 5, 4, 7, 9, 8, 2, 12, 0, 6, 14};
testNormalOrderBy(sql, ans);
}
@@ -229,7 +230,8 @@ public class IoTDBOrderByIT {
@Test
public void orderByTest6() {
- String sql = "select num,bigNum,floatNum,str,bool from root.sg.d order by
bigNum desc";
+ String sql =
+ "select num,bigNum,floatNum,str,bool from root.sg.d order by bigNum
desc, time asc";
int[] ans = {6, 14, 0, 12, 2, 8, 9, 7, 4, 5, 1, 3, 10, 11, 13};
testNormalOrderBy(sql, ans);
}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index ad523550f1d..b1850ae6fcd 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -431,6 +431,10 @@ cluster_name=defaultCluster
# Datatype: int
# batch_size=100000
+# The memory for external sort in sort operator, when the data size is smaller
than sort_buffer_size_in_bytes, the sort operator will use in-memory sort.
+# Datatype: long
+# sort_buffer_size_in_bytes=52428800
+
####################
### Storage Engine Configuration
####################
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index e0e1c4b2c50..d6b44925f1f 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -212,6 +212,19 @@ dn_target_config_node_list=127.0.0.1:10710
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# dn_sync_dir=data/datanode/sync
+# sort_tmp_dir
+# This property is used to configure the temporary directory for sorting
operation.
+# If this property is unset, system will save the data in the default relative
path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/datanode).
+# If it is absolute, system will save the data in the exact location it points
to.
+# If it is relative, system will save the data in the relative path directory
it indicates under the IoTDB folder.
+# Note: If sort_tmp_dir is assigned an empty string(i.e.,zero-size), it will
be handled as a relative path.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is
"\\\\", then the path is absolute. Otherwise, it is relative.
+# sort_tmp_dir=data\\datanode\\tmp
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# sort_tmp_dir=data/datanode/tmp
+
####################
### Metric Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f90c84650ca..2d0285c429e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -313,6 +313,10 @@ public class IoTDBConfig {
private String schemaRegionConsensusDir = consensusDir + File.separator +
"schema_region";
+ /** temp result directory for sortOperator */
+ private String sortTmpDir =
+ IoTDBConstant.DEFAULT_BASE_DIR + File.separator +
IoTDBConstant.TMP_FOLDER_NAME;
+
/** Maximum MemTable number. Invalid when enableMemControl is true. */
private int maxMemtableNumber = 0;
@@ -410,6 +414,9 @@ public class IoTDBConfig {
/** Enable the service for MLNode */
private boolean enableMLNodeService = false;
+ /** The buffer for sort operation */
+ private long sortBufferSize = 50 * 1024 * 1024L;
+
/**
* The strategy of inner space compaction task. There are just one inner
space compaction strategy
* SIZE_TIRED_COMPACTION:
@@ -3774,4 +3781,20 @@ public class IoTDBConfig {
public void setRateLimiterType(String rateLimiterType) {
RateLimiterType = rateLimiterType;
}
+
+ public void setSortBufferSize(long sortBufferSize) {
+ this.sortBufferSize = sortBufferSize;
+ }
+
+ public long getSortBufferSize() {
+ return sortBufferSize;
+ }
+
+ public void setSortTmpDir(String sortTmpDir) {
+ this.sortTmpDir = sortTmpDir;
+ }
+
+ public String getSortTmpDir() {
+ return sortTmpDir;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ae7f82f02b2..34614f1194b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1062,6 +1062,16 @@ public class IoTDBDescriptor {
Boolean.parseBoolean(
properties.getProperty("quota_enable",
String.valueOf(conf.isQuotaEnable()))));
+ // the buffer for sort operator to calculate
+ conf.setSortBufferSize(
+ Long.parseLong(
+ properties
+ .getProperty("sort_buffer_size_in_bytes",
Long.toString(conf.getSortBufferSize()))
+ .trim()));
+
+ // tmp filePath for sort operator
+ conf.setSortTmpDir(properties.getProperty("sort_tmp_dir",
conf.getSortTmpDir()));
+
conf.setRateLimiterType(properties.getProperty("rate_limiter_type",
conf.getRateLimiterType()));
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index f92650bcde3..a214522ca65 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.mpp.execution.driver;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -34,6 +36,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
+import java.io.File;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -372,6 +375,11 @@ public abstract class Driver implements IDriver {
try {
root.close();
+
+ if (driverContext.mayHaveTmpFile()) {
+ cleanTmpFile();
+ }
+
sink.setNoMoreTsBlocks();
// record operator execution statistics to metrics
@@ -404,6 +412,19 @@ public abstract class Driver implements IDriver {
return inFlightException;
}
+ private void cleanTmpFile() {
+ String pipeLineSortDir =
+ IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+ + File.separator
+ + driverContext.getFragmentInstanceContext().getId().getFullId()
+ + File.separator
+ + driverContext.getPipelineId()
+ + File.separator;
+ File tmpPipeLineDir = new File(pipeLineSortDir);
+ if (!tmpPipeLineDir.exists()) return;
+ FileUtils.deleteDirectory(tmpPipeLineDir);
+ }
+
private static Throwable addSuppressedException(
Throwable inFlightException, Throwable newException, String message,
Object... args) {
if (newException instanceof Error) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 2c2f5878c2d..001cc5c4ede 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -44,6 +44,7 @@ public class DriverContext {
private ExchangeOperator downstreamOperator;
private final AtomicBoolean finished = new AtomicBoolean();
+ private boolean mayHaveTmpFile = false;
@TestOnly
public DriverContext() {
@@ -138,4 +139,12 @@ public class DriverContext {
public boolean isDone() {
return finished.get();
}
+
+ public void setHaveTmpFile(boolean mayHaveTmpFile) {
+ this.mayHaveTmpFile = mayHaveTmpFile;
+ }
+
+ public boolean mayHaveTmpFile() {
+ return mayHaveTmpFile;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 7e6257ef201..01882493a5d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -62,6 +62,8 @@ public class FragmentInstanceContext extends QueryContext {
private Set<TsFileResource> closedFilePaths;
/** unClosed tsfile used in this fragment instance */
private Set<TsFileResource> unClosedFilePaths;
+ /** check if there is tmp file to be deleted */
+ private boolean mayHaveTmpFile = false;
private final long createNanos = System.nanoTime();
@@ -375,4 +377,12 @@ public class FragmentInstanceContext extends QueryContext {
sourcePaths = null;
sharedQueryDataSource = null;
}
+
+ public void setMayHaveTmpFile(boolean mayHaveTmpFile) {
+ this.mayHaveTmpFile = mayHaveTmpFile;
+ }
+
+ public boolean mayHaveTmpFile() {
+ return mayHaveTmpFile;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 2c7e2a73b6d..b68ec33f096 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.mpp.execution.fragment;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.exception.CpuNotEnoughException;
import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException;
@@ -31,6 +33,7 @@ import io.airlift.stats.CounterStat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.List;
import static java.util.Objects.requireNonNull;
@@ -137,6 +140,20 @@ public class FragmentInstanceExecution {
}
// help for gc
sink = null;
+
+ // delete tmp file if exists
+ if (context.mayHaveTmpFile()) {
+ String tmpFilePath =
+ IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+ + File.separator
+ + context.getId().getFullId()
+ + File.separator;
+ File tmpFile = new File(tmpFilePath);
+ if (tmpFile.exists()) {
+ FileUtils.deleteDirectory(tmpFile);
+ }
+ }
+
// close the driver after sink is aborted or closed because in
driver.close() it
// will try to call ISink.setNoMoreTsBlocks()
for (IDriver driver : drivers) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index 5f3c7844912..d13798a5176 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -44,7 +45,7 @@ public class MergeSortOperator extends
AbstractConsumeAllOperator {
private final TsBlockBuilder tsBlockBuilder;
private final boolean[] noMoreTsBlocks;
private final MergeSortHeap mergeSortHeap;
- private final Comparator<MergeSortKey> comparator;
+ private final Comparator<SortKey> comparator;
private boolean finished;
@@ -52,7 +53,7 @@ public class MergeSortOperator extends
AbstractConsumeAllOperator {
OperatorContext operatorContext,
List<Operator> inputOperators,
List<TSDataType> dataTypes,
- Comparator<MergeSortKey> comparator) {
+ Comparator<SortKey> comparator) {
super(operatorContext, inputOperators);
this.dataTypes = dataTypes;
this.mergeSortHeap = new MergeSortHeap(inputOperatorsCount, comparator);
@@ -101,7 +102,7 @@ public class MergeSortOperator extends
AbstractConsumeAllOperator {
minMergeSortKey.tsBlock,
minMergeSortKey.tsBlock.getPositionCount() - 1),
mergeSortHeap.peek())
< 0) {
- inputTsBlocks[minMergeSortKey.columnIndex] = null;
+ inputTsBlocks[minMergeSortKey.inputChannelIndex] = null;
return minMergeSortKey.rowIndex == 0
? minMergeSortKey.tsBlock
: minMergeSortKey.tsBlock.subTsBlock(minMergeSortKey.rowIndex);
@@ -126,7 +127,7 @@ public class MergeSortOperator extends
AbstractConsumeAllOperator {
}
tsBlockBuilder.declarePosition();
if (mergeSortKey.rowIndex == mergeSortKey.tsBlock.getPositionCount() -
1) {
- inputTsBlocks[mergeSortKey.columnIndex] = null;
+ inputTsBlocks[mergeSortKey.inputChannelIndex] = null;
if (!mergeSortHeap.isEmpty()
&& comparator.compare(mergeSortHeap.peek(), mergeSortKey) > 0) {
break;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 6e9d185251d..20dcac02f7e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -18,39 +18,74 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.tools.DiskSpiller;
+import org.apache.iotdb.db.tools.MemoryReader;
+import org.apache.iotdb.db.tools.SortBufferManager;
+import org.apache.iotdb.db.tools.SortReader;
+import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.tools.SortBufferManager.SORT_BUFFER_SIZE;
public class SortOperator implements ProcessOperator {
private final OperatorContext operatorContext;
private final Operator inputOperator;
private final TsBlockBuilder tsBlockBuilder;
- private List<MergeSortKey> cachedData;
- private final Comparator<MergeSortKey> comparator;
+ // Use to output the result in memory.
+ // Because the memory may be larger than tsBlockBuilder's max size
+ // so the data may be return in multiple times.
+ private int curRow = -1;
+
+ private List<SortKey> cachedData;
+ private final Comparator<SortKey> comparator;
+ private long cachedBytes;
+ private final DiskSpiller diskSpiller;
+ private final SortBufferManager sortBufferManager;
+
+ // For mergeSort
+
+ private MergeSortHeap mergeSortHeap;
+ private List<SortReader> sortReaders;
+ private boolean[] noMoreData;
+
+ private static final Logger logger =
LoggerFactory.getLogger(SortOperator.class);
public SortOperator(
OperatorContext operatorContext,
Operator inputOperator,
List<TSDataType> dataTypes,
- Comparator<MergeSortKey> comparator) {
+ String folderPath,
+ Comparator<SortKey> comparator) {
this.operatorContext = operatorContext;
this.inputOperator = inputOperator;
this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
this.cachedData = new ArrayList<>();
this.comparator = comparator;
+ this.cachedBytes = 0;
+ this.diskSpiller =
+ new DiskSpiller(folderPath, folderPath +
operatorContext.getOperatorId(), dataTypes);
+ this.sortBufferManager = new SortBufferManager();
}
@Override
@@ -67,75 +102,232 @@ public class SortOperator implements ProcessOperator {
public TsBlock next() throws Exception {
if (!inputOperator.hasNextWithTimer()) {
- if (cachedData.size() > 1) {
- cachedData.sort(comparator);
+ if (diskSpiller.hasSpilledData()) {
+ try {
+ prepareSortReaders();
+ return mergeSort();
+ } catch (Exception e) {
+ clear();
+ throw e;
+ }
+ } else {
+ if (curRow == -1) {
+ cachedData.sort(comparator);
+ curRow = 0;
+ }
+ return buildTsBlockInMemory();
}
- TsBlock result = buildTsBlock();
- cachedData = null;
- return result;
}
TsBlock tsBlock = inputOperator.nextWithTimer();
if (tsBlock == null) {
return null;
}
- // add data of each TsBlock from child into list
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- cachedData.add(new MergeSortKey(tsBlock, i));
+
+ try {
+ cacheTsBlock(tsBlock);
+ } catch (IoTDBException e) {
+ clear();
+ throw e;
}
+
return null;
}
- private TsBlock buildTsBlock() {
+ private void prepareSortReaders() throws IoTDBException {
+ if (sortReaders != null) return;
+
+ sortReaders = new ArrayList<>();
+ if (cachedBytes != 0) {
+ cachedData.sort(comparator);
+ if (sortBufferManager.allocate(cachedBytes)) {
+ sortReaders.add(
+ new MemoryReader(
+
cachedData.stream().map(MergeSortKey::new).collect(Collectors.toList())));
+ } else {
+ sortBufferManager.allocateOneSortBranch();
+ diskSpiller.spillSortedData(cachedData);
+ cachedData = null;
+ }
+ }
+ sortReaders.addAll(diskSpiller.getReaders(sortBufferManager));
+ // if reader is finished
+ noMoreData = new boolean[sortReaders.size()];
+ }
+
+ private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+ long bytesSize = tsBlock.getRetainedSizeInBytes();
+ if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
+ cachedBytes += bytesSize;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ cachedData.add(new MergeSortKey(tsBlock, i));
+ }
+ } else {
+ cachedData.sort(comparator);
+ spill();
+ cachedData.clear();
+ cachedBytes = bytesSize;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ cachedData.add(new MergeSortKey(tsBlock, i));
+ }
+ }
+ }
+
+ private void spill() throws IoTDBException {
+ // if current memory cannot put this tsBlock, an exception will be thrown
in spillSortedData()
+ // because there should be at least
tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for
+ // one branch.
+ sortBufferManager.allocateOneSortBranch();
+ diskSpiller.spillSortedData(cachedData);
+ }
+
+ private TsBlock buildTsBlockInMemory() {
+ tsBlockBuilder.reset();
TimeColumnBuilder timeColumnBuilder =
tsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
- cachedData.forEach(
- mergeSortKey -> {
- TsBlock tsBlock = mergeSortKey.tsBlock;
- int row = mergeSortKey.rowIndex;
- timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(row));
- for (int i = 0; i < valueColumnBuilders.length; i++) {
- if (tsBlock.getColumn(i).isNull(row)) {
- valueColumnBuilders[i].appendNull();
- continue;
- }
- valueColumnBuilders[i].write(tsBlock.getColumn(i), row);
- }
- tsBlockBuilder.declarePosition();
- });
+ for (int i = curRow; i < cachedData.size(); i++) {
+ SortKey sortKey = cachedData.get(i);
+ TsBlock tsBlock = sortKey.tsBlock;
+ timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(sortKey.rowIndex));
+ for (int j = 0; j < valueColumnBuilders.length; j++) {
+ if (tsBlock.getColumn(j).isNull(sortKey.rowIndex)) {
+ valueColumnBuilders[j].appendNull();
+ continue;
+ }
+ valueColumnBuilders[j].write(tsBlock.getColumn(j), sortKey.rowIndex);
+ }
+ tsBlockBuilder.declarePosition();
+ curRow++;
+ if (tsBlockBuilder.isFull()) {
+ break;
+ }
+ }
+
return tsBlockBuilder.build();
}
+ private TsBlock mergeSort() throws IoTDBException {
+
+ if (mergeSortHeap == null) {
+ mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
+ // 1. fill the input from each reader
+ for (int i = 0; i < sortReaders.size(); i++) {
+ SortReader sortReader = sortReaders.get(i);
+ if (sortReader.hasNext()) {
+ MergeSortKey mergeSortKey = sortReader.next();
+ mergeSortKey.inputChannelIndex = i;
+ mergeSortHeap.push(mergeSortKey);
+ } else {
+ noMoreData[i] = true;
+ sortBufferManager.releaseOneSortBranch();
+ }
+ }
+ }
+
+ long startTime = System.nanoTime();
+ long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
+ // 2. do merge sort until one TsBlock is consumed up
+ tsBlockBuilder.reset();
+ TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
+ while (!mergeSortHeap.isEmpty()) {
+
+ MergeSortKey mergeSortKey = mergeSortHeap.poll();
+ TsBlock targetBlock = mergeSortKey.tsBlock;
+ timeBuilder.writeLong(targetBlock.getTimeByIndex(mergeSortKey.rowIndex));
+ for (int i = 0; i < valueColumnBuilders.length; i++) {
+ if (targetBlock.getColumn(i).isNull(mergeSortKey.rowIndex)) {
+ valueColumnBuilders[i].appendNull();
+ continue;
+ }
+ valueColumnBuilders[i].write(targetBlock.getColumn(i),
mergeSortKey.rowIndex);
+ }
+ tsBlockBuilder.declarePosition();
+
+ int readerIndex = mergeSortKey.inputChannelIndex;
+ mergeSortKey = readNextMergeSortKey(readerIndex);
+ if (mergeSortKey != null) {
+ mergeSortHeap.push(mergeSortKey);
+ } else {
+ noMoreData[readerIndex] = true;
+ sortBufferManager.releaseOneSortBranch();
+ }
+
+ // break if time is out or tsBlockBuilder is full or sortBuffer is not
enough
+ if (System.nanoTime() - startTime > maxRuntime ||
tsBlockBuilder.isFull()) {
+ break;
+ }
+ }
+ return tsBlockBuilder.build();
+ }
+
+ private MergeSortKey readNextMergeSortKey(int readerIndex) throws
IoTDBException {
+ SortReader sortReader = sortReaders.get(readerIndex);
+ if (sortReader.hasNext()) {
+ MergeSortKey mergeSortKey = sortReader.next();
+ mergeSortKey.inputChannelIndex = readerIndex;
+ return mergeSortKey;
+ }
+ return null;
+ }
+
+ private boolean hasMoreData() {
+ if (noMoreData == null) return true;
+ for (boolean noMore : noMoreData) {
+ if (!noMore) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void clear() {
+ if (!diskSpiller.hasSpilledData()) return;
+ try {
+ if (sortReaders != null) {
+ for (SortReader sortReader : sortReaders) {
+ sortReader.close();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Fail to close fileChannel", e);
+ }
+ }
+
@Override
public boolean hasNext() throws Exception {
- return inputOperator.hasNextWithTimer() || cachedData != null;
+ return inputOperator.hasNextWithTimer()
+ || (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
+ || (diskSpiller.hasSpilledData() && hasMoreData());
}
@Override
public void close() throws Exception {
+ cachedData = null;
+ clear();
inputOperator.close();
}
@Override
public boolean isFinished() throws Exception {
- return cachedData == null;
+ return !this.hasNextWithTimer();
}
@Override
public long calculateMaxPeekMemory() {
- // In fact, we need to cache all data from input.
- // Now the child of this Operator only will be ShowQueries, it only
returns one Block.
return inputOperator.calculateMaxPeekMemory()
- + inputOperator.calculateRetainedSizeAfterCallingNext();
+ + inputOperator.calculateRetainedSizeAfterCallingNext()
+ + SORT_BUFFER_SIZE;
}
@Override
public long calculateMaxReturnSize() {
- return inputOperator.calculateMaxReturnSize();
+ return TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
- return inputOperator.calculateRetainedSizeAfterCallingNext();
+ return inputOperator.calculateRetainedSizeAfterCallingNext() +
SORT_BUFFER_SIZE;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
index 3d8991bf24f..c949c2990d0 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
@@ -22,7 +22,7 @@ package
org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.db.mpp.plan.statement.component.NullOrdering;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.commons.collections4.comparators.ComparatorChain;
@@ -33,70 +33,65 @@ import java.util.List;
public class MergeSortComparator {
- private static final Comparator<MergeSortKey> TIME_ASC_COMPARATOR =
+ private static final Comparator<SortKey> TIME_ASC_COMPARATOR =
Comparator.comparingLong(
- (MergeSortKey sortKey) ->
sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex));
+ (SortKey sortKey) ->
sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex));
- private static final Comparator<MergeSortKey> TIME_DESC_COMPARATOR =
+ private static final Comparator<SortKey> TIME_DESC_COMPARATOR =
Comparator.comparingLong(
- (MergeSortKey sortKey) ->
sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex))
+ (SortKey sortKey) ->
sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex))
.reversed();
/** @param indexList -1 for time column */
- public static Comparator<MergeSortKey> getComparator(
+ public static Comparator<SortKey> getComparator(
List<SortItem> sortItemList, List<Integer> indexList, List<TSDataType>
dataTypeList) {
// use code-gen compile this comparator
- List<Comparator<MergeSortKey>> list = new ArrayList<>(indexList.size());
+ List<Comparator<SortKey>> list = new ArrayList<>(indexList.size());
for (int i = 0; i < indexList.size(); i++) {
int index = indexList.get(i);
+ if (index == -2) continue;
TSDataType dataType = dataTypeList.get(i);
boolean asc = sortItemList.get(i).getOrdering() == Ordering.ASC;
boolean nullFirst = sortItemList.get(i).getNullOrdering() ==
NullOrdering.FIRST;
list.add(genSingleComparator(asc, index, dataType, nullFirst));
}
- return new ComparatorChain<>(list);
+
+ return list.size() == 1 ? list.get(0) : new ComparatorChain<>(list);
}
- public static Comparator<MergeSortKey> getComparator(
- TSDataType dataType, int index, boolean asc) {
- Comparator<MergeSortKey> comparator;
+ public static Comparator<SortKey> getComparator(TSDataType dataType, int
index, boolean asc) {
+ Comparator<SortKey> comparator;
switch (dataType) {
case INT32:
comparator =
Comparator.comparingInt(
- (MergeSortKey sortKey) ->
- sortKey.tsBlock.getColumn(index).getInt(sortKey.rowIndex));
+ (SortKey sortKey) ->
sortKey.tsBlock.getColumn(index).getInt(sortKey.rowIndex));
break;
case INT64:
comparator =
Comparator.comparingLong(
- (MergeSortKey sortKey) ->
-
sortKey.tsBlock.getColumn(index).getLong(sortKey.rowIndex));
+ (SortKey sortKey) ->
sortKey.tsBlock.getColumn(index).getLong(sortKey.rowIndex));
break;
case FLOAT:
comparator =
Comparator.comparingDouble(
- (MergeSortKey sortKey) ->
-
sortKey.tsBlock.getColumn(index).getFloat(sortKey.rowIndex));
+ (SortKey sortKey) ->
sortKey.tsBlock.getColumn(index).getFloat(sortKey.rowIndex));
break;
case DOUBLE:
comparator =
Comparator.comparingDouble(
- (MergeSortKey sortKey) ->
-
sortKey.tsBlock.getColumn(index).getDouble(sortKey.rowIndex));
+ (SortKey sortKey) ->
sortKey.tsBlock.getColumn(index).getDouble(sortKey.rowIndex));
break;
case TEXT:
comparator =
Comparator.comparing(
- (MergeSortKey sortKey) ->
-
sortKey.tsBlock.getColumn(index).getBinary(sortKey.rowIndex));
+ (SortKey sortKey) ->
sortKey.tsBlock.getColumn(index).getBinary(sortKey.rowIndex));
break;
case BOOLEAN:
comparator =
Comparator.comparing(
- (MergeSortKey sortKey) ->
-
sortKey.tsBlock.getColumn(index).getBoolean(sortKey.rowIndex));
+ (SortKey sortKey) ->
sortKey.tsBlock.getColumn(index).getBoolean(sortKey.rowIndex));
break;
default:
throw new IllegalArgumentException("Data type: " + dataType + " cannot
be ordered");
@@ -108,12 +103,12 @@ public class MergeSortComparator {
return comparator;
}
- private static Comparator<MergeSortKey> genSingleComparator(
+ private static Comparator<SortKey> genSingleComparator(
boolean asc, int index, TSDataType dataType, boolean nullFirst) {
if (index == -1) {
return asc ? TIME_ASC_COMPARATOR : TIME_DESC_COMPARATOR;
}
- return new MergeSortKeyComparator(index, nullFirst,
getComparator(dataType, index, asc));
+ return new SortKeyComparator(index, nullFirst, getComparator(dataType,
index, asc));
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortKeyComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
similarity index 79%
rename from
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortKeyComparator.java
rename to
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
index 8e8e8c6f851..fa4a6a1a011 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortKeyComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
@@ -19,26 +19,25 @@
package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
-import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
import java.io.Serializable;
import java.util.Comparator;
-public class MergeSortKeyComparator implements Comparator<MergeSortKey>,
Serializable {
+public class SortKeyComparator implements Comparator<SortKey>, Serializable {
private final boolean nullFirst;
private final int index;
- private final Comparator<MergeSortKey> originalComparator;
+ private final Comparator<SortKey> originalComparator;
- public MergeSortKeyComparator(
- int index, boolean nullFirst, Comparator<MergeSortKey>
originalComparator) {
+ public SortKeyComparator(int index, boolean nullFirst, Comparator<SortKey>
originalComparator) {
this.nullFirst = nullFirst;
this.index = index;
this.originalComparator = originalComparator;
}
@Override
- public int compare(MergeSortKey o1, MergeSortKey o2) {
+ public int compare(SortKey o1, SortKey o2) {
boolean o1IsNull = o1.tsBlock.getColumn(index).isNull(o1.rowIndex);
boolean o2IsNull = o2.tsBlock.getColumn(index).isNull(o2.rowIndex);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 30162d3ad06..248a7209fa2 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -86,10 +86,10 @@ import
org.apache.iotdb.db.mpp.plan.statement.component.GroupBySessionComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import
org.apache.iotdb.db.mpp.plan.statement.component.GroupByVariationComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
@@ -415,7 +415,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
if (orderByParameter != null &&
!orderByParameter.getSortItemList().isEmpty()) {
List<SortItem> sortItemList = orderByParameter.getSortItemList();
checkState(
- sortItemList.size() == 1 &&
sortItemList.get(0).getSortKey().equals(SortKey.TIMESERIES),
+ sortItemList.size() == 1
+ &&
sortItemList.get(0).getSortKey().equals(OrderByKey.TIMESERIES),
"Last queries only support sorting by timeseries now.");
boolean isAscending = sortItemList.get(0).getOrdering() == Ordering.ASC;
sourceExpressions =
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 4e9986d4fbf..28203751ee1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -87,12 +87,12 @@ import
org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.IntoItem;
import org.apache.iotdb.db.mpp.plan.statement.component.NullOrdering;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultSetFormat;
import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
@@ -1094,7 +1094,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
queryStatement.setOrderByComponent(
parseOrderByClause(
ctx.orderByClause(),
- ImmutableSet.of(SortKey.TIME, SortKey.DEVICE,
SortKey.TIMESERIES)));
+ ImmutableSet.of(OrderByKey.TIME, OrderByKey.DEVICE,
OrderByKey.TIMESERIES)));
}
// parse FILL
@@ -2904,11 +2904,11 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
parseOrderByClause(
ctx.orderByClause(),
ImmutableSet.of(
- SortKey.TIME,
- SortKey.QUERYID,
- SortKey.DATANODEID,
- SortKey.ELAPSEDTIME,
- SortKey.STATEMENT)));
+ OrderByKey.TIME,
+ OrderByKey.QUERYID,
+ OrderByKey.DATANODEID,
+ OrderByKey.ELAPSEDTIME,
+ OrderByKey.STATEMENT)));
}
// parse LIMIT & OFFSET
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 8ac7b9ac8b9..486b9f2f455 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -87,9 +87,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.utils.SchemaUtils;
@@ -549,10 +549,10 @@ public class LogicalPlanBuilder {
sortItemList = new ArrayList<>();
}
if (!queryStatement.isOrderByDevice()) {
- sortItemList.add(new SortItem(SortKey.DEVICE, Ordering.ASC));
+ sortItemList.add(new SortItem(OrderByKey.DEVICE, Ordering.ASC));
}
if (!queryStatement.isOrderByTime()) {
- sortItemList.add(new SortItem(SortKey.TIME, Ordering.ASC));
+ sortItemList.add(new SortItem(OrderByKey.TIME, Ordering.ASC));
}
OrderByParameter orderByParameter = new OrderByParameter(sortItemList);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index d51bedfd86f..4e945c8be62 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -200,9 +201,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
@@ -222,6 +223,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.Validate;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -797,7 +799,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
TimeSelector selector = null;
TimeComparator timeComparator = null;
for (SortItem sortItem : node.getMergeOrderParameter().getSortItemList()) {
- if (Objects.equals(sortItem.getSortKey(), SortKey.TIME)) {
+ if (Objects.equals(sortItem.getSortKey(), OrderByKey.TIME)) {
Ordering ordering = sortItem.getOrdering();
if (ordering == Ordering.ASC) {
selector = new TimeSelector(node.getChildren().size() << 1, true);
@@ -853,7 +855,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
List<TSDataType> sortItemDataTypeList) {
sortItemList.forEach(
sortItem -> {
- if (sortItem.getSortKey().equals(SortKey.TIME)) {
+ if (sortItem.getSortKey().equals(OrderByKey.TIME)) {
sortItemIndexList.add(-1);
sortItemDataTypeList.add(TSDataType.INT64);
} else {
@@ -863,6 +865,11 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
sortItemDataTypeList.add(dataTypes.get(i));
break;
}
+ // there is no related column in outputColumnNames
+ if (i == outputColumnNames.size() - 1) {
+ sortItemIndexList.add(-2);
+ sortItemDataTypeList.add(null);
+ }
}
}
});
@@ -1616,10 +1623,23 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
sortItemList,
sortItemIndexList,
sortItemDataTypeList);
+
+ String filePrefix =
+ IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+ + File.separator
+ +
operatorContext.getDriverContext().getFragmentInstanceContext().getId().getFullId()
+ + File.separator
+ + operatorContext.getDriverContext().getPipelineId()
+ + File.separator;
+
+ context.getDriverContext().setHaveTmpFile(true);
+
context.getDriverContext().getFragmentInstanceContext().setMayHaveTmpFile(true);
+
return new SortOperator(
operatorContext,
child,
dataTypes,
+ filePrefix,
MergeSortComparator.getComparator(sortItemList, sortItemIndexList,
sortItemDataTypeList));
}
@@ -2203,7 +2223,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
checkArgument(
sortItemList.isEmpty()
|| (sortItemList.size() == 1
- && Objects.equals(sortItemList.get(0).getSortKey(),
SortKey.TIMESERIES)),
+ && Objects.equals(sortItemList.get(0).getSortKey(),
OrderByKey.TIMESERIES)),
"Last query only support order by timeseries asc/desc");
context.setLastQueryTimeFilter(node.getTimeFilter());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 9bff84a2435..03cc0fba950 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -62,9 +62,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import java.util.ArrayList;
@@ -573,7 +573,7 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
if (newRoot instanceof LastQueryMergeNode &&
node.getMergeOrderParameter().isEmpty()) {
OrderByParameter orderByParameter =
new OrderByParameter(
- Collections.singletonList(new SortItem(SortKey.TIMESERIES,
Ordering.ASC)));
+ Collections.singletonList(new SortItem(OrderByKey.TIMESERIES,
Ordering.ASC)));
addSortForEachLastQueryNode(root, orderByParameter);
}
root.getChildren().forEach(newRoot::addChild);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
index 5847d079173..3606b93c024 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
@@ -50,15 +50,15 @@ public class OrderByComponent extends StatementNode {
public void addSortItem(SortItem sortItem) {
this.sortItemList.add(sortItem);
switch (sortItem.getSortKey()) {
- case SortKey.TIME:
+ case OrderByKey.TIME:
orderByTime = true;
timeOrderPriority = sortItemList.size() - 1;
break;
- case SortKey.TIMESERIES:
+ case OrderByKey.TIMESERIES:
orderByTimeseries = true;
timeseriesOrderPriority = sortItemList.size() - 1;
break;
- case SortKey.DEVICE:
+ case OrderByKey.DEVICE:
orderByDevice = true;
deviceOrderPriority = sortItemList.size() - 1;
break;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByKey.java
similarity index 97%
copy from
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
copy to
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByKey.java
index 650f6a3178e..4c1e3ae5b8c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByKey.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.plan.statement.component;
-public class SortKey {
+public class OrderByKey {
public static final String TIME = "TIME";
public static final String TIMESERIES = "TIMESERIES";
public static final String DEVICE = "DEVICE";
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 00ecae2d99e..c2cfae97f0d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -407,7 +407,7 @@ public class QueryStatement extends Statement {
List<SortItem> sortItems = getSortItemList();
List<SortItem> newSortItems = new ArrayList<>();
int expressionIndex = 0;
- for (int i = 0; i < sortItems.size() && expressionIndex <
sortItemExpressions.length; i++) {
+ for (int i = 0; i < sortItems.size(); i++) {
SortItem sortItem = sortItems.get(i);
SortItem newSortItem =
new SortItem(sortItem.getSortKey(), sortItem.getOrdering(),
sortItem.getNullOrdering());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
index c6f266d4636..ce8ad3a9de7 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.mpp.plan.statement.sys;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStatement;
@@ -73,7 +73,7 @@ public class ShowQueriesStatement extends ShowStatement {
public List<SortItem> getSortItemList() {
if (orderByComponent == null) {
// default order
- return Collections.singletonList(new SortItem(SortKey.TIME,
Ordering.ASC));
+ return Collections.singletonList(new SortItem(OrderByKey.TIME,
Ordering.ASC));
}
return orderByComponent.getSortItemList();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/DiskSpiller.java
b/server/src/main/java/org/apache/iotdb/db/tools/DiskSpiller.java
new file mode 100644
index 00000000000..3f35945bc4a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/DiskSpiller.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DiskSpiller {
+
+ private final List<TSDataType> dataTypeList;
+ private final String folderPath;
+ private final String filePrefix;
+ private final String fileSuffix = ".sortTemp";
+
+ private int fileIndex;
+ private boolean folderCreated = false;
+ private final TsBlockSerde serde = new TsBlockSerde();
+
+ public DiskSpiller(String folderPath, String filePrefix, List<TSDataType>
dataTypeList) {
+ this.folderPath = folderPath;
+ this.filePrefix = filePrefix + "-";
+ this.fileIndex = 0;
+ this.dataTypeList = dataTypeList;
+ }
+
+ private void createFolder(String folderPath) throws IOException {
+ Path path = Paths.get(folderPath);
+ Files.createDirectories(path);
+ folderCreated = true;
+ }
+
+ private void spill(List<TsBlock> tsBlocks) throws IOException,
IoTDBException {
+ if (!folderCreated) {
+ createFolder(folderPath);
+ }
+ String fileName = filePrefix + String.format("%05d", fileIndex) +
fileSuffix;
+ fileIndex++;
+
+ writeData(tsBlocks, fileName);
+ }
+
+ // todo: directly serialize the sorted line instead of copy into a new
tsBlock
+ public void spillSortedData(List<SortKey> sortedData) throws IoTDBException {
+ List<TsBlock> tsBlocks = new ArrayList<>();
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(dataTypeList);
+ ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+ ColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+
+ for (SortKey sortKey : sortedData) {
+ writeSortKey(sortKey, columnBuilders, timeColumnBuilder);
+ tsBlockBuilder.declarePosition();
+ if (tsBlockBuilder.isFull()) {
+ tsBlocks.add(tsBlockBuilder.build());
+ tsBlockBuilder.reset();
+ timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ }
+ }
+
+ if (!tsBlockBuilder.isEmpty()) {
+ tsBlocks.add(tsBlockBuilder.build());
+ }
+
+ try {
+ spill(tsBlocks);
+ } catch (IOException e) {
+ throw new IoTDBException(
+ "Create file error: " + filePrefix + (fileIndex - 1) + fileSuffix,
+ e,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+
+ private void writeData(List<TsBlock> sortedData, String fileName)
+ throws IOException, IoTDBException {
+ Path filePath = Paths.get(fileName);
+ Files.createFile(filePath);
+ try (FileChannel fileChannel = FileChannel.open(filePath,
StandardOpenOption.WRITE)) {
+ for (TsBlock tsBlock : sortedData) {
+ ByteBuffer tsBlockBuffer = serde.serialize(tsBlock);
+ ByteBuffer length = ByteBuffer.allocate(4);
+ length.putInt(tsBlockBuffer.capacity());
+ length.flip();
+ fileChannel.write(length);
+ fileChannel.write(tsBlockBuffer);
+ }
+ } catch (IOException e) {
+ throw new IoTDBException(
+ "Can't write intermediate sorted data to file: " + fileName,
+ e,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+
+ private void writeSortKey(
+ SortKey sortKey, ColumnBuilder[] columnBuilders, ColumnBuilder
timeColumnBuilder) {
+
timeColumnBuilder.writeLong(sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex));
+ for (int i = 0; i < columnBuilders.length; i++) {
+ if (sortKey.tsBlock.getColumn(i).isNull(sortKey.rowIndex)) {
+ columnBuilders[i].appendNull();
+ } else {
+ columnBuilders[i].write(sortKey.tsBlock.getColumn(i),
sortKey.rowIndex);
+ }
+ }
+ }
+
+ public boolean hasSpilledData() {
+ return fileIndex != 0;
+ }
+
+ private List<String> getFilePaths() {
+ List<String> filePaths = new ArrayList<>();
+ for (int i = 0; i < fileIndex; i++) {
+ filePaths.add(filePrefix + String.format("%05d", i) + fileSuffix);
+ }
+ return filePaths;
+ }
+
+ public List<SortReader> getReaders(SortBufferManager sortBufferManager)
throws IoTDBException {
+ List<String> filePaths = getFilePaths();
+ List<SortReader> sortReaders = new ArrayList<>();
+ try {
+ for (String filePath : filePaths) {
+ sortReaders.add(new FileSpillerReader(filePath, sortBufferManager,
serde));
+ }
+ } catch (IOException e) {
+ throw new IoTDBException(
+ "Can't get file for FileSpillerReader, check if the file exists: " +
filePaths,
+ e,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ return sortReaders;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/tools/FileSpillerReader.java
b/server/src/main/java/org/apache/iotdb/db/tools/FileSpillerReader.java
new file mode 100644
index 00000000000..b5b41ab169b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/FileSpillerReader.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class FileSpillerReader implements SortReader {
+
+ private final FileChannel fileChannel;
+ private final List<TsBlock> cacheBlocks;
+ private final SortBufferManager sortBufferManager;
+ private final String fileName;
+ private final TsBlockSerde serde;
+
+ private int tsBlockIndex;
+ private int rowIndex;
+ private boolean isEnd = false;
+
+ public FileSpillerReader(String fileName, SortBufferManager
sortBufferManager, TsBlockSerde serde)
+ throws IOException {
+ this.fileChannel = FileChannel.open(Paths.get(fileName),
StandardOpenOption.READ);
+ this.cacheBlocks = new ArrayList<>();
+ this.rowIndex = 0;
+ this.fileName = fileName;
+ this.sortBufferManager = sortBufferManager;
+ this.serde = serde;
+ }
+
+ @Override
+ public MergeSortKey next() {
+ MergeSortKey sortKey = new MergeSortKey(cacheBlocks.get(tsBlockIndex),
rowIndex);
+ rowIndex++;
+ return sortKey;
+ }
+
+ private boolean readTsBlockFromFile() throws IoTDBException {
+ long bufferSize = sortBufferManager.getReaderBufferAvailable();
+ cacheBlocks.clear();
+ while (bufferSize >= DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) {
+ long size = read();
+ if (size == -1) break;
+ bufferSize -= size;
+ }
+
+ if (cacheBlocks.isEmpty()) {
+ return false;
+ }
+ rowIndex = 0;
+ tsBlockIndex = 0;
+ return true;
+ }
+
+ private long read() throws IoTDBException {
+ try {
+ ByteBuffer bytes = ByteBuffer.allocate(4);
+ int readLen = fileChannel.read(bytes);
+ if (readLen == -1) {
+ return -1;
+ }
+ bytes.flip();
+ int capacity = bytes.getInt();
+ ByteBuffer tsBlockBytes = ByteBuffer.allocate(capacity);
+ fileChannel.read(tsBlockBytes);
+ tsBlockBytes.flip();
+ TsBlock cachedTsBlock = serde.deserialize(tsBlockBytes);
+ cacheBlocks.add(cachedTsBlock);
+ return cachedTsBlock.getRetainedSizeInBytes();
+ } catch (IOException e) {
+ throw new IoTDBException(
+ "Can't read a new tsBlock in FileSpillerReader: " + fileName,
+ e,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws IoTDBException {
+
+ if (isEnd) return false;
+
+ if (cacheBlocks.isEmpty()
+ || (rowIndex == cacheBlocks.get(tsBlockIndex).getPositionCount()
+ && tsBlockIndex == cacheBlocks.size() - 1)) {
+ boolean hasData = readTsBlockFromFile();
+ if (!hasData) {
+ isEnd = true;
+ return false;
+ }
+ return true;
+ }
+
+ if (rowIndex < cacheBlocks.get(tsBlockIndex).getPositionCount()) {
+ return true;
+ }
+
+ tsBlockIndex++;
+ rowIndex = 0;
+ return true;
+ }
+
+ @Override
+ public void close() throws IoTDBException {
+ try {
+ fileChannel.close();
+ } catch (IOException e) {
+ throw new IoTDBException(
+ "Can't close fileChannel in FileSpillerReader: " + fileName,
+ e,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/MemoryReader.java
b/server/src/main/java/org/apache/iotdb/db/tools/MemoryReader.java
new file mode 100644
index 00000000000..9e5252c6a4e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/MemoryReader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+
+import java.util.List;
+
+public class MemoryReader implements SortReader {
+
+ // all the data in MemoryReader lies in memory
+ private final List<MergeSortKey> cachedData;
+ private final int size;
+ private int rowIndex;
+
+ public MemoryReader(List<MergeSortKey> cachedTsBlock) {
+ this.cachedData = cachedTsBlock;
+ this.size = cachedTsBlock.size();
+ this.rowIndex = 0;
+ }
+
+ @Override
+ public MergeSortKey next() {
+ MergeSortKey sortKey = cachedData.get(rowIndex);
+ rowIndex++;
+ return sortKey;
+ }
+
+ @Override
+ public boolean hasNext() throws IoTDBException {
+ return cachedData != null && rowIndex != size;
+ }
+
+ @Override
+ public void close() throws IoTDBException {
+ // do nothing
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/tools/SortBufferManager.java
b/server/src/main/java/org/apache/iotdb/db/tools/SortBufferManager.java
new file mode 100644
index 00000000000..a8c8ecc1a06
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/SortBufferManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class SortBufferManager {
+
+ public static final long SORT_BUFFER_SIZE =
+ IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+
+ private long bufferUsed;
+
+ private final long BUFFER_SIZE_FOR_ONE_BRANCH =
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+ private final long BUFFER_AVAILABLE_FOR_ALL_BRANCH;
+ private long readerBuffer = 0;
+ private long branchNum = 0;
+
+ public SortBufferManager() {
+ this.BUFFER_AVAILABLE_FOR_ALL_BRANCH = SORT_BUFFER_SIZE -
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ // the initial value is the buffer for output.
+ this.bufferUsed = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ public void allocateOneSortBranch() {
+ boolean success = allocate(BUFFER_SIZE_FOR_ONE_BRANCH);
+ if (!success) throw new IllegalArgumentException("Not enough memory for
sorting");
+ branchNum++;
+ }
+
+ private boolean check(long size) {
+ return bufferUsed + size < SORT_BUFFER_SIZE;
+ }
+
+ public boolean allocate(long size) {
+ if (check(size)) {
+ bufferUsed += size;
+ return true;
+ }
+ return false;
+ }
+
+ public void releaseOneSortBranch() {
+ branchNum--;
+ if (branchNum != 0) readerBuffer = BUFFER_AVAILABLE_FOR_ALL_BRANCH /
branchNum;
+ }
+
+ public long getReaderBufferAvailable() {
+ if (readerBuffer != 0) return readerBuffer;
+ readerBuffer = BUFFER_AVAILABLE_FOR_ALL_BRANCH / branchNum;
+ return readerBuffer;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
b/server/src/main/java/org/apache/iotdb/db/tools/SortReader.java
similarity index 58%
rename from
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
rename to server/src/main/java/org/apache/iotdb/db/tools/SortReader.java
index 650f6a3178e..c6bb60410bc 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/SortReader.java
@@ -17,14 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.statement.component;
+package org.apache.iotdb.db.tools;
-public class SortKey {
- public static final String TIME = "TIME";
- public static final String TIMESERIES = "TIMESERIES";
- public static final String DEVICE = "DEVICE";
- public static final String QUERYID = "QUERYID";
- public static final String DATANODEID = "DATANODEID";
- public static final String ELAPSEDTIME = "ELAPSEDTIME";
- public static final String STATEMENT = "STATEMENT";
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+
+public interface SortReader {
+
+ /** output the cached data in sortReader, it needs to be called after
hasNext() returns true. */
+ MergeSortKey next();
+
+ /**
+ * Check if there is cached data in sortReader, cache more data if current
ones are run out. This
+ * method should be called before next() to ensure that there is data to
read.
+ */
+ boolean hasNext() throws IoTDBException;
+
+ void close() throws IoTDBException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java
index f80c9c8eb2f..7cab3e5e736 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java
@@ -26,9 +26,9 @@ public class MergeSortHeap {
private final MergeSortKey[] heap;
private int heapSize;
- private final Comparator<MergeSortKey> comparator;
+ private final Comparator<SortKey> comparator;
- public MergeSortHeap(int childNum, Comparator<MergeSortKey> comparator) {
+ public MergeSortHeap(int childNum, Comparator<SortKey> comparator) {
this.heap = new MergeSortKey[childNum];
this.heapSize = 0;
this.comparator = comparator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
index 542501d51c9..5cc23eeb219 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
@@ -21,21 +21,24 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-public class MergeSortKey {
+public class MergeSortKey extends SortKey {
- public TsBlock tsBlock;
- public int rowIndex;
-
- public int columnIndex;
+ // This filed only used in operation as a intermediate tool, which is used
to locate the origin of
+ // sortKey when there are more than one channels as input of mergeSort.
+ // It was initialized during the calculation in operator.
+ public int inputChannelIndex;
public MergeSortKey(TsBlock tsBlock, int rowIndex) {
- this.tsBlock = tsBlock;
- this.rowIndex = rowIndex;
+ super(tsBlock, rowIndex);
+ }
+
+ public MergeSortKey(TsBlock tsBlock, int rowIndex, int inputChannelIndex) {
+ super(tsBlock, rowIndex);
+ this.inputChannelIndex = inputChannelIndex;
}
- public MergeSortKey(TsBlock tsBlock, int rowIndex, int columnIndex) {
- this.tsBlock = tsBlock;
- this.rowIndex = rowIndex;
- this.columnIndex = columnIndex;
+ public MergeSortKey(SortKey sortKey) {
+ super(sortKey.tsBlock, sortKey.rowIndex);
+ this.inputChannelIndex = -1;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/SortKey.java
similarity index 74%
copy from
server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
copy to
server/src/main/java/org/apache/iotdb/db/utils/datastructure/SortKey.java
index 542501d51c9..0ef1f3b779d 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/SortKey.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -21,21 +21,13 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-public class MergeSortKey {
+public class SortKey {
public TsBlock tsBlock;
public int rowIndex;
- public int columnIndex;
-
- public MergeSortKey(TsBlock tsBlock, int rowIndex) {
- this.tsBlock = tsBlock;
- this.rowIndex = rowIndex;
- }
-
- public MergeSortKey(TsBlock tsBlock, int rowIndex, int columnIndex) {
+ public SortKey(TsBlock tsBlock, int rowIndex) {
this.tsBlock = tsBlock;
this.rowIndex = rowIndex;
- this.columnIndex = columnIndex;
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index a8c51f77f83..124549bed3c 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -51,11 +51,11 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
-import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -326,8 +326,8 @@ public class MergeSortOperatorTest {
tsDataTypes,
MergeSortComparator.getComparator(
Arrays.asList(
- new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering)),
+ new SortItem(OrderByKey.TIME, timeOrdering),
+ new SortItem(OrderByKey.DEVICE, deviceOrdering)),
Arrays.asList(-1, 0),
Arrays.asList(TSDataType.INT64, TSDataType.TEXT)));
mergeSortOperator
@@ -794,8 +794,8 @@ public class MergeSortOperatorTest {
tsDataTypes,
MergeSortComparator.getComparator(
Arrays.asList(
- new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering)),
+ new SortItem(OrderByKey.TIME, timeOrdering),
+ new SortItem(OrderByKey.DEVICE, deviceOrdering)),
Arrays.asList(-1, 0),
Arrays.asList(TSDataType.INT64, TSDataType.TEXT)));
mergeSortOperator1
@@ -808,8 +808,8 @@ public class MergeSortOperatorTest {
tsDataTypes,
MergeSortComparator.getComparator(
Arrays.asList(
- new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering)),
+ new SortItem(OrderByKey.TIME, timeOrdering),
+ new SortItem(OrderByKey.DEVICE, deviceOrdering)),
Arrays.asList(-1, 0),
Arrays.asList(TSDataType.INT64, TSDataType.TEXT)));
mergeSortOperator2
@@ -823,8 +823,8 @@ public class MergeSortOperatorTest {
tsDataTypes,
MergeSortComparator.getComparator(
Arrays.asList(
- new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering)),
+ new SortItem(OrderByKey.TIME, timeOrdering),
+ new SortItem(OrderByKey.DEVICE, deviceOrdering)),
Arrays.asList(-1, 0),
Arrays.asList(TSDataType.INT64, TSDataType.TEXT)));
mergeSortOperator
@@ -1267,8 +1267,8 @@ public class MergeSortOperatorTest {
tsDataTypes,
MergeSortComparator.getComparator(
Arrays.asList(
- new SortItem(SortKey.DEVICE, deviceOrdering),
- new SortItem(SortKey.TIME, timeOrdering)),
+ new SortItem(OrderByKey.DEVICE, deviceOrdering),
+ new SortItem(OrderByKey.TIME, timeOrdering)),
Arrays.asList(0, -1),
Arrays.asList(TSDataType.TEXT, TSDataType.INT64)));
mergeSortOperator
@@ -1533,11 +1533,11 @@ public class MergeSortOperatorTest {
List<OperatorContext> operatorContexts =
driverContext.getOperatorContexts();
List<TSDataType> dataTypes =
DatasetHeaderFactory.getShowQueriesHeader().getRespDataTypes();
- Comparator<MergeSortKey> comparator =
+ Comparator<SortKey> comparator =
MergeSortComparator.getComparator(
Arrays.asList(
- new SortItem(SortKey.TIME, Ordering.ASC),
- new SortItem(SortKey.DATANODEID, Ordering.DESC)),
+ new SortItem(OrderByKey.TIME, Ordering.ASC),
+ new SortItem(OrderByKey.DATANODEID, Ordering.DESC)),
ImmutableList.of(-1, 1),
ImmutableList.of(TSDataType.INT64, TSDataType.INT32));
@@ -1561,9 +1561,11 @@ public class MergeSortOperatorTest {
ShowQueriesOperator showQueriesOperator2 =
new ShowQueriesOperator(operatorContexts.get(1), planNodeId1,
coordinator2);
SortOperator sortOperator1 =
- new SortOperator(operatorContexts.get(2), showQueriesOperator1,
dataTypes, comparator);
+ new SortOperator(
+ operatorContexts.get(2), showQueriesOperator1, dataTypes, "",
comparator);
SortOperator sortOperator2 =
- new SortOperator(operatorContexts.get(3), showQueriesOperator2,
dataTypes, comparator);
+ new SortOperator(
+ operatorContexts.get(3), showQueriesOperator2, dataTypes, "",
comparator);
Operator root =
new MergeSortOperator(
operatorContexts.get(4),
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index ec3fa4a9b98..2fcdff2e76a 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
@@ -477,11 +478,18 @@ public class OperatorMemoryTest {
Mockito.mock(OperatorContext.class),
child,
Collections.singletonList(TSDataType.INT32),
+ "",
null);
- assertEquals(2048 + 512, sortOperator.calculateMaxPeekMemory());
- assertEquals(1024, sortOperator.calculateMaxReturnSize());
- assertEquals(512, sortOperator.calculateRetainedSizeAfterCallingNext());
+ assertEquals(
+ 2048 + 512 +
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize(),
+ sortOperator.calculateMaxPeekMemory());
+ assertEquals(
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+ sortOperator.calculateMaxReturnSize());
+ assertEquals(
+ 512 + IoTDBDescriptor.getInstance().getConfig().getSortBufferSize(),
+ sortOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SortOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SortOperatorTest.java
new file mode 100644
index 00000000000..d4d332adc8c
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SortOperatorTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import io.airlift.units.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SortOperatorTest {
+
+ private static final String SORT_OPERATOR_TEST_SG = "root.SortOperatorTest";
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ private int dataNodeId;
+
+ @Before
+ public void setUp() throws MetadataException, IOException,
WriteProcessException {
+ dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(200);
+ SeriesReaderTestUtil.setUp(
+ measurementSchemas, deviceIds, seqResources, unSeqResources,
SORT_OPERATOR_TEST_SG);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
+ }
+
+ //
------------------------------------------------------------------------------------------------
+ // sortOperatorTest
+ //
------------------------------------------------------------------------------------------------
+ // SortOperator
+ // |
+ // TimeJoinOperator
+ // _____________________|______________________________
+ // / | \
+ // SeriesScanOperator TimeJoinOperator
TimeJoinOperator
+ // / \ /
\
+ // SeriesScanOperator SeriesScanOperator SeriesScanOperator
SeriesScanOperator
+ //
------------------------------------------------------------------------------------------------
+ public Operator genSortOperator(Ordering timeOrdering, boolean
getSortOperator) {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"sortOperator-test-instance-notification");
+ try {
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId1,
SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ driverContext.addOperatorContext(2, planNodeId2,
SeriesScanOperator.class.getSimpleName());
+ driverContext.addOperatorContext(
+ 3, new PlanNodeId("3"),
RowBasedTimeJoinOperator.class.getSimpleName());
+ driverContext.addOperatorContext(4, new PlanNodeId("4"),
SortOperator.class.getSimpleName());
+
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(SORT_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(SORT_OPERATOR_TEST_SG + ".device1.sensor0",
TSDataType.INT32);
+
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(0),
+ planNodeId1,
+ measurementPath1,
+ timeOrdering,
+ SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath1));
+ seriesScanOperator1.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(1),
+ planNodeId2,
+ measurementPath2,
+ timeOrdering,
+ SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath2));
+ seriesScanOperator2.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ List<TSDataType> tsDataTypes =
+ new LinkedList<>(Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+
+ RowBasedTimeJoinOperator timeJoinOperator1 =
+ new RowBasedTimeJoinOperator(
+ driverContext.getOperatorContexts().get(2),
+ Arrays.asList(seriesScanOperator1, seriesScanOperator2),
+ timeOrdering,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(
+ new InputLocation(0, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()),
+ new SingleColumnMerger(
+ new InputLocation(1, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator())),
+ timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+
+ if (!getSortOperator) return timeJoinOperator1;
+
+ Comparator<SortKey> comparator =
+ Comparator.comparing(
+ (SortKey sortKey) ->
sortKey.tsBlock.getColumn(0).getInt(sortKey.rowIndex));
+
+ OperatorContext operatorContext =
driverContext.getOperatorContexts().get(3);
+ String filePrefix =
+ "target"
+ + File.separator
+ + operatorContext
+ .getDriverContext()
+ .getFragmentInstanceContext()
+ .getId()
+ .getFragmentInstanceId()
+ + File.separator
+ + operatorContext.getDriverContext().getPipelineId()
+ + File.separator;
+ SortOperator sortOperator =
+ new SortOperator(operatorContext, timeJoinOperator1, tsDataTypes,
filePrefix, comparator);
+ sortOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
+ return sortOperator;
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ return null;
+ }
+ }
+
+ long getValue(long expectedTime) {
+ if (expectedTime < 200) {
+ return 20000 + expectedTime;
+ } else if (expectedTime < 260
+ || (expectedTime >= 300 && expectedTime < 380)
+ || expectedTime >= 400) {
+ return 10000 + expectedTime;
+ } else {
+ return expectedTime;
+ }
+ }
+
+ // with data spilling
+ @Test
+ public void sortOperatorSpillingTest() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(5000);
+ SortOperator root = (SortOperator) genSortOperator(Ordering.ASC, true);
+ int lastValue = -1;
+ int count = 0;
+ while (root.isBlocked().isDone() && root.hasNext()) {
+ TsBlock tsBlock = root.next();
+ if (tsBlock == null) continue;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeByIndex(i);
+ int v1 = tsBlock.getColumn(0).getInt(i);
+ int v2 = tsBlock.getColumn(1).getInt(i);
+ assertTrue(lastValue == -1 || lastValue < v1);
+ assertEquals(getValue(time), v1);
+ assertEquals(v1, v2);
+ lastValue = v1;
+ count++;
+ }
+ }
+ root.close();
+ assertEquals(count, 500);
+ }
+
+ // no data spilling
+ @Test
+ public void sortOperatorNormalTest() throws Exception {
+ Operator root = genSortOperator(Ordering.ASC, true);
+ int lastValue = -1;
+ int count = 0;
+ while (root.isBlocked().isDone() && root.hasNext()) {
+ TsBlock tsBlock = root.next();
+ if (tsBlock == null) continue;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeByIndex(i);
+ int v1 = tsBlock.getColumn(0).getInt(i);
+ int v2 = tsBlock.getColumn(1).getInt(i);
+ assertTrue(lastValue == -1 || lastValue < v1);
+ assertEquals(getValue(time), v1);
+ assertEquals(v1, v2);
+ lastValue = v1;
+ count++;
+ }
+ }
+ root.close();
+ assertEquals(count, 500);
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java
index c563c16e890..89c303cfdf5 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java
@@ -39,9 +39,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.literal.LongLiteral;
import java.time.ZonedDateTime;
@@ -150,8 +150,8 @@ public class TestPlanBuilder {
new PlanNodeId(id),
new OrderByParameter(
Arrays.asList(
- new SortItem(SortKey.DEVICE, Ordering.ASC),
- new SortItem(SortKey.TIME, Ordering.ASC))),
+ new SortItem(OrderByKey.DEVICE, Ordering.ASC),
+ new SortItem(OrderByKey.TIME, Ordering.ASC))),
Arrays.asList(DEVICE, measurement),
deviceToMeasurementIndexesMap);
deviceViewNode.addChildDeviceNode(device, getRoot());
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index 9b2c86213d0..5571dbb2d33 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -50,9 +50,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -145,7 +145,7 @@ public class QueryLogicalPlanUtil {
sourceNodeList,
TimeFilter.gt(100),
new OrderByParameter(
- Collections.singletonList(new SortItem(SortKey.TIMESERIES,
Ordering.ASC))));
+ Collections.singletonList(new SortItem(OrderByKey.TIMESERIES,
Ordering.ASC))));
querySQLs.add(sql);
sqlToPlanMap.put(sql, lastQueryNode);
@@ -348,8 +348,8 @@ public class QueryLogicalPlanUtil {
queryId.genPlanNodeId(),
new OrderByParameter(
Arrays.asList(
- new SortItem(SortKey.DEVICE, Ordering.ASC),
- new SortItem(SortKey.TIME, Ordering.DESC))),
+ new SortItem(OrderByKey.DEVICE, Ordering.ASC),
+ new SortItem(OrderByKey.TIME, Ordering.DESC))),
Arrays.asList(ColumnHeaderConstant.DEVICE, "s3", "s1", "s2", "s4"),
deviceToMeasurementIndexesMap);
deviceViewNode.addChildDeviceNode("root.sg.d1", filterNode1);
@@ -753,8 +753,8 @@ public class QueryLogicalPlanUtil {
queryId.genPlanNodeId(),
new OrderByParameter(
Arrays.asList(
- new SortItem(SortKey.DEVICE, Ordering.ASC),
- new SortItem(SortKey.TIME, Ordering.DESC))),
+ new SortItem(OrderByKey.DEVICE, Ordering.ASC),
+ new SortItem(OrderByKey.TIME, Ordering.DESC))),
Arrays.asList(
ColumnHeaderConstant.DEVICE, "count(s1)", "max_value(s2)",
"last_value(s1)"),
deviceToMeasurementIndexesMap);
@@ -1037,8 +1037,8 @@ public class QueryLogicalPlanUtil {
queryId.genPlanNodeId(),
new OrderByParameter(
Arrays.asList(
- new SortItem(SortKey.DEVICE, Ordering.ASC),
- new SortItem(SortKey.TIME, Ordering.DESC))),
+ new SortItem(OrderByKey.DEVICE, Ordering.ASC),
+ new SortItem(OrderByKey.TIME, Ordering.DESC))),
Arrays.asList(
ColumnHeaderConstant.DEVICE, "count(s1)", "max_value(s2)",
"last_value(s1)"),
deviceToMeasurementIndexesMap);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
index 0e04966e811..a5284e7bdb8 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
@@ -24,9 +24,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.junit.Test;
@@ -46,8 +46,8 @@ public class DeviceViewNodeSerdeTest {
new PlanNodeId("TestDeviceMergeNode"),
new OrderByParameter(
Arrays.asList(
- new SortItem(SortKey.DEVICE, Ordering.ASC),
- new SortItem(SortKey.TIME, Ordering.DESC))),
+ new SortItem(OrderByKey.DEVICE, Ordering.ASC),
+ new SortItem(OrderByKey.TIME, Ordering.DESC))),
Arrays.asList("s1", "s2"),
new HashMap<>());
deviceViewNode.addChildDeviceNode("root.sg.d1", timeJoinNode1);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
index eb108b0efbb..f99cd565f6b 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
@@ -25,9 +25,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
@@ -56,7 +56,7 @@ public class SortNodeSerdeTest {
new SortNode(
new PlanNodeId("TestSortNode"),
seriesScanNode,
- new OrderByParameter(ImmutableList.of(new SortItem(SortKey.TIME,
Ordering.ASC))));
+ new OrderByParameter(ImmutableList.of(new
SortItem(OrderByKey.TIME, Ordering.ASC))));
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
sortNode.serialize(byteBuffer);