This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch enhance_merge_management in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit efa77dd52f0f073dfc47c8af8d4709e828d1a12c Author: jt2594838 <[email protected]> AuthorDate: Tue Jun 9 19:24:23 2020 +0800 enhance merge task management --- docs/UserGuide/Operation Manual/SQL Reference.md | 8 + .../org/apache/iotdb/db/qp/strategy/SqlBase.g4 | 5 + .../org/apache/iotdb/db/conf/IoTDBConstant.java | 6 + .../iotdb/db/engine/merge/manage/MergeFuture.java | 153 +++++++++++++++ .../iotdb/db/engine/merge/manage/MergeManager.java | 193 +++++++++++++++++- .../db/engine/merge/manage/MergeManagerMBean.java | 26 +++ .../db/engine/merge/manage/MergeThreadPool.java | 48 +++++ .../iotdb/db/engine/merge/task/MergeFileTask.java | 107 +++++++--- .../db/engine/merge/task/MergeMultiChunkTask.java | 216 ++++++++++++++------- .../iotdb/db/engine/merge/task/MergeTask.java | 77 +++++++- .../db/engine/merge/task/RecoverMergeTask.java | 13 +- .../main/java/org/apache/iotdb/db/qp/Planner.java | 2 + .../apache/iotdb/db/qp/constant/SQLConstant.java | 4 + .../apache/iotdb/db/qp/executor/PlanExecutor.java | 49 +++++ .../org/apache/iotdb/db/qp/logical/Operator.java | 3 +- .../db/qp/logical/sys/ShowMergeStatusOperator.java | 30 +++ .../db/qp/physical/sys/ShowMergeStatusPlan.java | 27 +++ .../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 2 +- .../iotdb/db/qp/strategy/LogicalGenerator.java | 8 + .../iotdb/db/qp/strategy/PhysicalGenerator.java | 3 + .../org/apache/iotdb/db/service/RPCService.java | 4 +- .../org/apache/iotdb/db/service/ServiceType.java | 2 +- .../org/apache/iotdb/db/service/StaticResps.java | 12 ++ .../org/apache/iotdb/db/service/TSServiceImpl.java | 2 + .../iotdb/db/engine/merge/MergeManagerTest.java | 168 ++++++++++++++++ .../iotdb/db/integration/IoTDBMergeTest.java | 47 ++++- 26 files changed, 1092 insertions(+), 123 deletions(-) diff --git a/docs/UserGuide/Operation Manual/SQL Reference.md b/docs/UserGuide/Operation Manual/SQL Reference.md index 3e15816..d23b36b 100644 --- a/docs/UserGuide/Operation Manual/SQL Reference.md +++ b/docs/UserGuide/Operation 
Manual/SQL Reference.md @@ -208,6 +208,14 @@ Eg: IoTDB > SHOW STORAGE GROUP Note: This statement can be used in IoTDB Client and JDBC. ``` +* Show Merge Status Statement + +``` +SHOW MERGE STATUS +Eg: IoTDB > SHOW MERGE STATUS +Note: This statement can be used in IoTDB Client and JDBC. +``` + * Count Timeseries Statement ``` diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 index 7d5b9f2..c464446 100644 --- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 +++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 @@ -73,6 +73,7 @@ statement | SHOW STORAGE GROUP #showStorageGroup | SHOW CHILD PATHS prefixPath? #showChildPaths | SHOW DEVICES prefixPath? #showDevices + | SHOW MERGE STATUS #showMergeStatus | COUNT TIMESERIES prefixPath? (GROUP BY LEVEL OPERATOR_EQ INT)? #countTimeseries | COUNT NODES prefixPath LEVEL OPERATOR_EQ INT #countNodes | LOAD CONFIGURATION (MINUS GLOBAL)? 
#loadConfigurationStatement @@ -876,6 +877,10 @@ TRUE FALSE : F A L S E ; + +STATUS + : S T A T U S + ; //============================ // End of the keywords list //============================ diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java index 58e3154..836afbd 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java @@ -73,6 +73,12 @@ public class IoTDBConstant { public static final String COLUMN_STORAGE_GROUP = "storage group"; public static final String COLUMN_TTL = "ttl"; + public static final String COLUMN_TASK_NAME = "task name"; + public static final String COLUMN_CREATED_TIME = "created time"; + public static final String COLUMN_PROGRESS = "progress"; + public static final String COLUMN_CANCELLED = "cancelled"; + public static final String COLUMN_DONE = "done"; + public static final String PATH_WILDCARD = "*"; // data folder name diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeFuture.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeFuture.java new file mode 100644 index 0000000..8d42003 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeFuture.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.manage; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; +import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask; +import org.apache.iotdb.db.engine.merge.task.MergeTask; + +public abstract class MergeFuture extends FutureTask<Void> implements Comparable<MergeFuture> { + + private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss" + + ".SSS'Z'"); + private Date createdTime; + + public MergeFuture(Callable callable) { + super(callable); + createdTime = new Date(System.currentTimeMillis()); + } + + public String getCreatedTime() { + return dateFormat.format(createdTime); + } + + public abstract String getTaskName(); + + public abstract String getProgress(); + + @Override + public int compareTo(MergeFuture future) { + return this.createdTime.compareTo(future.createdTime); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MergeFuture future = (MergeFuture) o; + return Objects.equals(createdTime, future.createdTime); + } + + @Override + public int hashCode() { + return Objects.hash(createdTime); + } + + public static class MainMergeFuture extends MergeFuture { + + private MergeTask bindingTask; + + public MainMergeFuture(MergeTask task) { + super(task); + bindingTask = task; + } + + @Override + 
public String getTaskName() { + return bindingTask.getTaskName(); + } + + @Override + public String getProgress() { + return bindingTask.getProgress(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + MainMergeFuture that = (MainMergeFuture) o; + return Objects.equals(bindingTask, that.bindingTask); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), bindingTask); + } + } + + public static class SubMergeFuture extends MergeFuture { + + private MergeChunkHeapTask bindingTask; + + public SubMergeFuture(MergeChunkHeapTask task) { + super(task); + bindingTask = task; + } + + @Override + public String getTaskName() { + return bindingTask.getTaskName(); + } + + @Override + public String getProgress() { + return bindingTask.getProgress(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + SubMergeFuture that = (SubMergeFuture) o; + return Objects.equals(bindingTask, that.bindingTask); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), bindingTask); + } + } +} + + diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java index 06de716..1c87a40 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java @@ -19,7 +19,16 @@ package org.apache.iotdb.db.engine.merge.manage; -import java.util.concurrent.Callable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; 
+import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -27,11 +36,14 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask; import org.apache.iotdb.db.engine.merge.task.MergeTask; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.service.IService; +import org.apache.iotdb.db.service.JMXService; import org.apache.iotdb.db.service.ServiceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,15 +52,23 @@ import org.slf4j.LoggerFactory; * MergeManager provides a ThreadPool to queue and run all merge tasks to restrain the total * resources occupied by merge and manages a Timer to periodically issue a global merge. 
*/ -public class MergeManager implements IService { +public class MergeManager implements IService, MergeManagerMBean { private static final Logger logger = LoggerFactory.getLogger(MergeManager.class); private static final MergeManager INSTANCE = new MergeManager(); + private final String mbeanName = String + .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, + getID().getJmxName()); private AtomicInteger threadCnt = new AtomicInteger(); private ThreadPoolExecutor mergeTaskPool; private ThreadPoolExecutor mergeChunkSubTaskPool; private ScheduledExecutorService timedMergeThreadPool; + private ScheduledExecutorService taskCleanerThreadPool; + + // TODO: add to JMX + private Map<String, Set<MergeFuture>> storageGroupMainTasks = new ConcurrentHashMap<>(); + private Map<String, Set<MergeFuture>> storageGroupSubTasks = new ConcurrentHashMap<>(); private MergeManager() { } @@ -58,15 +78,20 @@ public class MergeManager implements IService { } public void submitMainTask(MergeTask mergeTask) { - mergeTaskPool.submit(mergeTask); + MergeFuture future = (MergeFuture) mergeTaskPool.submit(mergeTask); + storageGroupMainTasks.computeIfAbsent(mergeTask.getStorageGroupName(), + k -> new ConcurrentSkipListSet<>()).add(future); } - public Future submitChunkSubTask(Callable callable) { - return mergeChunkSubTaskPool.submit(callable); + public Future<Void> submitChunkSubTask(MergeChunkHeapTask task) { + MergeFuture future = (MergeFuture) mergeChunkSubTaskPool.submit(task); + storageGroupSubTasks.computeIfAbsent(task.getStorageGroupName(), k -> new ConcurrentSkipListSet<>()).add(future); + return future; } @Override public void start() { + JMXService.registerMBean(this, mbeanName); if (mergeTaskPool == null) { int threadNum = IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum(); if (threadNum <= 0) { @@ -78,11 +103,9 @@ public class MergeManager implements IService { chunkSubThreadNum = 1; } - mergeTaskPool = - (ThreadPoolExecutor) 
Executors.newFixedThreadPool(threadNum, + mergeTaskPool = new MergeThreadPool(threadNum, r -> new Thread(r, "MergeThread-" + threadCnt.getAndIncrement())); - mergeChunkSubTaskPool = - (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum * chunkSubThreadNum, + mergeChunkSubTaskPool = new MergeThreadPool(threadNum * chunkSubThreadNum, r -> new Thread(r, "MergeChunkSubThread-" + threadCnt.getAndIncrement())); long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec(); if (mergeInterval > 0) { @@ -91,6 +114,10 @@ public class MergeManager implements IService { timedMergeThreadPool.scheduleAtFixedRate(this::mergeAll, mergeInterval, mergeInterval, TimeUnit.SECONDS); } + + taskCleanerThreadPool = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r, + "MergeTaskCleaner")); + taskCleanerThreadPool.scheduleAtFixedRate(this::cleanFinishedTask, 30, 30, TimeUnit.MINUTES); logger.info("MergeManager started"); } } @@ -102,6 +129,8 @@ public class MergeManager implements IService { timedMergeThreadPool.shutdownNow(); timedMergeThreadPool = null; } + taskCleanerThreadPool.shutdownNow(); + taskCleanerThreadPool = null; mergeTaskPool.shutdownNow(); mergeChunkSubTaskPool.shutdownNow(); logger.info("Waiting for task pool to shut down"); @@ -116,6 +145,7 @@ public class MergeManager implements IService { mergeTaskPool = null; logger.info("MergeManager stopped"); } + JMXService.deregisterMBean(mbeanName); } @Override @@ -125,6 +155,9 @@ public class MergeManager implements IService { awaitTermination(timedMergeThreadPool, millseconds); timedMergeThreadPool = null; } + awaitTermination(taskCleanerThreadPool, millseconds); + taskCleanerThreadPool = null; + awaitTermination(mergeTaskPool, millseconds); awaitTermination(mergeChunkSubTaskPool, millseconds); logger.info("Waiting for task pool to shut down"); @@ -164,4 +197,146 @@ public class MergeManager implements IService { logger.error("Cannot perform a global merge because", e); } } + + /** + * 
Abort all merges of a storage group. The caller must acquire the write lock of the + * corresponding storage group. + * @param storageGroup + */ + @Override + public void abortMerge(String storageGroup) { + // abort sub-tasks first + Set<MergeFuture> subTasks = storageGroupSubTasks + .getOrDefault(storageGroup, Collections.emptySet()); + Iterator<MergeFuture> subIterator = subTasks.iterator(); + while (subIterator.hasNext()) { + Future<Void> next = subIterator.next(); + if (!next.isDone() && !next.isCancelled()) { + next.cancel(true); + } + subIterator.remove(); + } + // abort main tasks + Set<MergeFuture> mainTasks = storageGroupMainTasks + .getOrDefault(storageGroup, Collections.emptySet()); + Iterator<MergeFuture> mainIterator = mainTasks.iterator(); + while (mainIterator.hasNext()) { + Future<Void> next = mainIterator.next(); + if (!next.isDone() && !next.isCancelled()) { + next.cancel(true); + } + mainIterator.remove(); + } + } + + private void cleanFinishedTask() { + for (Set<MergeFuture> subTasks : storageGroupSubTasks.values()) { + subTasks.removeIf(next -> next.isDone() || next.isCancelled()); + } + for (Set<MergeFuture> mainTasks : storageGroupMainTasks.values()) { + mainTasks.removeIf(next -> next.isDone() || next.isCancelled()); + } + } + + /** + * + * @return 2 maps, the first map contains status of main merge tasks and the second map + * contains status of merge chunk heap tasks, both map use storage groups as keys and list of + * merge status as values. 
+ */ + public Map<String, List<TaskStatus>>[] collectTaskStatus() { + Map<String, List<TaskStatus>>[] result = new Map[2]; + result[0] = new HashMap<>(); + result[1] = new HashMap<>(); + for (Entry<String, Set<MergeFuture>> stringSetEntry : storageGroupMainTasks.entrySet()) { + String storageGroup = stringSetEntry.getKey(); + Set<MergeFuture> tasks = stringSetEntry.getValue(); + for (MergeFuture task : tasks) { + result[0].computeIfAbsent(storageGroup, s -> new ArrayList<>()).add(new TaskStatus(task)); + } + } + + for (Entry<String, Set<MergeFuture>> stringSetEntry : storageGroupSubTasks.entrySet()) { + String storageGroup = stringSetEntry.getKey(); + Set<MergeFuture> tasks = stringSetEntry.getValue(); + for (MergeFuture task : tasks) { + result[1].computeIfAbsent(storageGroup, s -> new ArrayList<>()).add(new TaskStatus(task)); + } + } + return result; + } + + public String genMergeTaskReport() { + Map<String, List<TaskStatus>>[] statusMaps = collectTaskStatus(); + StringBuilder builder = new StringBuilder("Main tasks:").append(System.lineSeparator()); + for (Entry<String, List<TaskStatus>> stringListEntry : statusMaps[0].entrySet()) { + String storageGroup = stringListEntry.getKey(); + List<TaskStatus> statusList = stringListEntry.getValue(); + builder.append("\t").append("Storage group: ").append(storageGroup).append(System.lineSeparator()); + for (TaskStatus status : statusList) { + builder.append("\t\t").append(status.toString()).append(System.lineSeparator()); + } + } + + builder.append("Sub tasks:").append(System.lineSeparator()); + for (Entry<String, List<TaskStatus>> stringListEntry : statusMaps[1].entrySet()) { + String storageGroup = stringListEntry.getKey(); + List<TaskStatus> statusList = stringListEntry.getValue(); + builder.append("\t").append("Storage group: ").append(storageGroup).append(System.lineSeparator()); + for (TaskStatus status : statusList) { + builder.append("\t\t").append(status.toString()).append(System.lineSeparator()); + } + } + 
return builder.toString(); + } + + @Override + public void printMergeStatus() { + if (logger.isInfoEnabled()) { + logger.info("Running tasks:\n {}", genMergeTaskReport()); + } + } + + public static class TaskStatus { + private String taskName; + private String createdTime; + private String progress; + private boolean isDone; + private boolean isCancelled; + + public TaskStatus(MergeFuture future) { + this.taskName = future.getTaskName(); + this.createdTime = future.getCreatedTime(); + this.progress = future.getProgress(); + this.isCancelled = future.isCancelled(); + this.isDone = future.isDone(); + } + + @Override + public String toString() { + return String.format("%s, " + + "%s, %s, done:%s, cancelled:%s", taskName, + createdTime, progress, isDone, isCancelled); + } + + public String getTaskName() { + return taskName; + } + + public String getCreatedTime() { + return createdTime; + } + + public String getProgress() { + return progress; + } + + public boolean isDone() { + return isDone; + } + + public boolean isCancelled() { + return isCancelled; + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManagerMBean.java new file mode 100644 index 0000000..645f51f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManagerMBean.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.manage; + +public interface MergeManagerMBean { + void printMergeStatus(); + + void abortMerge(String storageGroup); +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeThreadPool.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeThreadPool.java new file mode 100644 index 0000000..0ac292f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeThreadPool.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.iotdb.db.engine.merge.manage.MergeFuture.MainMergeFuture; +import org.apache.iotdb.db.engine.merge.manage.MergeFuture.SubMergeFuture; +import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask; +import org.apache.iotdb.db.engine.merge.task.MergeTask; + +public class MergeThreadPool extends ThreadPoolExecutor { + + public MergeThreadPool(int corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize, corePoolSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), + threadFactory); + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + if (callable instanceof MergeTask) { + return (RunnableFuture<T>) new MainMergeFuture((MergeTask) callable); + } else { + return (RunnableFuture<T>) new SubMergeFuture((MergeChunkHeapTask) callable); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java index 15cfa6e..4a2efbf 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java @@ -65,6 +65,9 @@ class MergeFileTask { private FSFactory fsFactory = FSFactoryProducer.getFSFactory(); + private int currentMergeIndex; + private String currMergeFile; + MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger, MergeResource resource, List<TsFileResource> unmergedSeqFiles) { this.taskName = taskName; @@ -81,8 +84,11 @@ class MergeFileTask { logger.info("{} starts to merge {} files", taskName, unmergedFiles.size()); } long 
startTime = System.currentTimeMillis(); - int cnt = 0; - for (TsFileResource seqFile : unmergedFiles) { + for (int i = 0; i < unmergedFiles.size(); i++) { + TsFileResource seqFile = unmergedFiles.get(i); + currentMergeIndex = i; + currMergeFile = seqFile.getPath(); + int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0); int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0); if (mergedChunkNum >= unmergedChunkNum) { @@ -102,10 +108,13 @@ class MergeFileTask { } moveMergedToOld(seqFile); } - cnt++; - if (logger.isInfoEnabled()) { - logger.debug("{} has merged {}/{} files", taskName, cnt, unmergedFiles.size()); + + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return; } + + logProgress(); } if (logger.isInfoEnabled()) { logger.info("{} has merged all files after {}ms", taskName, @@ -114,6 +123,18 @@ class MergeFileTask { mergeLogger.logMergeEnd(); } + private void logProgress() { + if (logger.isInfoEnabled()) { + logger.debug("{} has merged {}, processed {}/{} files", taskName, currMergeFile, + currentMergeIndex + 1, unmergedFiles.size()); + } + } + + public String getProgress() { + return String.format("Merging %s, processed %d/%d files", currMergeFile, + currentMergeIndex + 1, unmergedFiles.size()); + } + private void moveMergedToOld(TsFileResource seqFile) throws IOException { int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0); if (mergedChunkNum == 0) { @@ -127,17 +148,8 @@ class MergeFileTask { FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getPath()); resource.removeFileReader(seqFile); - TsFileIOWriter oldFileWriter; - try { - oldFileWriter = new ForceAppendTsFileWriter(seqFile.getFile()); - mergeLogger.logFileMergeStart(seqFile.getFile(), - ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition()); - logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile); - ((ForceAppendTsFileWriter) oldFileWriter).doTruncate(); - } catch 
(TsFileNotCompleteException e) { - // this file may already be truncated if this merge is a system reboot merge - oldFileWriter = new RestorableTsFileIOWriter(seqFile.getFile()); - } + TsFileIOWriter oldFileWriter = getOldFileWriter(seqFile); + // filter the chunks that have been merged oldFileWriter.filterChunks(context.getUnmergedChunkStartTimes().get(seqFile)); @@ -156,6 +168,13 @@ class MergeFileTask { String deviceId = entry.getKey(); List<ChunkMetadata> chunkMetadataList = entry.getValue(); writeMergedChunkGroup(chunkMetadataList, deviceId, newFileReader, oldFileWriter); + + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + oldFileWriter.close(); + restoreOldFile(seqFile); + return; + } } } oldFileWriter.endFile(); @@ -175,19 +194,50 @@ class MergeFileTask { .getFile(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); seqFile.setFile(nextMergeVersionFile); } catch (Exception e) { - RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter( - seqFile.getFile()); - if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) { - oldFileRecoverWriter.endFile(); - } else { - oldFileRecoverWriter.close(); - } + restoreOldFile(seqFile); throw e; } finally { seqFile.writeUnlock(); } } + /** + * Restore an old seq file which is being written new chunks when exceptions occur or the task + * is aborted. + * @param seqFile + * @throws IOException + */ + private void restoreOldFile(TsFileResource seqFile) throws IOException { + RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter( + seqFile.getFile()); + if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) { + oldFileRecoverWriter.endFile(); + } else { + oldFileRecoverWriter.close(); + } + } + + /** + * Open an appending writer for an old seq file so we can add new chunks to it. 
+ * @param seqFile + * @return + * @throws IOException + */ + private TsFileIOWriter getOldFileWriter(TsFileResource seqFile) throws IOException { + TsFileIOWriter oldFileWriter; + try { + oldFileWriter = new ForceAppendTsFileWriter(seqFile.getFile()); + mergeLogger.logFileMergeStart(seqFile.getFile(), + ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition()); + logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile); + ((ForceAppendTsFileWriter) oldFileWriter).doTruncate(); + } catch (TsFileNotCompleteException e) { + // this file may already be truncated if this merge is a system reboot merge + oldFileWriter = new RestorableTsFileIOWriter(seqFile.getFile()); + } + return oldFileWriter; + } + private void updateHistoricalVersions(TsFileResource seqFile) { // as the new file contains data of other files, track their versions in the new file // so that we will be able to compare data across different IoTDBs that share the same file @@ -245,6 +295,12 @@ class MergeFileTask { fileWriter.startChunkGroup(path.getDevice()); long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetadataList, resource.getFileReader(seqFile), fileWriter); + + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return; + } + fileWriter.writeVersion(maxVersion + 1); fileWriter.endChunkGroup(); } @@ -303,6 +359,11 @@ class MergeFileTask { break; } } + + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return maxVersion; + } } return maxVersion; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java index 67caf2c..464ce44 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.HashMap; import 
java.util.List; import java.util.PriorityQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -57,7 +58,7 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class MergeMultiChunkTask { +public class MergeMultiChunkTask { private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class); private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig() @@ -81,9 +82,11 @@ class MergeMultiChunkTask { private int concurrentMergeSeriesNum; private List<Path> currMergingPaths = new ArrayList<>(); - MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger, + private String storageGroupName; + + public MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger, MergeResource mergeResource, boolean fullMerge, List<Path> unmergedSeries, - int concurrentMergeSeriesNum) { + int concurrentMergeSeriesNum, String storageGroupName) { this.mergeContext = context; this.taskName = taskName; this.mergeLogger = mergeLogger; @@ -91,6 +94,7 @@ class MergeMultiChunkTask { this.fullMerge = fullMerge; this.unmergedSeries = unmergedSeries; this.concurrentMergeSeriesNum = concurrentMergeSeriesNum; + this.storageGroupName = storageGroupName; } void mergeSeries() throws IOException { @@ -110,6 +114,11 @@ class MergeMultiChunkTask { currMergingPaths = pathSelector.next(); mergePaths(); resource.clearChunkWriterCache(); + if (Thread.interrupted()) { + logger.info("MergeMultiChunkTask {} aborted", taskName); + Thread.currentThread().interrupt(); + return; + } mergedSeriesCnt += currMergingPaths.size(); logMergeProgress(); } @@ -131,6 +140,10 @@ class MergeMultiChunkTask { } } + public String getProgress() { + return String.format("Processed %d/%d series", mergedSeriesCnt, unmergedSeries.size()); + } + private void 
mergePaths() throws IOException { mergeLogger.logTSStart(currMergingPaths); IPointReader[] unseqReaders; @@ -144,6 +157,11 @@ class MergeMultiChunkTask { for (int i = 0; i < resource.getSeqFiles().size(); i++) { pathsMergeOneFile(i, unseqReaders); + + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return; + } } mergeLogger.logTSEnd(); } @@ -153,7 +171,7 @@ class MergeMultiChunkTask { TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx); String deviceId = currMergingPaths.get(0).getDevice(); long currDeviceMinTime = currTsFile.getStartTime(deviceId); - //COMMENTS: is this correct? how about if there are other devices (in the currMergingPaths) that have unseq data? + // all paths in one call are from the same device if (currDeviceMinTime == Long.MAX_VALUE) { return; } @@ -178,6 +196,11 @@ class MergeMultiChunkTask { modifications[i] = resource.getModifications(currTsFile, currMergingPaths.get(i)); seqChunkMeta[i] = resource.queryChunkMetadata(currMergingPaths.get(i), currTsFile); modifyChunkMetaData(seqChunkMeta[i], modifications[i]); + + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return; + } } List<Integer> unskippedPathIndices = filterNoDataPaths(seqChunkMeta, seqFileIdx); @@ -245,22 +268,28 @@ class MergeMultiChunkTask { mergedChunkNum.set(0); unmergedChunkNum.set(0); - List<Future> futures = new ArrayList<>(); + List<Future<Void>> futures = new ArrayList<>(); for (int i = 0; i < mergeChunkSubTaskNum; i++) { - int finalI = i; - futures.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> { - mergeChunkHeap(chunkIdxHeaps[finalI], metaListEntries, ptWrittens, - reader, - mergeFileWriter, unseqReaders, - currFile, - isLastFile); - return null; - })); + futures.add(MergeManager.getINSTANCE() + .submitChunkSubTask(new MergeChunkHeapTask(chunkIdxHeaps[i], + metaListEntries, ptWrittens, + reader, + mergeFileWriter, unseqReaders, + currFile, + isLastFile, i))); + + if (Thread.interrupted()) { + 
Thread.currentThread().interrupt(); + return false; + } } for (int i = 0; i < mergeChunkSubTaskNum; i++) { try { futures.get(i).get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } catch (ExecutionException e) { throw new IOException(e); } } @@ -274,56 +303,7 @@ class MergeMultiChunkTask { return mergedChunkNum.get() > 0; } - private void mergeChunkHeap(PriorityQueue<Integer> chunkIdxHeap, MetaListEntry[] metaListEntries, - int[] ptWrittens, - TsFileSequenceReader reader, - RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders, - TsFileResource currFile, - boolean isLastFile) throws IOException { - while (!chunkIdxHeap.isEmpty()) { - int pathIdx = chunkIdxHeap.poll(); - Path path = currMergingPaths.get(pathIdx); - MeasurementSchema measurementSchema = resource.getSchema(path); - IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema); - if (metaListEntries[pathIdx] != null) { - MetaListEntry metaListEntry = metaListEntries[pathIdx]; - ChunkMetadata currMeta = metaListEntry.current(); - boolean isLastChunk = !metaListEntry.hasNext(); - boolean chunkOverflowed = MergeUtils - .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta); - boolean chunkTooSmall = MergeUtils - .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum); - - Chunk chunk; - synchronized (reader) { - chunk = reader.readMemChunk(currMeta); - } - ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk, - ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter, - currFile); - - if (!isLastChunk) { - metaListEntry.next(); - chunkIdxHeap.add(pathIdx); - continue; - } - } - // this only happens when the seqFiles do not contain this series, otherwise the remaining - // data will be merged with the last chunk in the seqFiles - if (isLastFile && currTimeValuePairs[pathIdx] != null) { - 
ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], - Long.MAX_VALUE, - pathIdx); - mergedChunkNum.incrementAndGet(); - } - // the last merged chunk may still be smaller than the threshold, flush it anyway - if (ptWrittens[pathIdx] > 0) { - synchronized (mergeFileWriter) { - chunkWriter.writeToFileWriter(mergeFileWriter); - } - } - } - } + /** * merge a sequence chunk SK @@ -338,6 +318,7 @@ class MergeMultiChunkTask { * 3. other cases: need to unCompress the chunk and write 3.1 SK isn't overflowed 3.2 SK is * overflowed */ + @SuppressWarnings("java:S2445") // avoid writing the same writer concurrently private int mergeChunkV2(ChunkMetadata currMeta, boolean chunkOverflowed, boolean chunkTooSmall, Chunk chunk, int lastUnclosedChunkPoint, int pathIdx, TsFileIOWriter mergeFileWriter, IPointReader unseqReader, @@ -379,7 +360,7 @@ class MergeMultiChunkTask { } // update points written statistics - mergeContext.incTotalPointWritten(unclosedChunkPoint - lastUnclosedChunkPoint); + mergeContext.incTotalPointWritten((long) unclosedChunkPoint - lastUnclosedChunkPoint); if (minChunkPointNum > 0 && unclosedChunkPoint >= minChunkPointNum || unclosedChunkPoint > 0 && minChunkPointNum < 0) { // the new chunk's size is large enough and it should be flushed @@ -445,4 +426,107 @@ class MergeMultiChunkTask { } return cnt; } + + public class MergeChunkHeapTask implements Callable<Void> { + + private PriorityQueue<Integer> chunkIdxHeap; + private MetaListEntry[] metaListEntries; + private int[] ptWrittens; + private TsFileSequenceReader reader; + private RestorableTsFileIOWriter mergeFileWriter; + private IPointReader[] unseqReaders; + private TsFileResource currFile; + private boolean isLastFile; + private int taskNum; + + private int totalSeriesNum; + + public MergeChunkHeapTask(PriorityQueue<Integer> chunkIdxHeap, + MetaListEntry[] metaListEntries, int[] ptWrittens, + TsFileSequenceReader reader, + RestorableTsFileIOWriter mergeFileWriter, + IPointReader[] 
unseqReaders, TsFileResource currFile, boolean isLastFile, int taskNum) { + this.chunkIdxHeap = chunkIdxHeap; + this.metaListEntries = metaListEntries; + this.ptWrittens = ptWrittens; + this.reader = reader; + this.mergeFileWriter = mergeFileWriter; + this.unseqReaders = unseqReaders; + this.currFile = currFile; + this.isLastFile = isLastFile; + this.taskNum = taskNum; + this.totalSeriesNum = chunkIdxHeap.size(); + } + + @Override + public Void call() throws Exception { + mergeChunkHeap(); + return null; + } + + @SuppressWarnings("java:S2445") // avoid reading the same reader concurrently + private void mergeChunkHeap() throws IOException { + while (!chunkIdxHeap.isEmpty()) { + int pathIdx = chunkIdxHeap.poll(); + Path path = currMergingPaths.get(pathIdx); + MeasurementSchema measurementSchema = resource.getSchema(path); + IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema); + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return; + } + + if (metaListEntries[pathIdx] != null) { + MetaListEntry metaListEntry = metaListEntries[pathIdx]; + ChunkMetadata currMeta = metaListEntry.current(); + boolean isLastChunk = !metaListEntry.hasNext(); + boolean chunkOverflowed = MergeUtils + .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta); + boolean chunkTooSmall = MergeUtils + .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum); + + Chunk chunk; + synchronized (reader) { + chunk = reader.readMemChunk(currMeta); + } + ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk, + ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter, + currFile); + + if (!isLastChunk) { + metaListEntry.next(); + chunkIdxHeap.add(pathIdx); + continue; + } + } + // this only happens when the seqFiles do not contain this series, otherwise the remaining + // data will be merged with the last chunk in the seqFiles + if (isLastFile && currTimeValuePairs[pathIdx] != null) { + 
ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], + Long.MAX_VALUE, + pathIdx); + mergedChunkNum.incrementAndGet(); + } + // the last merged chunk may still be smaller than the threshold, flush it anyway + if (ptWrittens[pathIdx] > 0) { + synchronized (mergeFileWriter) { + chunkWriter.writeToFileWriter(mergeFileWriter); + } + } + } + } + + public String getStorageGroupName() { + return storageGroupName; + } + + public String getTaskName() { + return taskName + "_" + taskNum; + } + + public String getProgress() { + return String.format("Processed %d/%d series", totalSeriesNum - chunkIdxHeap.size(), + totalSeriesNum); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java index 6d1a6fc..68476aa 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java @@ -67,6 +67,11 @@ public class MergeTask implements Callable<Void> { String taskName; boolean fullMerge; + States states = States.START; + + MergeMultiChunkTask chunkTask; + MergeFileTask fileTask; + MergeTask(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, String storageGroupSysDir, MergeCallback callback, String taskName, boolean fullMerge, String storageGroupName) { @@ -96,15 +101,19 @@ public class MergeTask implements Callable<Void> { doMerge(); } catch (Exception e) { logger.error("Runtime exception in merge {}", taskName, e); - cleanUp(false); - // call the callback to make sure the StorageGroup exit merging status, but passing 2 - // empty file lists to avoid files being deleted. 
- callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME)); - throw e; + abort(); } return null; } + private void abort() throws IOException { + states = States.ABORTED; + cleanUp(false); + // call the callback to make sure the StorageGroup exit merging status, but passing 2 + // empty file lists to avoid files being deleted. + callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME)); + } + private void doMerge() throws IOException, MetadataException { if (logger.isInfoEnabled()) { logger.info("{} starts to merge {} seqFiles, {} unseqFiles", taskName, @@ -132,14 +141,30 @@ public class MergeTask implements Callable<Void> { mergeLogger.logMergeStart(); - MergeMultiChunkTask mergeChunkTask = new MergeMultiChunkTask(mergeContext, taskName, mergeLogger, resource, - fullMerge, unmergedSeries, concurrentMergeSeriesNum); - mergeChunkTask.mergeSeries(); + chunkTask = new MergeMultiChunkTask(mergeContext, taskName, mergeLogger, resource, + fullMerge, unmergedSeries, concurrentMergeSeriesNum, storageGroupName); + states = States.MERGE_CHUNKS; + chunkTask.mergeSeries(); + if (Thread.interrupted()) { + logger.info("Merge task {} aborted", taskName); + abort(); + return; + } + - MergeFileTask mergeFileTask = new MergeFileTask(taskName, mergeContext, mergeLogger, resource, + fileTask = new MergeFileTask(taskName, mergeContext, mergeLogger, resource, resource.getSeqFiles()); - mergeFileTask.mergeFiles(); + states = States.MERGE_FILES; + chunkTask = null; + fileTask.mergeFiles(); + if (Thread.interrupted()) { + logger.info("Merge task {} aborted", taskName); + abort(); + return; + } + states = States.CLEAN_UP; + fileTask = null; cleanUp(true); if (logger.isInfoEnabled()) { double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0; @@ -183,4 +208,36 @@ public class MergeTask implements Callable<Void> { logFile.delete(); } } + + 
public String getStorageGroupName() { + return storageGroupName; + } + + enum States { + START, + MERGE_CHUNKS, + MERGE_FILES, + CLEAN_UP, + ABORTED + } + + public String getProgress() { + switch (states) { + case ABORTED: + return "Aborted"; + case CLEAN_UP: + return "Cleaning up"; + case MERGE_FILES: + return "Merging files: " + fileTask.getProgress(); + case MERGE_CHUNKS: + return "Merging series: " + chunkTask.getProgress(); + case START: + default: + return "Just started"; + } + } + + public String getTaskName() { + return taskName; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java index c0beb3d..9857d40 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java @@ -104,7 +104,7 @@ public class RecoverMergeTask extends MergeTask { } MergeMultiChunkTask mergeChunkTask = new MergeMultiChunkTask(mergeContext, taskName, mergeLogger, resource, - fullMerge, analyzer.getUnmergedPaths(), concurrentMergeSeriesNum); + fullMerge, analyzer.getUnmergedPaths(), concurrentMergeSeriesNum, storageGroupName); analyzer.setUnmergedPaths(null); mergeChunkTask.mergeSeries(); @@ -158,9 +158,8 @@ public class RecoverMergeTask extends MergeTask { long maxChunkNum = chunkNums[1]; long fileMetaSize = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile)); long newSingleSeriesSeqReadCost = fileMetaSize * maxChunkNum / totalChunkNum; - singleSeriesSeqReadCost = newSingleSeriesSeqReadCost > singleSeriesSeqReadCost ? - newSingleSeriesSeqReadCost : singleSeriesSeqReadCost; - maxSeqReadCost = fileMetaSize > maxSeqReadCost ? 
fileMetaSize : maxSeqReadCost; + singleSeriesSeqReadCost = Math.max(newSingleSeriesSeqReadCost, singleSeriesSeqReadCost); + maxSeqReadCost = Math.max(fileMetaSize, maxSeqReadCost); seqWriteCost += fileMetaSize; } @@ -169,10 +168,8 @@ public class RecoverMergeTask extends MergeTask { int ub = MaxSeriesMergeFileSelector.MAX_SERIES_NUM; int mid = (lb + ub) / 2; while (mid != lb) { - long unseqCost = singleSeriesUnseqCost * mid < maxUnseqCost ? singleSeriesUnseqCost * mid : - maxUnseqCost; - long seqReadCos = singleSeriesSeqReadCost * mid < maxSeqReadCost ? - singleSeriesSeqReadCost * mid : maxSeqReadCost; + long unseqCost = Math.min(singleSeriesUnseqCost * mid, maxUnseqCost); + long seqReadCos = Math.min(singleSeriesSeqReadCost * mid, maxSeqReadCost); long totalCost = unseqCost + seqReadCos + seqWriteCost; if (totalCost <= memBudget) { lb = mid; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java index 68031e9..3b31bbe 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java @@ -98,6 +98,8 @@ public class Planner { case FLUSH: case MERGE: case CLEAR_CACHE: + case NULL: + case SHOW_MERGE_STATUS: return operator; case QUERY: case UPDATE: diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java index 468f286..1acf1a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java @@ -150,6 +150,8 @@ public class SQLConstant { public static final int TOK_LOAD_CONFIGURATION_GLOBAL = 85; public static final int TOK_LOAD_CONFIGURATION_LOCAL = 86; + public static final int TOK_SHOW_MERGE_STATUS = 87; + public static final Map<Integer, String> tokenSymbol = new HashMap<>(); public static final Map<Integer, String> tokenNames = new 
HashMap<>(); public static final Map<Integer, Integer> reverseWords = new HashMap<>(); @@ -217,6 +219,8 @@ public class SQLConstant { tokenNames.put(TOK_LOAD_FILES, "TOK_LOAD_FILES"); tokenNames.put(TOK_REMOVE_FILE, "TOK_REMOVE_FILE"); tokenNames.put(TOK_MOVE_FILE, "TOK_MOVE_FILE"); + + tokenNames.put(TOK_SHOW_MERGE_STATUS, "TOK_SHOW_MERGE_STATUS"); } static { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index cce6140..f77ef78 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -18,15 +18,20 @@ */ package org.apache.iotdb.db.qp.executor; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION; @@ -48,6 +53,7 
@@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; @@ -67,6 +73,8 @@ import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.engine.cache.ChunkMetadataCache; import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager; +import org.apache.iotdb.db.engine.merge.manage.MergeManager; +import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.DeleteFailedException; @@ -349,6 +357,8 @@ public class PlanExecutor implements IPlanExecutor { return processCountNodeTimeSeries((CountPlan) showPlan); case COUNT_NODES: return processCountNodes((CountPlan) showPlan); + case MERGE_STATUS: + return processShowMergeStatus(); default: throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan)); } @@ -1493,4 +1503,43 @@ public class PlanExecutor implements IPlanExecutor { protected void loadConfiguration(LoadConfigurationPlan plan) throws QueryProcessException { IoTDBDescriptor.getInstance().loadHotModifiedProps(); } + + private QueryDataSet processShowMergeStatus() { + List<Path> headerList = new ArrayList<>(); + List<TSDataType> typeList = new ArrayList<>(); + headerList.add(new Path(COLUMN_STORAGE_GROUP)); + headerList.add(new Path(COLUMN_TASK_NAME)); + headerList.add(new Path(COLUMN_CREATED_TIME)); + headerList.add(new Path(COLUMN_PROGRESS)); + headerList.add(new Path(COLUMN_CANCELLED)); + headerList.add(new Path(COLUMN_DONE)); + + typeList.add(TSDataType.TEXT); + typeList.add(TSDataType.TEXT); + typeList.add(TSDataType.TEXT); + typeList.add(TSDataType.TEXT); + typeList.add(TSDataType.BOOLEAN); + typeList.add(TSDataType.BOOLEAN); + ListDataSet dataSet = 
new ListDataSet(headerList, typeList); + Map<String, List<TaskStatus>>[] taskStatus = MergeManager.getINSTANCE().collectTaskStatus(); + for (Map<String, List<TaskStatus>> statusMap : taskStatus) { + for (Entry<String, List<TaskStatus>> stringListEntry : statusMap.entrySet()) { + for (TaskStatus status : stringListEntry.getValue()) { + dataSet.putRecord(toRowRecord(status, stringListEntry.getKey())); + } + } + } + return dataSet; + } + + public RowRecord toRowRecord(TaskStatus status, String storageGroup) { + RowRecord record = new RowRecord(0); + record.addField(new Binary(storageGroup), TSDataType.TEXT); + record.addField(new Binary(status.getTaskName()), TSDataType.TEXT); + record.addField(new Binary(status.getCreatedTime()), TSDataType.TEXT); + record.addField(new Binary(status.getProgress()), TSDataType.TEXT); + record.addField(status.isCancelled(), TSDataType.BOOLEAN); + record.addField(status.isDone(), TSDataType.BOOLEAN); + return record; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java index 0a849df..746326c 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java @@ -75,6 +75,7 @@ public abstract class Operator { LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS, GRANT_WATERMARK_EMBEDDING, REVOKE_WATERMARK_EMBEDDING, TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, REMOVE_FILE, MOVE_FILE, LAST, GROUP_BY_FILL, - ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE + ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE, + SHOW_MERGE_STATUS } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowMergeStatusOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowMergeStatusOperator.java new file mode 100644 index 0000000..d10cc2e --- /dev/null +++ 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowMergeStatusOperator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.qp.logical.sys; + +import org.apache.iotdb.db.qp.logical.RootOperator; + +public class ShowMergeStatusOperator extends RootOperator { + + public ShowMergeStatusOperator(int tokenIntType) { + super(tokenIntType); + setOperatorType(OperatorType.SHOW_MERGE_STATUS); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowMergeStatusPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowMergeStatusPlan.java new file mode 100644 index 0000000..51c2d52 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowMergeStatusPlan.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.qp.physical.sys; + +public class ShowMergeStatusPlan extends ShowPlan { + + public ShowMergeStatusPlan() { + super(ShowContentType.MERGE_STATUS); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java index ced1b80..bde812a 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java @@ -50,7 +50,7 @@ public class ShowPlan extends PhysicalPlan { public enum ShowContentType { DYNAMIC_PARAMETER, FLUSH_TASK_INFO, TTL, VERSION, TIMESERIES, STORAGE_GROUP, CHILD_PATH, DEVICES, - COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES + COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES, MERGE_STATUS } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java index 43e4c4a..8048f86 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java @@ -65,6 +65,7 @@ import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator; import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator; import org.apache.iotdb.db.qp.logical.sys.ShowChildPathsOperator; import org.apache.iotdb.db.qp.logical.sys.ShowDevicesOperator; +import org.apache.iotdb.db.qp.logical.sys.ShowMergeStatusOperator; import 
org.apache.iotdb.db.qp.logical.sys.ShowOperator; import org.apache.iotdb.db.qp.logical.sys.ShowTTLOperator; import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator; @@ -141,6 +142,7 @@ import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SetTTLStatementContext; import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowAllTTLStatementContext; import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowChildPathsContext; import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowDevicesContext; +import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowMergeStatusContext; import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowStorageGroupContext; import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowTTLStatementContext; import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowTimeseriesContext; @@ -1610,4 +1612,10 @@ public class LogicalGenerator extends SqlBaseBaseListener { String.format("encoding %s does not support %s", tsEncoding, tsDataType)); } } + + @Override + public void enterShowMergeStatus(ShowMergeStatusContext ctx) { + super.enterShowMergeStatus(ctx); + initializedOperator = new ShowMergeStatusOperator(SQLConstant.TOK_SHOW_MERGE_STATUS); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 644b99b..abd2d23 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -82,6 +82,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan; import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan; import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowMergeStatusPlan; import org.apache.iotdb.db.qp.physical.sys.ShowPlan; import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType; import 
org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan; @@ -271,6 +272,8 @@ public class PhysicalGenerator { OperatorType.MOVE_FILE); case CLEAR_CACHE: return new ClearCachePlan(); + case SHOW_MERGE_STATUS: + return new ShowMergeStatusPlan(); default: throw new LogicalOperatorException(operator.getType().toString(), ""); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java index f3edd9f..2ae9928 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java @@ -94,8 +94,8 @@ public class RPCService implements RPCServiceMBean, IService { @Override public void start() throws StartupException { - JMXService.registerMBean(getInstance(), mbeanName); - startService(); + JMXService.registerMBean(getInstance(), mbeanName); + startService(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java index e1498bc..ad1569b 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java +++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java @@ -36,7 +36,7 @@ public enum ServiceType { FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""), SYNC_SERVICE("SYNC ServerService", ""), UPGRADE_SERVICE("UPGRADE DataService", ""), - MERGE_SERVICE("Merge Manager", ""), + MERGE_SERVICE("Merge Manager", "Merge Manager"), PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"), MANAGE_DYNAMIC_PARAMETERS_SERVICE("Manage Dynamic Parameters", "Manage Dynamic Parameters"), TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""), diff --git a/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java b/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java index 26fe5f8..6b0a0c9 100644 --- 
a/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java +++ b/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java @@ -19,15 +19,20 @@ package org.apache.iotdb.db.service; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION; import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE; @@ -123,6 +128,13 @@ class StaticResps { Arrays.asList(TSDataType.TEXT.toString(), TSDataType.TEXT.toString()), false ); + static final TSExecuteStatementResp MERGE_STATUS_RESP = getNoTimeExecuteResp( + Arrays.asList(COLUMN_STORAGE_GROUP, COLUMN_TASK_NAME, COLUMN_CREATED_TIME, COLUMN_PROGRESS, + COLUMN_CANCELLED, COLUMN_DONE), + Arrays.asList(TSDataType.TEXT.toString(), TSDataType.TEXT.toString(), + TSDataType.TEXT.toString(), + TSDataType.TEXT.toString(), TSDataType.BOOLEAN.toString(), TSDataType.BOOLEAN.toString())); + private static TSExecuteStatementResp 
getNoTimeExecuteResp(List<String> columns, List<String> dataTypes) { return getExecuteResp(columns, dataTypes, true); diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 7ae24a9..fe79492 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -633,6 +633,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return StaticResps.COUNT_NODES; case COUNT_TIMESERIES: return StaticResps.COUNT_TIMESERIES; + case MERGE_STATUS: + return StaticResps.MERGE_STATUS_RESP; default: logger.error("Unsupported show content type: {}", showPlan.getShowContentType()); throw new QueryProcessException( diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java new file mode 100644 index 0000000..abe7660 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.junit.Test;
+
+/**
+ * Tests the task report and the abort operation of {@link MergeManager} with fake merge tasks
+ * that keep waiting until their thread is interrupted.
+ */
+public class MergeManagerTest extends MergeTest {
+
+  /** Storage group name reported by every fake task. */
+  private static final String FAKE_STORAGE_GROUP = "test";
+  /** Number of main tasks, and also of sub tasks, submitted by each test. */
+  private static final int FAKE_TASK_NUM = 5;
+
+  @Test
+  public void testGenMergeReport() {
+    submitFakedTasks();
+    try {
+      checkReport(MergeManager.getINSTANCE().genMergeTaskReport());
+    } finally {
+      // the fake tasks never finish on their own, so do not leave them behind for other tests
+      MergeManager.getINSTANCE().abortMerge(FAKE_STORAGE_GROUP);
+    }
+  }
+
+  @Test
+  public void testAbortMerge() {
+    submitFakedTasks();
+    try {
+      // aborting a storage group without tasks must leave the submitted tasks untouched
+      MergeManager.getINSTANCE().abortMerge("non-exist");
+      checkReport(MergeManager.getINSTANCE().genMergeTaskReport());
+    } finally {
+      // runs even when the check above fails, so the fake tasks are always stopped
+      MergeManager.getINSTANCE().abortMerge(FAKE_STORAGE_GROUP);
+    }
+
+    String report = MergeManager.getINSTANCE().genMergeTaskReport();
+    assertEquals(String.format("Main tasks:%n"
+        + "Sub tasks:%n"), report);
+  }
+
+  /**
+   * Submits {@link #FAKE_TASK_NUM} fake main tasks and as many fake sub tasks, numbered from 0,
+   * to the {@link MergeManager} singleton.
+   */
+  private void submitFakedTasks() {
+    FakedMergeMultiChunkTask chunkTask = new FakedMergeMultiChunkTask();
+    for (int i = 0; i < FAKE_TASK_NUM; i++) {
+      MergeManager.getINSTANCE().submitMainTask(new FakedMainMergeTask(i));
+      MergeManager.getINSTANCE().submitChunkSubTask(chunkTask.createSubTask(i));
+    }
+  }
+
+  /**
+   * Asserts that the report lists the fake main tasks followed by the fake sub tasks, each
+   * section consisting of a title line, a storage group line and one line per task.
+   *
+   * @param report the text produced by {@code MergeManager.genMergeTaskReport()}
+   */
+  private void checkReport(String report) {
+    String[] split = report.split(System.lineSeparator());
+    int line = 0;
+    assertEquals("Main tasks:", split[line++]);
+    line = checkTaskGroup(split, line);
+    assertEquals("Sub tasks:", split[line++]);
+    checkTaskGroup(split, line);
+  }
+
+  /**
+   * Asserts one storage group section of the report: the storage group line at {@code offset}
+   * followed by one line for each of the {@link #FAKE_TASK_NUM} unfinished fake tasks.
+   *
+   * @param lines the report split into lines
+   * @param offset index of the storage group line of the section
+   * @return index of the first line after the section
+   */
+  private int checkTaskGroup(String[] lines, int offset) {
+    assertEquals("\tStorage group: " + FAKE_STORAGE_GROUP, lines[offset]);
+    for (int i = 0; i < FAKE_TASK_NUM; i++) {
+      // the line itself is the failure message so that a mismatch is easy to diagnose
+      String taskLine = lines[offset + 1 + i];
+      assertTrue(taskLine, taskLine.contains("task" + i));
+      assertTrue(taskLine, taskLine.contains("0,"));
+      assertTrue(taskLine, taskLine.contains("done:false"));
+      assertTrue(taskLine, taskLine.contains("cancelled:false"));
+    }
+    return offset + 1 + FAKE_TASK_NUM;
+  }
+
+  /**
+   * Blocks the calling thread until it is interrupted, sleeping in short intervals instead of
+   * spinning so that the fake tasks do not occupy a CPU core each. The interrupt flag is set
+   * when this method returns.
+   */
+  private static void waitUntilInterrupted() {
+    try {
+      while (!Thread.currentThread().isInterrupted()) {
+        Thread.sleep(10);
+      }
+    } catch (InterruptedException e) {
+      // sleep() clears the flag when it throws; restore it for whoever inspects it afterwards
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** A main merge task that does no merging and only waits to be interrupted. */
+  static class FakedMainMergeTask extends MergeTask {
+
+    private final int serialNum;
+    // call() is expected to run on a different thread than the one building the report,
+    // hence volatile
+    private volatile String progress = "0";
+
+    public FakedMainMergeTask(int serialNum) {
+      super(null, null, null, null, false,
+          0,
+          null);
+      this.serialNum = serialNum;
+    }
+
+    @Override
+    public Void call() {
+      MergeManagerTest.waitUntilInterrupted();
+      progress = "1";
+      return null;
+    }
+
+    @Override
+    public String getStorageGroupName() {
+      return FAKE_STORAGE_GROUP;
+    }
+
+    @Override
+    public String getProgress() {
+      return progress;
+    }
+
+    @Override
+    public String getTaskName() {
+      return "task" + serialNum;
+    }
+  }
+
+  /**
+   * Factory for fake sub tasks. NOTE(review): {@code MergeChunkHeapTask} is referenced without
+   * qualification, so it is presumably an inner class of {@link MergeMultiChunkTask}, which is
+   * why the fake sub task has to live inside this subclass -- confirm.
+   */
+  static class FakedMergeMultiChunkTask extends MergeMultiChunkTask {
+
+    public FakedMergeMultiChunkTask() {
+      super(null, null, null, null, false, null,
+          0, null);
+    }
+
+    /**
+     * Creates a fake sub task.
+     *
+     * @param serialNum number used in the task name and passed to the super constructor
+     * @return a sub task that only waits to be interrupted
+     */
+    public MergeChunkHeapTask createSubTask(int serialNum) {
+      return new FakedSubMergeTask(serialNum);
+    }
+
+    /** A sub merge task that does no merging and only waits to be interrupted. */
+    class FakedSubMergeTask extends MergeChunkHeapTask {
+
+      private final int serialNum;
+      // call() is expected to run on a different thread than the one building the report,
+      // hence volatile
+      private volatile String progress = "0";
+
+      public FakedSubMergeTask(int serialNum) {
+        super(new PriorityQueue<>(), null, null, null, null, null, null, false, serialNum);
+        this.serialNum = serialNum;
+      }
+
+      @Override
+      public Void call() {
+        MergeManagerTest.waitUntilInterrupted();
+        progress = "1";
+        return null;
+      }
+
+      @Override
+      public String getStorageGroupName() {
+        return FAKE_STORAGE_GROUP;
+      }
+
+      @Override
+      public String getProgress() {
+        return progress;
+      }
+
+      @Override
+      public String getTaskName() {
+        return "task" + serialNum;
+      }
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index 
35d402f..e2313b4 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -28,7 +28,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 import org.junit.After;
@@ -169,7 +168,7 @@ public class IoTDBMergeTest {
   }
 
   @Test
-  public void testCrossPartition() throws SQLException, StorageEngineException {
+  public void testCrossPartition() throws SQLException {
     logger.info("testCrossPartition...");
     try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -229,4 +228,56 @@ public class IoTDBMergeTest {
       assertEquals(10000, cnt);
     }
   }
+
+  /**
+   * Overwrites flushed data, triggers a merge and checks that "SHOW MERGE STATUS" returns
+   * exactly one row.
+   */
+  @Test
+  public void testShowMergeStatus() throws SQLException {
+    logger.info("testShowMergeStatus...");
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+         Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.mergeTest");
+      for (int i = 1; i <= 3; i++) {
+        try {
+          statement.execute("CREATE TIMESERIES root.mergeTest.s" + i + " WITH DATATYPE=INT64,"
+              + "ENCODING=PLAIN");
+        } catch (SQLException e) {
+          // NOTE(review): creation failures are swallowed; presumably the series may already
+          // exist -- confirm
+        }
+      }
+
+      // write timestamps 1..10 twice with different values, flushing after each round
+      for (int j = 1; j <= 10; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+1, j+2, j+3));
+      }
+      statement.execute("FLUSH");
+      for (int j = 1; j <= 10; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+10, j+20, j+30));
+      }
+      statement.execute("FLUSH");
+      statement.execute("MERGE");
+
+      // NOTE(review): exactly one row is expected right after MERGE is issued; whether this
+      // relies on the merge still being in progress is not visible here -- confirm
+      int cnt = 0;
+      try (ResultSet resultSet = statement.executeQuery("SHOW MERGE STATUS")) {
+        int colNum = resultSet.getMetaData().getColumnCount();
+        while (resultSet.next()) {
+          StringBuilder stringBuilder = new StringBuilder();
+          // JDBC columns are 1-based
+          for (int i = 1; i <= colNum; i++) {
+            stringBuilder.append(resultSet.getString(i)).append(",");
+          }
+          logger.info(stringBuilder.toString());
+          cnt++;
+        }
+      }
+      assertEquals(1, cnt);
+    }
+  }
 }
