This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 32a9c6a865 [To rel/0.13][IOTDB-4602] Add status judgment and filter
condition for data archiving (#7565)
32a9c6a865 is described below
commit 32a9c6a86576402a52a7d231e87b35e01331f6e2
Author: Alan Choo <[email protected]>
AuthorDate: Wed Oct 12 09:50:05 2022 +0800
[To rel/0.13][IOTDB-4602] Add status judgment and filter condition for data
archiving (#7565)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 8 +---
docs/UserGuide/Process-Data/Archiving.md | 6 ++-
docs/zh/UserGuide/Process-Data/Archiving.md | 11 ++++--
.../iotdb/db/integration/IoTDBArchivingIT.java | 8 ++--
.../org/apache/iotdb/db/engine/StorageEngine.java | 9 +++--
.../db/engine/archiving/ArchivingManager.java | 8 ++--
.../iotdb/db/engine/archiving/ArchivingTask.java | 10 ++++-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 46 ++++++++++++++--------
.../db/qp/logical/sys/ShowArchivingOperator.java | 9 +++--
.../db/qp/physical/sys/ShowArchivingPlan.java | 11 +++++-
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 17 +++-----
.../iotdb/db/engine/archiving/ArchivingTest.java | 2 +-
12 files changed, 86 insertions(+), 59 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 5720a95b0f..7996345033 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -47,7 +47,6 @@ ddlStatement
| showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate
| countStorageGroup | countDevices | countTimeseries | countNodes
| setArchiving | cancelArchiving | pauseArchiving | resumeArchiving |
showArchiving
- | showAllArchiving
;
dmlStatement
@@ -362,12 +361,7 @@ resumeArchiving
// Show Archiving
showArchiving
- : SHOW ARCHIVING ON prefixPath (COMMA prefixPath)*
- ;
-
-// Show All Archiving
-showAllArchiving
- : SHOW ALL ARCHIVING
+ : SHOW ALL? ARCHIVING (ON prefixPath (COMMA prefixPath)*)?
;
diff --git a/docs/UserGuide/Process-Data/Archiving.md
b/docs/UserGuide/Process-Data/Archiving.md
index bb2409b41e..e89cadd840 100644
--- a/docs/UserGuide/Process-Data/Archiving.md
+++ b/docs/UserGuide/Process-Data/Archiving.md
@@ -35,12 +35,14 @@ Show the data archiving tasks.
#### Syntax
```sql
+SHOW ARCHIVING
SHOW ALL ARCHIVING
SHOW ARCHIVING ON <storage_group>
+SHOW ALL ARCHIVING ON <storage_group>
```
- `<storage_group>` specifies the storage group to show archiving task on.
-
+- `all` By default, only tasks in the READY, RUNNING, and PAUSED states are
returned. You can view tasks in other states by adding the all parameter.
#### Example Result
```sql
@@ -106,7 +108,7 @@ CANCEL ARCHIVING ON root.ln
### Pause Archiving Task
-Suspend the data migration task, run the `RESUME` command to resume the task.
+Suspend the data migration task.
#### Syntax
diff --git a/docs/zh/UserGuide/Process-Data/Archiving.md
b/docs/zh/UserGuide/Process-Data/Archiving.md
index e5eaecce1a..86ad558a4e 100644
--- a/docs/zh/UserGuide/Process-Data/Archiving.md
+++ b/docs/zh/UserGuide/Process-Data/Archiving.md
@@ -21,7 +21,7 @@
# 数据归档
-数据归档功能提供 5 个 Cli 命令:包括查看、提交、删除、暂停和继续归档任务。
+数据归档功能提供 5 个 Cli 命令:包括查看、提交、取消、暂停和继续归档任务。
用户可以创建归档任务,这些归档任务由用户指定的的启动时间,并归档过期数据到用户指定的目录。
## SQL 语句
@@ -33,11 +33,14 @@
#### 语法
```sql
+SHOW ARCHIVING
SHOW ALL ARCHIVING
SHOW ARCHIVING ON <storage_group>
+SHOW ALL ARCHIVING ON <storage_group>
```
- `<storage_group>` 返回指定存储组上的任务参数以及状态。
+- `all` 默认只返回处于 READY、RUNNING、PAUSED 状态的任务,可以通过添加 all 参数查看其他状态的任务
#### 结果示例
@@ -78,9 +81,9 @@ SET ARCHIVING TO root.ln 2023-01-01 360000 "/tmp"
- 开始时间使用 ISO 8601 格式,因此可以省略时/分/秒等信息,省略后默认设成 0。
- 可以提交全部存储组的归档任务,使用类似 `root.ln.**`。
-### 删除数据归档任务
+### 取消数据归档任务
-停止并删除数据归档任务。(注意:已经被归档的数据不会被放回数据库中)
+停止并取消数据归档任务。(注意:已经被归档的数据不会被放回数据库中)
#### 语法
@@ -101,7 +104,7 @@ CANCEL ARCHIVING ON root.ln
### 暂停数据归档任务
-将数据归档任务挂起,可通过 `RESUME` 命令让任务重新执行。
+将正在运行的数据归档任务挂起。
#### 语法
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
index 1b01376a14..4963267f7d 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
@@ -81,17 +81,17 @@ public class IoTDBArchivingIT {
try {
statement.execute("CANCEL ARCHIVING ON root.ARCHIVING_SG1");
} catch (SQLException e) {
- assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(),
e.getErrorCode());
+ assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(),
e.getErrorCode());
}
try {
statement.execute("PAUSE ARCHIVING ON root.ARCHIVING_SG1");
} catch (SQLException e) {
- assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(),
e.getErrorCode());
+ assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(),
e.getErrorCode());
}
try {
statement.execute("RESUME ARCHIVING ON root.ARCHIVING_SG1");
} catch (SQLException e) {
- assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(),
e.getErrorCode());
+ assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(),
e.getErrorCode());
}
statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG1");
@@ -229,7 +229,7 @@ public class IoTDBArchivingIT {
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
statement.execute(
- "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 0 '" +
testTargetDir.getPath() + "'");
+ "SET ARCHIVING TO root.ARCHIVING_SG1 3000-01-01 0 '" +
testTargetDir.getPath() + "'");
statement.execute("CANCEL ARCHIVING ON root.ARCHIVING_SG1");
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 4c2226a4ea..df3f1f5428 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -1337,14 +1337,15 @@ public class StorageEngine implements IService {
}
}
- public void operateArchiving(
+ public boolean operateArchiving(
ArchivingOperate.ArchivingOperateType operateType, long taskId,
PartialPath storageGroup) {
if (taskId >= 0) {
- archivingManager.operate(operateType, taskId);
+ return archivingManager.operate(operateType, taskId);
} else if (storageGroup != null) {
- archivingManager.operate(operateType, storageGroup);
+ return archivingManager.operate(operateType, storageGroup);
} else {
- logger.error("{} archiving cannot recognize taskId or storagegroup",
operateType.name());
+ logger.error("{} archiving cannot recognize taskId or storage group",
operateType.name());
+ return false;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
index 4e9b02b4fd..14b79ed1f0 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
@@ -216,7 +216,7 @@ public class ArchivingManager {
&& archivingTask.getTargetDir().equals(targetDir)
&& archivingTask.getTTL() == ttl
&& archivingTask.getStartTime() == startTime) {
- logger.info("archiving task already equals archiving task {}",
archivingTask.getTaskId());
+ logger.warn("archiving task already equals archiving task {}",
archivingTask.getTaskId());
return false;
}
}
@@ -332,12 +332,16 @@ public class ArchivingManager {
// can cancel/pause only when status=READY/RUNNING
if (!(task.getStatus() == ArchivingTask.ArchivingTaskStatus.READY
|| task.getStatus() == ArchivingTask.ArchivingTaskStatus.RUNNING))
{
+ logger.warn(
+ "Cannot cancel or pause archiving task when it's in the {}
status.",
+ task.getStatus());
return false;
}
break;
case RESUME:
// can resume only when status=PAUSED
if (!(task.getStatus() == ArchivingTask.ArchivingTaskStatus.PAUSED)) {
+ logger.warn("Cannot resume archiving task when it's in the {}
status.", task.getStatus());
return false;
}
break;
@@ -367,7 +371,6 @@ public class ArchivingManager {
try {
lock.lock();
- logger.info("checking archivingTasks");
for (ArchivingTask task : archivingTasks) {
if (task.getStartTime() - DatetimeUtils.currentTime() <= 0
@@ -398,7 +401,6 @@ public class ArchivingManager {
.getProcessorMap()
.get(task.getStorageGroup())
.checkArchivingTask(task);
- logger.info("check archiving task successfully.");
// set state and remove
try {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java
index 4973e3bbb1..3056a5250b 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java
@@ -178,12 +178,18 @@ public class ArchivingTask {
return startTime;
}
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
public ArchivingTaskStatus getStatus() {
return status;
}
- public long getSubmitTime() {
- return submitTime;
+ public boolean isActive() {
+ return status == ArchivingTaskStatus.READY
+ || status == ArchivingTaskStatus.RUNNING
+ || status == ArchivingTaskStatus.PAUSED;
}
public void setStatus(ArchivingTaskStatus status) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 12e0992469..c7dc9fedb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1311,6 +1311,10 @@ public class PlanExecutor implements IPlanExecutor {
if (!selectedSgs.isEmpty() && !selectedSgs.contains(sgName)) {
continue;
}
+ // show inactive tasks only when show all
+ if (!showArchivingPlan.isShowAll() && !task.isActive()) {
+ continue;
+ }
RowRecord rowRecord = new RowRecord(timestamp++);
Field taskId = new Field(TSDataType.INT64);
Field submitTime = new Field(TSDataType.TEXT);
@@ -1699,11 +1703,15 @@ public class PlanExecutor implements IPlanExecutor {
private void operateSetArchiving(SetArchivingPlan plan) throws
QueryProcessException {
if (plan.getTargetDir() == null) {
// is cancel plan
- StorageEngine.getInstance()
- .operateArchiving(
- ArchivingOperate.ArchivingOperateType.CANCEL,
- plan.getTaskId(),
- plan.getStorageGroup());
+ boolean success =
+ StorageEngine.getInstance()
+ .operateArchiving(
+ ArchivingOperate.ArchivingOperateType.CANCEL,
+ plan.getTaskId(),
+ plan.getStorageGroup());
+ if (!success) {
+ throw new QueryProcessException("Fail to cancel archiving task.");
+ }
} else {
try {
List<PartialPath> storageGroupPaths =
@@ -1718,19 +1726,25 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- private void operatePauseArchiving(PauseArchivingPlan plan) {
+ private void operatePauseArchiving(PauseArchivingPlan plan) throws
QueryProcessException {
+ boolean success;
if (plan.isPause()) {
- StorageEngine.getInstance()
- .operateArchiving(
- ArchivingOperate.ArchivingOperateType.PAUSE,
- plan.getTaskId(),
- plan.getStorageGroup());
+ success =
+ StorageEngine.getInstance()
+ .operateArchiving(
+ ArchivingOperate.ArchivingOperateType.PAUSE,
+ plan.getTaskId(),
+ plan.getStorageGroup());
} else {
- StorageEngine.getInstance()
- .operateArchiving(
- ArchivingOperate.ArchivingOperateType.RESUME,
- plan.getTaskId(),
- plan.getStorageGroup());
+ success =
+ StorageEngine.getInstance()
+ .operateArchiving(
+ ArchivingOperate.ArchivingOperateType.RESUME,
+ plan.getTaskId(),
+ plan.getStorageGroup());
+ }
+ if (!success) {
+ throw new QueryProcessException("Fail to operate archiving task.");
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java
index 575be7c063..50006eb8f8 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java
@@ -28,11 +28,14 @@ import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
import java.util.List;
public class ShowArchivingOperator extends ShowOperator {
- private List<PartialPath> storageGroups;
+ private final List<PartialPath> storageGroups;
- public ShowArchivingOperator(List<PartialPath> storageGroups) {
+ private final boolean showAll;
+
+ public ShowArchivingOperator(List<PartialPath> storageGroups, boolean
showAll) {
super(SQLConstant.TOK_SHOW, OperatorType.SHOW_ARCHIVING);
this.storageGroups = storageGroups;
+ this.showAll = showAll;
}
public List<PartialPath> getStorageGroups() {
@@ -41,6 +44,6 @@ public class ShowArchivingOperator extends ShowOperator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator) {
- return new ShowArchivingPlan(storageGroups);
+ return new ShowArchivingPlan(storageGroups, showAll);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
index 1b7cd70549..f03992801e 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
@@ -23,11 +23,14 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import java.util.List;
public class ShowArchivingPlan extends ShowPlan {
- private List<PartialPath> storageGroups;
+ private final List<PartialPath> storageGroups;
- public ShowArchivingPlan(List<PartialPath> storageGroups) {
+ private final boolean showAll;
+
+ public ShowArchivingPlan(List<PartialPath> storageGroups, boolean showAll) {
super(ShowContentType.SHOW_ARCHIVING);
this.storageGroups = storageGroups;
+ this.showAll = showAll;
}
@Override
@@ -38,4 +41,8 @@ public class ShowArchivingPlan extends ShowPlan {
public List<PartialPath> getStorageGroups() {
return storageGroups;
}
+
+ public boolean isShowAll() {
+ return showAll;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 47dd2c1a77..8e74c9042f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -893,18 +893,13 @@ public class IoTDBSqlVisitor extends
IoTDBSqlParserBaseVisitor<Operator> {
@Override
public Operator visitShowArchiving(IoTDBSqlParser.ShowArchivingContext ctx) {
List<PartialPath> storageGroups = new ArrayList<>();
- List<IoTDBSqlParser.PrefixPathContext> prefixPathList = ctx.prefixPath();
- for (IoTDBSqlParser.PrefixPathContext prefixPath : prefixPathList) {
- storageGroups.add(parsePrefixPath(prefixPath));
+ if (ctx.ON() != null) {
+ List<IoTDBSqlParser.PrefixPathContext> prefixPathList = ctx.prefixPath();
+ for (IoTDBSqlParser.PrefixPathContext prefixPath : prefixPathList) {
+ storageGroups.add(parsePrefixPath(prefixPath));
+ }
}
- return new ShowArchivingOperator(storageGroups);
- }
-
- // Show All Archiving
- @Override
- public Operator visitShowAllArchiving(IoTDBSqlParser.ShowAllArchivingContext
ctx) {
- List<PartialPath> storageGroups = new ArrayList<>();
- return new ShowArchivingOperator(storageGroups);
+ return new ShowArchivingOperator(storageGroups, ctx.ALL() != null);
}
// Start Trigger
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java
index 7a6a5eb626..a5fc577dd9 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java
@@ -334,7 +334,7 @@ public class ArchivingTest {
ArchivingManager archivingManager = ArchivingManager.getInstance();
archivingManager.setArchiving(new PartialPath(sg1), targetDir, ttl,
startTime);
- ShowArchivingPlan plan = new ShowArchivingPlan(Collections.emptyList());
+ ShowArchivingPlan plan = new ShowArchivingPlan(Collections.emptyList(),
true);
PlanExecutor executor = new PlanExecutor();
QueryDataSet queryDataSet = executor.processQuery(plan,
EnvironmentUtils.TEST_QUERY_CONTEXT);