This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7e02c886b1 [Feature][RestAPI] overview support tag filter (#7173)
7e02c886b1 is described below

commit 7e02c886b143fd6cfaa8d60d4ead25cd5d5b0ac0
Author: Jarvis <[email protected]>
AuthorDate: Fri Jul 12 10:03:51 2024 +0800

    [Feature][RestAPI] overview support tag filter (#7173)
---
 docs/en/seatunnel-engine/rest-api.md               | 15 +++--
 docs/zh/seatunnel-engine/rest-api.md               | 43 ++++++++------
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 49 +++++++++++++---
 .../engine/server/CoordinatorService.java          |  3 +-
 .../resourcemanager/AbstractResourceManager.java   | 67 +++++++++++-----------
 .../server/resourcemanager/ResourceManager.java    |  6 +-
 .../opeartion/GetOverviewOperation.java            | 35 +++++++++--
 .../server/rest/RestHttpGetCommandProcessor.java   | 23 ++++++--
 .../resourcemanager/FixSlotResourceTest.java       |  2 +-
 9 files changed, 160 insertions(+), 83 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api.md 
b/docs/en/seatunnel-engine/rest-api.md
index 28931336a9..ef71814cfb 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -38,10 +38,14 @@ network:
 ### Returns an overview over the Zeta engine cluster.
 
 <details>
- <summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview</b></code> 
<code>(Returns an overview over the Zeta engine cluster.)</code></summary>
+ <summary><code>GET</code> 
<code><b>/hazelcast/rest/maps/overview?tag1=value1&tag2=value2</b></code> 
<code>(Returns an overview over the Zeta engine cluster.)</code></summary>
 
 #### Parameters
 
+> |   name   |   type   | data type |                                          
   description                                              |
+> 
|----------|----------|-----------|------------------------------------------------------------------------------------------------------|
+> | tag_name | optional | string    | the tags filter, you can add tag filter 
to get those matched worker count, and slot on those workers |
+
 #### Responses
 
 ```json
@@ -50,16 +54,17 @@ network:
     "gitCommitAbbrev":"DeadD0d0",
     "totalSlot":"0",
     "unassignedSlot":"0",
+    "works":"1",
     "runningJobs":"0",
     "finishedJobs":"0",
     "failedJobs":"0",
-    "cancelledJobs":"0",
-    "works":"1"
+    "cancelledJobs":"0"
 }
 ```
 
-If you use `dynamic-slot`, the `totalSlot` and `unassignedSlot` always be `0`.
-If you set it to fix slot number, it will return the correct total and 
unassigned slot number
+**Notes:**
+- If you use `dynamic-slot`, the `totalSlot` and `unassignedSlot` always be 
`0`. when you set it to fix slot number, it will return the correct total and 
unassigned slot number
+- If the url has tag filter, the `works`, `totalSlot` and `unassignedSlot` 
will return the result on the matched worker. but the job related metric will 
always return the cluster level information.
 
 </details>
 
diff --git a/docs/zh/seatunnel-engine/rest-api.md 
b/docs/zh/seatunnel-engine/rest-api.md
index 20c1020fd9..baa38f4cd9 100644
--- a/docs/zh/seatunnel-engine/rest-api.md
+++ b/docs/zh/seatunnel-engine/rest-api.md
@@ -37,10 +37,14 @@ network:
 ### 返回Zeta集群的概览
 
 <details>
- <summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview</b></code> 
<code>(Returns an overview over the Zeta engine cluster.)</code></summary>
+ <summary><code>GET</code> 
<code><b>/hazelcast/rest/maps/overview?tag1=value1&tag2=value2</b></code> 
<code>(Returns an overview over the Zeta engine cluster.)</code></summary>
 
 #### 参数
 
+> |  参数名称  | 是否必传 | 参数类型 |           参数描述           |
+> |--------|------|------|--------------------------|
+> | tag键值对 | 否    | 字符串  | 一组标签值, 通过该标签值过滤满足条件的节点信息 |
+
 #### 响应
 
 ```json
@@ -49,16 +53,17 @@ network:
     "gitCommitAbbrev":"DeadD0d0",
     "totalSlot":"0",
     "unassignedSlot":"0",
+    "works":"1",
     "runningJobs":"0",
     "finishedJobs":"0",
     "failedJobs":"0",
-    "cancelledJobs":"0",
-    "works":"1"
+    "cancelledJobs":"0"
 }
 ```
 
-当你使用`dynamic-slot`时, 返回结果中的`totalSlot`和`unassignedSlot`将始终为0.
-当你设置为固定的slot值时, 将正确返回集群中总共的slot数量以及未分配的slot数量.
+**注意:**
+- 当你使用`dynamic-slot`时, 返回结果中的`totalSlot`和`unassignedSlot`将始终为0. 设置为固定的slot值后, 
将正确返回集群中总共的slot数量以及未分配的slot数量.
+- 当添加标签过滤后, `works`, `totalSlot`, `unassignedSlot`将返回满足条件的节点的相关指标. 
注意`runningJobs`等job相关指标为集群级别结果, 无法根据标签进行过滤.
 
 </details>
 
@@ -110,9 +115,9 @@ network:
 
 #### 参数
 
-> | name  |   type   | data type | description |
-> |-------|----------|-----------|-------------|
-> | jobId | required | long      | job id      |
+> | 参数名称  | 是否必传 | 参数类型 |  参数描述  |
+> |-------|------|------|--------|
+> | jobId | 是    | long | job id |
 
 #### 响应
 
@@ -167,9 +172,9 @@ network:
 
 #### 参数
 
-> | name  |   type   | data type | description |
-> |-------|----------|-----------|-------------|
-> | jobId | required | long      | job id      |
+> | 参数名称  | 是否必传 | 参数类型 |  参数描述  |
+> |-------|------|------|--------|
+> | jobId | 是    | long | job id |
 
 #### 响应
 
@@ -222,9 +227,9 @@ network:
 
 #### 参数
 
-> | name  |   type   | data type |                           description       
                     |
-> 
|-------|----------|-----------|------------------------------------------------------------------|
-> | state | optional | string    | finished job status. 
`FINISHED`,`CANCELED`,`FAILED`,`UNKNOWABLE` |
+> | 参数名称  |   是否必传   |  参数类型  |                               参数描述             
                  |
+> 
|-------|----------|--------|------------------------------------------------------------------|
+> | state | optional | string | finished job status. 
`FINISHED`,`CANCELED`,`FAILED`,`UNKNOWABLE` |
 
 #### 响应
 
@@ -319,11 +324,11 @@ network:
 
 #### 参数
 
-> |         name         |   type   | data type |            description       
     |
-> 
|----------------------|----------|-----------|-----------------------------------|
-> | jobId                | optional | string    | job id                       
     |
-> | jobName              | optional | string    | job name                     
     |
-> | isStartWithSavePoint | optional | string    | if job is started with save 
point |
+> |         参数名称         |   是否必传   |  参数类型  |               参数描述              
  |
+> 
|----------------------|----------|--------|-----------------------------------|
+> | jobId                | optional | string | job id                          
  |
+> | jobName              | optional | string | job name                        
  |
+> | isStartWithSavePoint | optional | string | if job is started with save 
point |
 
 #### 请求体
 
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 51a1fd85ec..71b903ca16 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -34,6 +34,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.MemberAttributeConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import lombok.extern.slf4j.Slf4j;
 
@@ -63,13 +65,23 @@ public class RestApiIT {
     @BeforeEach
     void beforeClass() throws Exception {
         String testClusterName = TestUtils.getClusterName("RestApiIT");
-        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
-        seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
-        
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
-        
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
-        node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+        SeaTunnelConfig node1Config = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        node1Config.getHazelcastConfig().setClusterName(testClusterName);
+        
node1Config.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+        node1Config.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
+        MemberAttributeConfig node1Tags = new MemberAttributeConfig();
+        node1Tags.setAttribute("node", "node1");
+        node1Config.getHazelcastConfig().setMemberAttributeConfig(node1Tags);
+        node1 = SeaTunnelServerStarter.createHazelcastInstance(node1Config);
 
-        node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+        MemberAttributeConfig node2Tags = new MemberAttributeConfig();
+        node2Tags.setAttribute("node", "node2");
+        Config node2hzconfig = 
node1Config.getHazelcastConfig().setMemberAttributeConfig(node2Tags);
+        SeaTunnelConfig node2Config = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
node2Config.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+        node2Config.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
+        node2Config.setHazelcastConfig(node2hzconfig);
+        node2 = SeaTunnelServerStarter.createHazelcastInstance(node2Config);
 
         String filePath = 
TestUtils.getResource("stream_fakesource_to_file.conf");
         JobConfig jobConfig = new JobConfig();
@@ -79,7 +91,7 @@ public class RestApiIT {
         clientConfig.setClusterName(testClusterName);
         engineClient = new SeaTunnelClient(clientConfig);
         ClientJobExecutionEnvironment jobExecutionEnv =
-                engineClient.createExecutionContext(filePath, jobConfig, 
seaTunnelConfig);
+                engineClient.createExecutionContext(filePath, jobConfig, 
node1Config);
 
         clientJobProxy = jobExecutionEnv.execute();
 
@@ -94,7 +106,7 @@ public class RestApiIT {
         JobConfig batchConf = new JobConfig();
         batchConf.setName("fake_to_console");
         ClientJobExecutionEnvironment batchJobExecutionEnv =
-                engineClient.createExecutionContext(batchFilePath, batchConf, 
seaTunnelConfig);
+                engineClient.createExecutionContext(batchFilePath, batchConf, 
node1Config);
         batchJobProxy = batchJobExecutionEnv.execute();
         Awaitility.await()
                 .atMost(5, TimeUnit.MINUTES)
@@ -240,6 +252,27 @@ public class RestApiIT {
                         });
     }
 
+    @Test
+    public void testOverviewFilterByTag() {
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            given().get(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + RestConstant.OVERVIEW
+                                                    + "?node=node1")
+                                    .then()
+                                    .statusCode(200)
+                                    .body("projectVersion", notNullValue())
+                                    .body("totalSlot", equalTo("20"))
+                                    .body("workers", equalTo("1"));
+                        });
+    }
+
     @Test
     public void testGetRunningThreads() {
         Arrays.asList(node2, node1)
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 8c96b4e6e5..8c454c6777 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -70,6 +70,7 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
 import lombok.NonNull;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -280,7 +281,7 @@ public class CoordinatorService {
             return;
         }
         // waiting have worker registered
-        while (getResourceManager().workerCount() == 0) {
+        while (getResourceManager().workerCount(Collections.emptyMap()) == 0) {
             try {
                 logger.info("Waiting for worker registered");
                 Thread.sleep(1000);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 5fe29fa6f1..b830e5f056 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -140,35 +140,7 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
             long jobId, List<ResourceProfile> resourceProfile, Map<String, 
String> tagFilter)
             throws NoEnoughResourceException {
         waitingWorkerRegister();
-        ConcurrentMap<Address, WorkerProfile> matchedWorker;
-        if (tagFilter == null || tagFilter.isEmpty()) {
-            matchedWorker = registerWorker;
-        } else {
-            matchedWorker =
-                    registerWorker.entrySet().stream()
-                            .filter(
-                                    e -> {
-                                        Map<String, String> workerAttr =
-                                                e.getValue().getAttributes();
-                                        if (workerAttr == null || 
workerAttr.isEmpty()) {
-                                            return false;
-                                        }
-                                        boolean match = true;
-                                        for (Map.Entry<String, String> entry :
-                                                tagFilter.entrySet()) {
-                                            if 
(!workerAttr.containsKey(entry.getKey())
-                                                    || !workerAttr
-                                                            
.get(entry.getKey())
-                                                            
.equals(entry.getValue())) {
-                                                return false;
-                                            }
-                                        }
-                                        return match;
-                                    })
-                            .collect(
-                                    Collectors.toConcurrentMap(
-                                            Map.Entry::getKey, 
Map.Entry::getValue));
-        }
+        ConcurrentMap<Address, WorkerProfile> matchedWorker = 
filterWorkerByTag(tagFilter);
         if (matchedWorker.isEmpty()) {
             log.error("No matched worker with tag filter {}.", tagFilter);
             throw new NoEnoughResourceException();
@@ -264,21 +236,46 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
     }
 
     @Override
-    public List<SlotProfile> getUnassignedSlots() {
-        return registerWorker.values().stream()
+    public List<SlotProfile> getUnassignedSlots(Map<String, String> tags) {
+        return filterWorkerByTag(tags).values().stream()
                 .flatMap(workerProfile -> 
Arrays.stream(workerProfile.getUnassignedSlots()))
                 .collect(Collectors.toList());
     }
 
     @Override
-    public List<SlotProfile> getAssignedSlots() {
-        return registerWorker.values().stream()
+    public List<SlotProfile> getAssignedSlots(Map<String, String> tags) {
+        return filterWorkerByTag(tags).values().stream()
                 .flatMap(workerProfile -> 
Arrays.stream(workerProfile.getAssignedSlots()))
                 .collect(Collectors.toList());
     }
 
     @Override
-    public int workerCount() {
-        return registerWorker.size();
+    public int workerCount(Map<String, String> tags) {
+        return filterWorkerByTag(tags).size();
+    }
+
+    private ConcurrentMap<Address, WorkerProfile> 
filterWorkerByTag(Map<String, String> tagFilter) {
+        if (tagFilter == null || tagFilter.isEmpty()) {
+            return registerWorker;
+        }
+        return registerWorker.entrySet().stream()
+                .filter(
+                        e -> {
+                            Map<String, String> workerAttr = 
e.getValue().getAttributes();
+                            if (workerAttr == null || workerAttr.isEmpty()) {
+                                return false;
+                            }
+                            boolean match = true;
+                            for (Map.Entry<String, String> entry : 
tagFilter.entrySet()) {
+                                if (!workerAttr.containsKey(entry.getKey())
+                                        || !workerAttr
+                                                .get(entry.getKey())
+                                                .equals(entry.getValue())) {
+                                    return false;
+                                }
+                            }
+                            return match;
+                        })
+                .collect(Collectors.toConcurrentMap(Map.Entry::getKey, 
Map.Entry::getValue));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index 8e222b0682..0911345eb2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -62,9 +62,9 @@ public interface ResourceManager {
 
     void close();
 
-    List<SlotProfile> getUnassignedSlots();
+    List<SlotProfile> getUnassignedSlots(Map<String, String> tags);
 
-    List<SlotProfile> getAssignedSlots();
+    List<SlotProfile> getAssignedSlots(Map<String, String> tags);
 
-    int workerCount();
+    int workerCount(Map<String, String> tags);
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
index 6bc0ef8906..8b2533ece5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
@@ -27,23 +27,33 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import 
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
 
 import com.hazelcast.map.IMap;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.Operation;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 @Slf4j
 public class GetOverviewOperation extends Operation implements 
IdentifiedDataSerializable {
 
     private OverviewInfo overviewInfo;
+    private Map<String, String> tags;
+
+    public GetOverviewOperation() {}
+
+    public GetOverviewOperation(Map<String, String> tags) {
+        this.tags = tags;
+    }
 
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-
-        overviewInfo = getOverviewInfo(server, getNodeEngine());
+        overviewInfo = getOverviewInfo(server, getNodeEngine(), tags);
     }
 
     @Override
@@ -66,17 +76,19 @@ public class GetOverviewOperation extends Operation 
implements IdentifiedDataSer
         return SeaTunnelServer.SERVICE_NAME;
     }
 
-    public static OverviewInfo getOverviewInfo(SeaTunnelServer server, 
NodeEngine nodeEngine) {
+    public static OverviewInfo getOverviewInfo(
+            SeaTunnelServer server, NodeEngine nodeEngine, Map<String, String> 
tags) {
         OverviewInfo overviewInfo = new OverviewInfo();
         ResourceManager resourceManager = 
server.getCoordinatorService().getResourceManager();
 
-        List<SlotProfile> assignedSlots = resourceManager.getAssignedSlots();
+        List<SlotProfile> assignedSlots = 
resourceManager.getAssignedSlots(tags);
 
-        List<SlotProfile> unassignedSlots = 
resourceManager.getUnassignedSlots();
+        List<SlotProfile> unassignedSlots = 
resourceManager.getUnassignedSlots(tags);
         IMap<Long, JobState> finishedJob =
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE);
         overviewInfo.setTotalSlot(assignedSlots.size() + 
unassignedSlots.size());
         overviewInfo.setUnassignedSlot(unassignedSlots.size());
+        overviewInfo.setWorkers(resourceManager.workerCount(tags));
         overviewInfo.setRunningJobs(
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO).size());
         overviewInfo.setFailedJobs(
@@ -95,7 +107,6 @@ public class GetOverviewOperation extends Operation 
implements IdentifiedDataSer
                                                 .name()
                                                 
.equals(JobStatus.CANCELED.toString()))
                         .count());
-        overviewInfo.setWorkers(resourceManager.workerCount());
         overviewInfo.setFinishedJobs(
                 finishedJob.values().stream()
                         .filter(
@@ -107,4 +118,16 @@ public class GetOverviewOperation extends Operation 
implements IdentifiedDataSer
 
         return overviewInfo;
     }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeObject(tags);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        tags = in.readObject();
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 0e89f9cfda..6081b0f2ea 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -65,6 +65,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
@@ -110,7 +111,7 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
             } else if (uri.startsWith(RUNNING_THREADS)) {
                 getRunningThread(httpGetCommand);
             } else if (uri.startsWith(OVERVIEW)) {
-                overView(httpGetCommand);
+                overView(httpGetCommand, uri);
             } else {
                 original.handle(httpGetCommand);
             }
@@ -129,8 +130,20 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
         handle(httpGetCommand);
     }
 
-    public void overView(HttpGetCommand command) {
-
+    public void overView(HttpGetCommand command, String uri) {
+        uri = StringUtil.stripTrailingSlash(uri);
+        String tagStr;
+        if (uri.contains("?")) {
+            int index = uri.indexOf("?");
+            tagStr = uri.substring(index + 1);
+        } else {
+            tagStr = "";
+        }
+        Map<String, String> tags =
+                Arrays.stream(tagStr.split("&"))
+                        .map(variable -> variable.split("=", 2))
+                        .filter(pair -> pair.length == 2)
+                        .collect(Collectors.toMap(pair -> pair[0], pair -> 
pair[1]));
         Version version = EnvironmentUtil.getVersion();
 
         SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
@@ -141,14 +154,14 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
             overviewInfo =
                     (OverviewInfo)
                             NodeEngineUtil.sendOperationToMasterNode(
-                                            getNode().nodeEngine, new 
GetOverviewOperation())
+                                            getNode().nodeEngine, new 
GetOverviewOperation(tags))
                                     .join();
             overviewInfo.setProjectVersion(version.getProjectVersion());
             overviewInfo.setGitCommitAbbrev(version.getGitCommitAbbrev());
         } else {
 
             NodeEngineImpl nodeEngine = 
this.textCommandService.getNode().getNodeEngine();
-            overviewInfo = 
GetOverviewOperation.getOverviewInfo(seaTunnelServer, nodeEngine);
+            overviewInfo = 
GetOverviewOperation.getOverviewInfo(seaTunnelServer, nodeEngine, tags);
             overviewInfo.setProjectVersion(version.getProjectVersion());
             overviewInfo.setGitCommitAbbrev(version.getGitCommitAbbrev());
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
index cbba82dda8..b3df40f6aa 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
@@ -86,7 +86,7 @@ public class FixSlotResourceTest extends 
AbstractSeaTunnelServerTest<FixSlotReso
                                     3,
                                     server.getCoordinatorService()
                                             .getResourceManager()
-                                            .getUnassignedSlots()
+                                            .getUnassignedSlots(null)
                                             .size());
                         });
         resourceProfiles.remove(0);

Reply via email to