This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 394805b2c7 [Feature][Metrics] Tag workflow related metrics with
process definition code (workflow id) (#13640)
394805b2c7 is described below
commit 394805b2c737e7cebf8e9718bc64093ede9513e6
Author: Eric Gao <[email protected]>
AuthorDate: Thu Mar 9 11:30:21 2023 +0800
[Feature][Metrics] Tag workflow related metrics with process definition
code (workflow id) (#13640)
* Tag workflow related metrics with process definition code (workflow id)
* Clean up related metrics when deleting workflow definition
* Add license headers
* Update related UT cases
* Add an example in grafana-demo
* Add related docs
---
docs/docs/en/guide/metrics/metrics.md | 3 +-
docs/docs/zh/guide/metrics/metrics.md | 2 +-
.../api/service/MetricsCleanUpService.java | 24 ++
.../service/impl/MetricsCleanUpServiceImpl.java | 62 ++++
.../service/impl/ProcessDefinitionServiceImpl.java | 6 +
.../api/service/ProcessDefinitionServiceTest.java | 10 +-
.../master/event/WorkflowStartEventHandler.java | 3 +-
.../master/event/WorkflowStateEventHandler.java | 13 +-
.../event/WorkflowTimeoutStateEventHandler.java | 5 +-
.../master/metrics/ProcessInstanceMetrics.java | 38 ++-
.../processor/WorkflowMetricsCleanUpProcessor.java | 47 +++
.../server/master/rpc/MasterRPCServer.java | 6 +
.../master/service/MasterFailoverService.java | 3 +-
.../server/master/service/FailoverServiceTest.java | 1 +
.../resources/grafana/DolphinSchedulerMaster.json | 320 +++++++++++++++++----
.../remote/command/CommandType.java | 4 +-
.../command/WorkflowMetricsCleanUpCommand.java | 44 +++
17 files changed, 507 insertions(+), 84 deletions(-)
diff --git a/docs/docs/en/guide/metrics/metrics.md
b/docs/docs/en/guide/metrics/metrics.md
index fa0f07bf3f..951cccc4ac 100644
--- a/docs/docs/en/guide/metrics/metrics.md
+++ b/docs/docs/en/guide/metrics/metrics.md
@@ -83,7 +83,8 @@ For example, you can get the master metrics by `curl
http://localhost:5679/actua
- ds.workflow.create.command.count: (counter) the number of commands created
and inserted by workflows
- ds.workflow.instance.submit.count: (counter) the number of submitted
workflow instances
- ds.workflow.instance.running: (gauge) the number of running workflow
instances
-- ds.workflow.instance.count: (counter) the number of workflow instances,
sliced by the tag `state`:
+- ds.workflow.instance.count: (counter) the number of workflow instances,
sliced by the tags `process.definition.code` and `state`. To monitor a specific
workflow, filter the metrics by the tag `process.definition.code`, whose value is
the definition code of that workflow (in Prometheus the tag is exposed as
`process_definition_code`). The `state` tag takes one of the following seven values:
+ - submit: the number of submitted workflow instances
- timeout: the number of timeout workflow instances
- finish: the number of finished workflow instances, both successes and
failures included
- success: the number of successful workflow instances
diff --git a/docs/docs/zh/guide/metrics/metrics.md
b/docs/docs/zh/guide/metrics/metrics.md
index 4868ba7686..03b601f066 100644
--- a/docs/docs/zh/guide/metrics/metrics.md
+++ b/docs/docs/zh/guide/metrics/metrics.md
@@ -83,7 +83,7 @@ metrics exporter端口`server.port`是在application.yaml里定义的:
master: `
- ds.workflow.create.command.count: (counter) 工作量创建并插入的命令数量
- ds.workflow.instance.running: (gauge) 正在运行的工作流实例数量
-- ds.workflow.instance.count: (counter) 工作流实例数量,由tag `state`按状态切分:
+- ds.workflow.instance.count: (counter) 工作流实例数量,由tag `process.definition.code`
和 `state` 切分。您可以通过 `process.definition.code` 这个tag筛选出与某个工作流相关的指标,这里的
`process.definition.code` 指的是您的工作流定义编码。`state` 这个tag有如下七种取值:
- submit:已提交的工作量实例数量
- timeout:运行超时的工作流实例数量
- finish:已完成的工作流实例数量,包含成功和失败
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MetricsCleanUpService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MetricsCleanUpService.java
new file mode 100644
index 0000000000..7878f91981
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MetricsCleanUpService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service;
+
+public interface MetricsCleanUpService {
+
+ void cleanUpWorkflowMetricsByDefinitionCode(String workflowDefinitionCode);
+
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
new file mode 100644
index 0000000000..abea381251
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
+import org.apache.dolphinscheduler.api.service.MetricsCleanUpService;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
+import
org.apache.dolphinscheduler.remote.command.WorkflowMetricsCleanUpCommand;
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class MetricsCleanUpServiceImpl implements MetricsCleanUpService {
+
+ @Autowired
+ private ApiRpcClient apiRpcClient;
+
+ @Autowired
+ private RegistryClient registryClient;
+
+ @Override
+ public void cleanUpWorkflowMetricsByDefinitionCode(String
workflowDefinitionCode) {
+ WorkflowMetricsCleanUpCommand workflowMetricsCleanUpCommand = new
WorkflowMetricsCleanUpCommand();
+
workflowMetricsCleanUpCommand.setProcessDefinitionCode(workflowDefinitionCode);
+ List<Server> masterNodeList =
registryClient.getServerList(NodeType.MASTER);
+ for (Server server : masterNodeList) {
+ try {
+ final String host = String.format("%s:%s", server.getHost(),
server.getPort());
+ apiRpcClient.send(Host.of(host),
workflowMetricsCleanUpCommand.convert2Command());
+ } catch (Exception e) {
+ log.error(
+ "Fail to clean up workflow related metrics on {} when
deleting workflow definition {}, error message {}",
+ server.getHost(), workflowDefinitionCode,
e.getMessage());
+ }
+ }
+ }
+
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index b8def57492..fde462e9e9 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -54,6 +54,7 @@ import
org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.MetricsCleanUpService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -252,6 +253,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
@Autowired
private WorkFlowLineageService workFlowLineageService;
+ @Autowired
+ private MetricsCleanUpService metricsCleanUpService;
+
/**
* create process definition
*
@@ -1033,6 +1037,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
for (ProcessDefinition process : processDefinitionList) {
try {
this.deleteProcessDefinitionByCode(loginUser,
process.getCode());
+
metricsCleanUpService.cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(process.getCode()));
} catch (Exception e) {
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_ERROR,
process.getName(), e.getMessage());
}
@@ -1117,6 +1122,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
// we delete the workflow definition at last to avoid using
transaction here.
// If delete error, we can call this interface again.
processDefinitionDao.deleteByWorkflowDefinitionCode(processDefinition.getCode());
+
metricsCleanUpService.cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(code));
log.info("Success delete workflow definition workflowDefinitionCode:
{}", code);
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 1d59ba8c58..d1c493a64b 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -30,6 +30,7 @@ import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT;
import static
org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.times;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest;
@@ -175,6 +176,9 @@ public class ProcessDefinitionServiceTest extends
BaseServiceTestTool {
@Mock
private WorkFlowLineageService workFlowLineageService;
+ @Mock
+ private MetricsCleanUpService metricsCleanUpService;
+
@Mock
private TaskDefinitionService taskDefinitionService;
@@ -478,6 +482,7 @@ public class ProcessDefinitionServiceTest extends
BaseServiceTestTool {
@Test
public void deleteProcessDefinitionByCodeTest() {
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
+
Mockito.doNothing().when(metricsCleanUpService).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(46L));
Project project = getProject(projectCode);
@@ -525,6 +530,7 @@ public class ProcessDefinitionServiceTest extends
BaseServiceTestTool {
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode()))
.thenReturn(Collections.emptySet());
processDefinitionService.deleteProcessDefinitionByCode(user, 46L);
+ Mockito.verify(metricsCleanUpService,
times(1)).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(46L));
// scheduler online
Schedule schedule = getSchedule();
@@ -551,7 +557,7 @@ public class ProcessDefinitionServiceTest extends
BaseServiceTestTool {
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
processDefinition.getCode()))
.thenReturn(Collections.emptySet());
Assertions.assertDoesNotThrow(() ->
processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
-
+ Mockito.verify(metricsCleanUpService,
times(2)).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(46L));
}
@Test
@@ -600,9 +606,11 @@ public class ProcessDefinitionServiceTest extends
BaseServiceTestTool {
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(),
process.getCode()))
.thenReturn(Collections.emptySet());
putMsg(result, Status.SUCCESS, projectCode);
+
Mockito.doNothing().when(metricsCleanUpService).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(11L));
Map<String, Object> deleteSuccess =
processDefinitionService.batchDeleteProcessDefinitionByCodes(user, projectCode,
singleCodes);
Assertions.assertEquals(Status.SUCCESS,
deleteSuccess.get(Constants.STATUS));
+ Mockito.verify(metricsCleanUpService,
times(2)).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(11L));
}
@Test
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
index 83af487bfe..e08e33a8d2 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
@@ -59,8 +59,9 @@ public class WorkflowStartEventHandler implements
WorkflowEventHandler {
throw new WorkflowEventHandleError(
"The workflow start event is invalid, cannot find the
workflow instance from cache");
}
- ProcessInstanceMetrics.incProcessInstanceByState("submit");
ProcessInstance processInstance =
workflowExecuteRunnable.getProcessInstance();
+
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit",
+ processInstance.getProcessDefinitionCode().toString());
CompletableFuture.supplyAsync(workflowExecuteRunnable::call,
workflowExecuteThreadPool)
.thenAccept(workflowSubmitStatue -> {
if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
index ba92d5b33a..11ad217d81 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
@@ -35,9 +35,9 @@ public class WorkflowStateEventHandler implements
StateEventHandler {
public boolean handleStateEvent(WorkflowExecuteRunnable
workflowExecuteRunnable,
StateEvent stateEvent) throws
StateEventHandleException {
WorkflowStateEvent workflowStateEvent = (WorkflowStateEvent)
stateEvent;
- measureProcessState(workflowStateEvent);
ProcessInstance processInstance =
workflowExecuteRunnable.getProcessInstance();
ProcessDefinition processDefinition =
processInstance.getProcessDefinition();
+ measureProcessState(workflowStateEvent,
processInstance.getProcessDefinitionCode().toString());
log.info(
"Handle workflow instance state event, the current workflow
instance state {} will be changed to {}",
@@ -74,19 +74,20 @@ public class WorkflowStateEventHandler implements
StateEventHandler {
return StateEventType.PROCESS_STATE_CHANGE;
}
- private void measureProcessState(WorkflowStateEvent processStateEvent) {
+ private void measureProcessState(WorkflowStateEvent processStateEvent,
String processDefinitionCode) {
if (processStateEvent.getStatus().isFinished()) {
- ProcessInstanceMetrics.incProcessInstanceByState("finish");
+
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("finish",
processDefinitionCode);
}
switch (processStateEvent.getStatus()) {
case STOP:
- ProcessInstanceMetrics.incProcessInstanceByState("stop");
+
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("stop",
processDefinitionCode);
break;
case SUCCESS:
- ProcessInstanceMetrics.incProcessInstanceByState("success");
+
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("success",
+ processDefinitionCode);
break;
case FAILURE:
- ProcessInstanceMetrics.incProcessInstanceByState("fail");
+
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("fail",
processDefinitionCode);
break;
default:
break;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
index e7bc579ea4..3d280673a6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@@ -32,7 +33,9 @@ public class WorkflowTimeoutStateEventHandler implements
StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable
workflowExecuteRunnable, StateEvent stateEvent) {
log.info("Handle workflow instance timeout event");
- ProcessInstanceMetrics.incProcessInstanceByState("timeout");
+ ProcessInstance processInstance =
workflowExecuteRunnable.getProcessInstance();
+
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("timeout",
+ processInstance.getProcessDefinitionCode().toString());
workflowExecuteRunnable.processTimeout();
return true;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
index d610778424..dc1341db1d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
@@ -17,13 +17,12 @@
package org.apache.dolphinscheduler.server.master.metrics;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
import com.google.common.collect.ImmutableSet;
@@ -33,21 +32,18 @@ import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
@UtilityClass
+@Slf4j
public class ProcessInstanceMetrics {
- private final Map<String, Counter> processInstanceCounters = new
HashMap<>();
-
private final Set<String> processInstanceStates = ImmutableSet.of(
"submit", "timeout", "finish", "failover", "success", "fail",
"stop");
static {
for (final String state : processInstanceStates) {
- processInstanceCounters.put(
- state,
- Counter.builder("ds.workflow.instance.count")
- .tag("state", state)
- .description(String.format("Process instance %s
total count", state))
- .register(Metrics.globalRegistry));
+ Counter.builder("ds.workflow.instance.count")
+ .tags("state", state, "process.definition.code", "dummy")
+ .description(String.format("Process instance total count
by state and definition code"))
+ .register(Metrics.globalRegistry);
}
}
@@ -82,8 +78,26 @@ public class ProcessInstanceMetrics {
.register(Metrics.globalRegistry);
}
- public void incProcessInstanceByState(final String state) {
- processInstanceCounters.get(state).increment();
+ public void incProcessInstanceByStateAndProcessDefinitionCode(final String
state,
+ final String
processDefinitionCode) {
+ // When tags need to be determined from local context,
+ // you have no choice but to construct or lookup the Meter inside your
method body.
+ // The lookup cost is just a single hash lookup, so it is acceptable
for most use cases.
+ Metrics.globalRegistry.counter(
+ "ds.workflow.instance.count",
+ "state", state,
+ "process.definition.code", processDefinitionCode)
+ .increment();
+ }
+
+ public void cleanUpProcessInstanceCountMetricsByDefinitionCode(final
String processDefinitionCode) {
+ for (final String state : processInstanceStates) {
+ final Counter counter = Metrics.globalRegistry.counter(
+ "ds.workflow.instance.count",
+ "state", state,
+ "process.definition.code", processDefinitionCode);
+ Metrics.globalRegistry.remove(counter);
+ }
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowMetricsCleanUpProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowMetricsCleanUpProcessor.java
new file mode 100644
index 0000000000..30254d63a8
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowMetricsCleanUpProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import
org.apache.dolphinscheduler.remote.command.WorkflowMetricsCleanUpCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+
+@Component
+public class WorkflowMetricsCleanUpProcessor implements NettyRequestProcessor {
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.WORKFLOW_METRICS_CLEANUP ==
command.getType(),
+ String.format("invalid command type: %s", command.getType()));
+
+ WorkflowMetricsCleanUpCommand workflowMetricsCleanUpCommand =
+ JSONUtils.parseObject(command.getBody(),
WorkflowMetricsCleanUpCommand.class);
+
+
ProcessInstanceMetrics.cleanUpProcessInstanceCountMetricsByDefinitionCode(
+ workflowMetricsCleanUpCommand.getProcessDefinitionCode());
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index a72d73b849..d039aac814 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -32,6 +32,7 @@ import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskUpdatePidProcessor;
import
org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
+import
org.apache.dolphinscheduler.server.master.processor.WorkflowMetricsCleanUpProcessor;
import lombok.extern.slf4j.Slf4j;
@@ -83,6 +84,9 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private TaskExecuteStartProcessor taskExecuteStartProcessor;
+ @Autowired
+ private WorkflowMetricsCleanUpProcessor workflowMetricsCleanUpProcessor;
+
public void start() {
log.info("Starting Master RPC Server...");
// init remoting server
@@ -101,6 +105,8 @@ public class MasterRPCServer implements AutoCloseable {
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST,
workflowExecutingDataRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START,
taskExecuteStartProcessor);
+
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_METRICS_CLEANUP,
+ workflowMetricsCleanUpProcessor);
// log server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST,
loggerRequestProcessor);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index 1d36b1b47b..915b8f6ec9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -193,7 +193,8 @@ public class MasterFailoverService {
}
}
- ProcessInstanceMetrics.incProcessInstanceByState("failover");
+
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("failover",
+ processInstance.getProcessDefinitionCode().toString());
// updateProcessInstance host is null to mark this
processInstance has been failover
// and insert a failover command
processInstance.setHost(Constants.NULL);
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 1bb9e4dbeb..8289da93b9 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -148,6 +148,7 @@ public class FailoverServiceTest {
processInstance.setRestartTime(new Date());
processInstance.setHistoryCmd("xxx");
processInstance.setCommandType(CommandType.STOP);
+ processInstance.setProcessDefinitionCode(123L);
masterTaskInstance = new TaskInstance();
masterTaskInstance.setId(1);
diff --git
a/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json
b/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json
index 3599ba8490..d5b7f1f397 100644
---
a/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json
+++
b/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json
@@ -70,6 +70,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -125,7 +127,8 @@
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -159,6 +162,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -214,7 +219,8 @@
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -248,6 +254,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -303,7 +311,8 @@
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -362,6 +371,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -417,7 +428,8 @@
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -502,7 +514,7 @@
"text": {},
"textMode": "auto"
},
- "pluginVersion": "9.0.5",
+ "pluginVersion": "9.4.3",
"targets": [
{
"exemplar": true,
@@ -569,7 +581,7 @@
"showThresholdMarkers": true,
"text": {}
},
- "pluginVersion": "9.0.5",
+ "pluginVersion": "9.4.3",
"targets": [
{
"exemplar": true,
@@ -624,7 +636,7 @@
"alertThreshold": true
},
"percentage": false,
- "pluginVersion": "9.0.5",
+ "pluginVersion": "9.4.3",
"pointradius": 5,
"points": false,
"renderer": "flot",
@@ -731,7 +743,7 @@
"alertThreshold": true
},
"percentage": false,
- "pluginVersion": "9.0.5",
+ "pluginVersion": "9.4.3",
"pointradius": 5,
"points": false,
"renderer": "flot",
@@ -835,7 +847,7 @@
"alertThreshold": true
},
"percentage": false,
- "pluginVersion": "9.0.5",
+ "pluginVersion": "9.4.3",
"pointradius": 5,
"points": false,
"renderer": "flot",
@@ -920,6 +932,21 @@
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
+ "fieldConfig": {
+ "defaults": {
+ "custom": {
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "scaleDistribution": {
+ "type": "linear"
+ }
+ }
+ },
+ "overrides": []
+ },
"gridPos": {
"h": 9,
"w": 6,
@@ -933,7 +960,44 @@
"legend": {
"show": true
},
- "pluginVersion": "9.0.5",
+ "options": {
+ "calculate": true,
+ "calculation": {},
+ "cellGap": 2,
+ "cellValues": {},
+ "color": {
+ "exponent": 0.5,
+ "fill": "#F2495C",
+ "mode": "opacity",
+ "reverse": false,
+ "scale": "exponential",
+ "scheme": "Oranges",
+ "steps": 128
+ },
+ "exemplars": {
+ "color": "rgba(255,0,255,0.7)"
+ },
+ "filterValues": {
+ "le": 1e-9
+ },
+ "legend": {
+ "show": true
+ },
+ "rowsFrame": {
+ "layout": "auto"
+ },
+ "showValue": "never",
+ "tooltip": {
+ "show": true,
+ "yHistogram": false
+ },
+ "yAxis": {
+ "axisPlacement": "left",
+ "reverse": false,
+ "unit": "s"
+ }
+ },
+ "pluginVersion": "9.4.3",
"reverseYBuckets": false,
"targets": [
{
@@ -984,6 +1048,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1030,16 +1096,109 @@
},
"gridPos": {
"h": 8,
- "w": 12,
+ "w": 24,
"x": 0,
"y": 28
},
+ "id": 196,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "editorMode": "builder",
+ "expr":
"ds_workflow_instance_count_total{process_definition_code=\"dummy\"}",
+ "legendFormat": "__auto",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "Process Instance State Count By Definition Code",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 6,
+ "x": 0,
+ "y": 36
+ },
"id": 152,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1070,6 +1229,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1116,16 +1277,17 @@
},
"gridPos": {
"h": 8,
- "w": 12,
- "x": 12,
- "y": 28
+ "w": 7,
+ "x": 6,
+ "y": 36
},
- "id": 162,
+ "id": 160,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1138,11 +1300,11 @@
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
- "expr":
"sum(increase(ds_workflow_instance_count_total{state=\"finish\"}[1m]))",
+ "expr":
"sum(increase(ds_workflow_instance_count_total{state=\"stop\"}[1m]))",
"refId": "A"
}
],
- "title": "Process Instance Finish/1m",
+ "title": "Process Instance Stop/1m",
"type": "timeseries"
},
{
@@ -1156,6 +1318,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1203,15 +1367,16 @@
"gridPos": {
"h": 8,
"w": 6,
- "x": 0,
+ "x": 14,
"y": 36
},
- "id": 156,
+ "id": 162,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1224,11 +1389,11 @@
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
- "expr":
"sum(increase(ds_workflow_instance_count_total{state=\"success\"}[1m]))",
+ "expr":
"sum(increase(ds_workflow_instance_count_total{state=\"finish\"}[1m]))",
"refId": "A"
}
],
- "title": "Process Instance Success /1m",
+ "title": "Process Instance Finish/1m",
"type": "timeseries"
},
{
@@ -1242,6 +1407,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1289,15 +1456,16 @@
"gridPos": {
"h": 8,
"w": 6,
- "x": 6,
- "y": 36
+ "x": 0,
+ "y": 44
},
- "id": 160,
+ "id": 156,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1310,11 +1478,11 @@
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
- "expr":
"sum(increase(ds_workflow_instance_count_total{state=\"stop\"}[1m]))",
+ "expr":
"sum(increase(ds_workflow_instance_count_total{state=\"success\"}[1m]))",
"refId": "A"
}
],
- "title": "Process Instance Stop/1m",
+ "title": "Process Instance Success /1m",
"type": "timeseries"
},
{
@@ -1328,6 +1496,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1375,15 +1545,16 @@
"gridPos": {
"h": 8,
"w": 6,
- "x": 12,
- "y": 36
+ "x": 6,
+ "y": 44
},
"id": 154,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1414,6 +1585,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1461,15 +1634,16 @@
"gridPos": {
"h": 8,
"w": 6,
- "x": 18,
- "y": 36
+ "x": 12,
+ "y": 44
},
"id": 158,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1495,7 +1669,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 44
+ "y": 52
},
"id": 172,
"panels": [],
@@ -1513,6 +1687,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1561,14 +1737,15 @@
"h": 8,
"w": 8,
"x": 0,
- "y": 45
+ "y": 53
},
"id": 178,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1617,6 +1794,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1665,14 +1844,15 @@
"h": 8,
"w": 8,
"x": 8,
- "y": 45
+ "y": 53
},
"id": 180,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1703,6 +1883,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1751,14 +1933,15 @@
"h": 8,
"w": 8,
"x": 16,
- "y": 45
+ "y": 53
},
"id": 182,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1789,6 +1972,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1837,14 +2022,15 @@
"h": 8,
"w": 8,
"x": 0,
- "y": 53
+ "y": 61
},
"id": 184,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1875,6 +2061,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -1923,14 +2111,15 @@
"h": 8,
"w": 8,
"x": 8,
- "y": 53
+ "y": 61
},
"id": 186,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -1961,6 +2150,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -2009,14 +2200,15 @@
"h": 8,
"w": 8,
"x": 16,
- "y": 53
+ "y": 61
},
"id": 188,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -2047,6 +2239,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -2095,14 +2289,15 @@
"h": 8,
"w": 8,
"x": 0,
- "y": 61
+ "y": 69
},
"id": 190,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -2133,6 +2328,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -2181,14 +2378,15 @@
"h": 8,
"w": 8,
"x": 8,
- "y": 61
+ "y": 69
},
"id": 192,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -2219,6 +2417,8 @@
"mode": "palette-classic"
},
"custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@@ -2267,14 +2467,15 @@
"h": 8,
"w": 8,
"x": 16,
- "y": 61
+ "y": 69
},
"id": 194,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
- "placement": "bottom"
+ "placement": "bottom",
+ "showLegend": true
},
"tooltip": {
"mode": "single",
@@ -2295,8 +2496,9 @@
"type": "timeseries"
}
],
- "refresh": "5s",
- "schemaVersion": 36,
+ "refresh": false,
+ "revision": 1,
+ "schemaVersion": 38,
"style": "dark",
"tags": [],
"templating": {
@@ -2430,8 +2632,8 @@
]
},
"time": {
- "from": "now-30m",
- "to": "now"
+ "from": "2023-03-07T04:20:35.626Z",
+ "to": "2023-03-07T04:22:11.080Z"
},
"timepicker": {
"now": true,
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 0ee631ae28..88f0f117f3 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -144,5 +144,7 @@ public enum CommandType {
/**
* workflow executing data response, from master to api
*/
- WORKFLOW_EXECUTING_DATA_RESPONSE;
+ WORKFLOW_EXECUTING_DATA_RESPONSE,
+
+ WORKFLOW_METRICS_CLEANUP;
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowMetricsCleanUpCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowMetricsCleanUpCommand.java
new file mode 100644
index 0000000000..337bd05333
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowMetricsCleanUpCommand.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+import lombok.Data;
+
+/**
+ * Request command asking for the workflow metrics tagged with a given
+ * process definition code to be cleaned up.
+ * <p>
+ * The command is carried over the remote layer as a {@link Command} of type
+ * {@link CommandType#WORKFLOW_METRICS_CLEANUP} whose body is this object
+ * serialized to JSON (see {@link #convert2Command()}).
+ * <p>
+ * NOTE(review): judging by this commit's file list (MetricsCleanUpServiceImpl in
+ * the api module, WorkflowMetricsCleanUpProcessor in the master rpc package), this
+ * is presumably sent from the API server to the master when a workflow definition
+ * is deleted; confirm against those classes.
+ */
+@Data
+public class WorkflowMetricsCleanUpCommand implements Serializable {
+
+ /**
+ * Code of the process definition (workflow id) whose metrics should be removed.
+ * NOTE(review): held as a String, presumably to match the metric tag value; TODO confirm.
+ */
+ private String processDefinitionCode;
+
+ /**
+ * Package this request into a remote {@link Command}.
+ * <p>
+ * The command type is {@link CommandType#WORKFLOW_METRICS_CLEANUP}. The body is
+ * the JSON byte serialization of this object produced by
+ * {@code JSONUtils.toJsonByteArray(this)}.
+ *
+ * @return a newly created command carrying this request
+ */
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.WORKFLOW_METRICS_CLEANUP);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+
+}