This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7e02c886b1 [Feature][RestAPI] overview support tag filter (#7173)
7e02c886b1 is described below
commit 7e02c886b143fd6cfaa8d60d4ead25cd5d5b0ac0
Author: Jarvis <[email protected]>
AuthorDate: Fri Jul 12 10:03:51 2024 +0800
[Feature][RestAPI] overview support tag filter (#7173)
---
docs/en/seatunnel-engine/rest-api.md | 15 +++--
docs/zh/seatunnel-engine/rest-api.md | 43 ++++++++------
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 49 +++++++++++++---
.../engine/server/CoordinatorService.java | 3 +-
.../resourcemanager/AbstractResourceManager.java | 67 +++++++++++-----------
.../server/resourcemanager/ResourceManager.java | 6 +-
.../opeartion/GetOverviewOperation.java | 35 +++++++++--
.../server/rest/RestHttpGetCommandProcessor.java | 23 ++++++--
.../resourcemanager/FixSlotResourceTest.java | 2 +-
9 files changed, 160 insertions(+), 83 deletions(-)
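
For readers skimming the diff below, here is a minimal, self-contained sketch of the two ideas this commit combines: turning the query string of the overview URL into a tag map, and keeping only workers whose attributes contain every requested tag. The class name `TagFilterSketch` and the helpers `parseTags`, `matches` and `countMatching` are hypothetical names used only for illustration; the commit's actual logic lives in `RestHttpGetCommandProcessor.overView` and `AbstractResourceManager.filterWorkerByTag`, and workers are modelled here as plain attribute maps rather than `WorkerProfile` objects.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of the SeaTunnel code base.
public class TagFilterSketch {

    // Parse "key=value" pairs that follow '?'; pairs without '=' are skipped.
    public static Map<String, String> parseTags(String uri) {
        int index = uri.indexOf('?');
        if (index < 0) {
            return Collections.emptyMap();
        }
        return Arrays.stream(uri.substring(index + 1).split("&"))
                .map(pair -> pair.split("=", 2))
                .filter(pair -> pair.length == 2)
                .collect(Collectors.toMap(pair -> pair[0], pair -> pair[1]));
    }

    // A null or empty filter matches every worker; otherwise every
    // requested tag must be present with an equal value.
    public static boolean matches(Map<String, String> workerAttr, Map<String, String> tagFilter) {
        if (tagFilter == null || tagFilter.isEmpty()) {
            return true;
        }
        if (workerAttr == null || workerAttr.isEmpty()) {
            return false;
        }
        return tagFilter.entrySet().stream()
                .allMatch(e -> e.getValue().equals(workerAttr.get(e.getKey())));
    }

    // Count the workers (modelled as attribute maps) that pass the filter.
    public static long countMatching(
            List<Map<String, String>> workers, Map<String, String> tagFilter) {
        return workers.stream().filter(attr -> matches(attr, tagFilter)).count();
    }

    public static void main(String[] args) {
        Map<String, String> tags = parseTags("/hazelcast/rest/maps/overview?node=node1");
        List<Map<String, String>> workers =
                Arrays.asList(
                        Collections.singletonMap("node", "node1"),
                        Collections.singletonMap("node", "node2"));
        System.out.println(countMatching(workers, tags)); // prints 1
    }
}
```

The sketch mirrors the e2e test added in this commit, where two nodes are tagged `node=node1` and `node=node2` and a request with `?node=node1` is expected to report one worker.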
diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md
index 28931336a9..ef71814cfb 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -38,10 +38,14 @@ network:
### Returns an overview over the Zeta engine cluster.
<details>
- <summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview</b></code> <code>(Returns an overview over the Zeta engine cluster.)</code></summary>
+ <summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview?tag1=value1&tag2=value2</b></code> <code>(Returns an overview over the Zeta engine cluster.)</code></summary>
#### Parameters
+> | name | type | data type | description |
+> |----------|----------|-----------|------------------------------------------------------------------------------------------------------|
+> | tag_name | optional | string | the tags filter, you can add tag filter to get those matched worker count, and slot on those workers |
+
#### Responses
```json
@@ -50,16 +54,17 @@ network:
"gitCommitAbbrev":"DeadD0d0",
"totalSlot":"0",
"unassignedSlot":"0",
+ "works":"1",
"runningJobs":"0",
"finishedJobs":"0",
"failedJobs":"0",
- "cancelledJobs":"0",
- "works":"1"
+ "cancelledJobs":"0"
}
```
-If you use `dynamic-slot`, the `totalSlot` and `unassignedSlot` always be `0`.
-If you set it to fix slot number, it will return the correct total and unassigned slot number
+**Notes:**
+- If you use `dynamic-slot`, the `totalSlot` and `unassignedSlot` always be `0`. when you set it to fix slot number, it will return the correct total and unassigned slot number
+- If the url has tag filter, the `works`, `totalSlot` and `unassignedSlot` will return the result on the matched worker. but the job related metric will always return the cluster level information.
</details>
diff --git a/docs/zh/seatunnel-engine/rest-api.md b/docs/zh/seatunnel-engine/rest-api.md
index 20c1020fd9..baa38f4cd9 100644
--- a/docs/zh/seatunnel-engine/rest-api.md
+++ b/docs/zh/seatunnel-engine/rest-api.md
@@ -37,10 +37,14 @@ network:
### 返回Zeta集群的概览
<details>
- <summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview</b></code> <code>(Returns an overview over the Zeta engine cluster.)</code></summary>
+ <summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview?tag1=value1&tag2=value2</b></code> <code>(Returns an overview over the Zeta engine cluster.)</code></summary>
#### 参数
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+> |--------|------|------|--------------------------|
+> | tag键值对 | 否 | 字符串 | 一组标签值, 通过该标签值过滤满足条件的节点信息 |
+
#### 响应
```json
@@ -49,16 +53,17 @@ network:
"gitCommitAbbrev":"DeadD0d0",
"totalSlot":"0",
"unassignedSlot":"0",
+ "works":"1",
"runningJobs":"0",
"finishedJobs":"0",
"failedJobs":"0",
- "cancelledJobs":"0",
- "works":"1"
+ "cancelledJobs":"0"
}
```
-当你使用`dynamic-slot`时, 返回结果中的`totalSlot`和`unassignedSlot`将始终为0.
-当你设置为固定的slot值时, 将正确返回集群中总共的slot数量以及未分配的slot数量.
+**注意:**
+- 当你使用`dynamic-slot`时, 返回结果中的`totalSlot`和`unassignedSlot`将始终为0. 设置为固定的slot值后, 将正确返回集群中总共的slot数量以及未分配的slot数量.
+- 当添加标签过滤后, `works`, `totalSlot`, `unassignedSlot`将返回满足条件的节点的相关指标. 注意`runningJobs`等job相关指标为集群级别结果, 无法根据标签进行过滤.
</details>
@@ -110,9 +115,9 @@ network:
#### 参数
-> | name | type | data type | description |
-> |-------|----------|-----------|-------------|
-> | jobId | required | long | job id |
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+> |-------|------|------|--------|
+> | jobId | 是 | long | job id |
#### 响应
@@ -167,9 +172,9 @@ network:
#### 参数
-> | name | type | data type | description |
-> |-------|----------|-----------|-------------|
-> | jobId | required | long | job id |
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+> |-------|------|------|--------|
+> | jobId | 是 | long | job id |
#### 响应
@@ -222,9 +227,9 @@ network:
#### 参数
-> | name | type | data type | description |
-> |-------|----------|-----------|------------------------------------------------------------------|
-> | state | optional | string | finished job status. `FINISHED`,`CANCELED`,`FAILED`,`UNKNOWABLE` |
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+> |-------|----------|--------|------------------------------------------------------------------|
+> | state | optional | string | finished job status. `FINISHED`,`CANCELED`,`FAILED`,`UNKNOWABLE` |
#### 响应
@@ -319,11 +324,11 @@ network:
#### 参数
-> | name | type | data type | description |
-> |----------------------|----------|-----------|-----------------------------------|
-> | jobId | optional | string | job id |
-> | jobName | optional | string | job name |
-> | isStartWithSavePoint | optional | string | if job is started with save point |
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+> |----------------------|----------|--------|-----------------------------------|
+> | jobId | optional | string | job id |
+> | jobName | optional | string | job name |
+> | isStartWithSavePoint | optional | string | if job is started with save point |
#### 请求体
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 51a1fd85ec..71b903ca16 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -34,6 +34,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import lombok.extern.slf4j.Slf4j;
@@ -63,13 +65,23 @@ public class RestApiIT {
@BeforeEach
void beforeClass() throws Exception {
String testClusterName = TestUtils.getClusterName("RestApiIT");
- SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
- seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
- seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
- seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
- node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ SeaTunnelConfig node1Config = ConfigProvider.locateAndGetSeaTunnelConfig();
+ node1Config.getHazelcastConfig().setClusterName(testClusterName);
+ node1Config.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+ node1Config.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
+ MemberAttributeConfig node1Tags = new MemberAttributeConfig();
+ node1Tags.setAttribute("node", "node1");
+ node1Config.getHazelcastConfig().setMemberAttributeConfig(node1Tags);
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(node1Config);
- node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ MemberAttributeConfig node2Tags = new MemberAttributeConfig();
+ node2Tags.setAttribute("node", "node2");
+ Config node2hzconfig = node1Config.getHazelcastConfig().setMemberAttributeConfig(node2Tags);
+ SeaTunnelConfig node2Config = ConfigProvider.locateAndGetSeaTunnelConfig();
+ node2Config.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+ node2Config.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
+ node2Config.setHazelcastConfig(node2hzconfig);
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(node2Config);
String filePath = TestUtils.getResource("stream_fakesource_to_file.conf");
JobConfig jobConfig = new JobConfig();
@@ -79,7 +91,7 @@ public class RestApiIT {
clientConfig.setClusterName(testClusterName);
engineClient = new SeaTunnelClient(clientConfig);
ClientJobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
+ engineClient.createExecutionContext(filePath, jobConfig, node1Config);
clientJobProxy = jobExecutionEnv.execute();
@@ -94,7 +106,7 @@ public class RestApiIT {
JobConfig batchConf = new JobConfig();
batchConf.setName("fake_to_console");
ClientJobExecutionEnvironment batchJobExecutionEnv =
- engineClient.createExecutionContext(batchFilePath, batchConf, seaTunnelConfig);
+ engineClient.createExecutionContext(batchFilePath, batchConf, node1Config);
batchJobProxy = batchJobExecutionEnv.execute();
Awaitility.await()
.atMost(5, TimeUnit.MINUTES)
@@ -240,6 +252,27 @@ public class RestApiIT {
});
}
+ @Test
+ public void testOverviewFilterByTag() {
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ given().get(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ + RestConstant.OVERVIEW
+ + "?node=node1")
+ .then()
+ .statusCode(200)
+ .body("projectVersion", notNullValue())
+ .body("totalSlot", equalTo("20"))
+ .body("workers", equalTo("1"));
+ });
+ }
+
@Test
public void testGetRunningThreads() {
Arrays.asList(node2, node1)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 8c96b4e6e5..8c454c6777 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -70,6 +70,7 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.NonNull;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -280,7 +281,7 @@ public class CoordinatorService {
return;
}
// waiting have worker registered
- while (getResourceManager().workerCount() == 0) {
+ while (getResourceManager().workerCount(Collections.emptyMap()) == 0) {
try {
logger.info("Waiting for worker registered");
Thread.sleep(1000);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 5fe29fa6f1..b830e5f056 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -140,35 +140,7 @@ public abstract class AbstractResourceManager implements ResourceManager {
long jobId, List<ResourceProfile> resourceProfile, Map<String, String> tagFilter)
throws NoEnoughResourceException {
waitingWorkerRegister();
- ConcurrentMap<Address, WorkerProfile> matchedWorker;
- if (tagFilter == null || tagFilter.isEmpty()) {
- matchedWorker = registerWorker;
- } else {
- matchedWorker =
- registerWorker.entrySet().stream()
- .filter(
- e -> {
- Map<String, String> workerAttr =
- e.getValue().getAttributes();
- if (workerAttr == null || workerAttr.isEmpty()) {
- return false;
- }
- boolean match = true;
- for (Map.Entry<String, String> entry :
- tagFilter.entrySet()) {
- if (!workerAttr.containsKey(entry.getKey())
- || !workerAttr
- .get(entry.getKey())
- .equals(entry.getValue())) {
- return false;
- }
- }
- return match;
- })
- .collect(
- Collectors.toConcurrentMap(
- Map.Entry::getKey, Map.Entry::getValue));
- }
+ ConcurrentMap<Address, WorkerProfile> matchedWorker = filterWorkerByTag(tagFilter);
if (matchedWorker.isEmpty()) {
log.error("No matched worker with tag filter {}.", tagFilter);
throw new NoEnoughResourceException();
@@ -264,21 +236,46 @@ public abstract class AbstractResourceManager implements ResourceManager {
}
@Override
- public List<SlotProfile> getUnassignedSlots() {
- return registerWorker.values().stream()
+ public List<SlotProfile> getUnassignedSlots(Map<String, String> tags) {
+ return filterWorkerByTag(tags).values().stream()
.flatMap(workerProfile -> Arrays.stream(workerProfile.getUnassignedSlots()))
.collect(Collectors.toList());
}
@Override
- public List<SlotProfile> getAssignedSlots() {
- return registerWorker.values().stream()
+ public List<SlotProfile> getAssignedSlots(Map<String, String> tags) {
+ return filterWorkerByTag(tags).values().stream()
.flatMap(workerProfile -> Arrays.stream(workerProfile.getAssignedSlots()))
.collect(Collectors.toList());
}
@Override
- public int workerCount() {
- return registerWorker.size();
+ public int workerCount(Map<String, String> tags) {
+ return filterWorkerByTag(tags).size();
+ }
+
+ private ConcurrentMap<Address, WorkerProfile> filterWorkerByTag(Map<String, String> tagFilter) {
+ if (tagFilter == null || tagFilter.isEmpty()) {
+ return registerWorker;
+ }
+ return registerWorker.entrySet().stream()
+ .filter(
+ e -> {
+ Map<String, String> workerAttr = e.getValue().getAttributes();
+ if (workerAttr == null || workerAttr.isEmpty()) {
+ return false;
+ }
+ boolean match = true;
+ for (Map.Entry<String, String> entry : tagFilter.entrySet()) {
+ if (!workerAttr.containsKey(entry.getKey())
+ || !workerAttr
+ .get(entry.getKey())
+ .equals(entry.getValue())) {
+ return false;
+ }
+ }
+ return match;
+ })
+ .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index 8e222b0682..0911345eb2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -62,9 +62,9 @@ public interface ResourceManager {
void close();
- List<SlotProfile> getUnassignedSlots();
+ List<SlotProfile> getUnassignedSlots(Map<String, String> tags);
- List<SlotProfile> getAssignedSlots();
+ List<SlotProfile> getAssignedSlots(Map<String, String> tags);
- int workerCount();
+ int workerCount(Map<String, String> tags);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
index 6bc0ef8906..8b2533ece5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
@@ -27,23 +27,33 @@ import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
import com.hazelcast.map.IMap;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
@Slf4j
public class GetOverviewOperation extends Operation implements IdentifiedDataSerializable {
private OverviewInfo overviewInfo;
+ private Map<String, String> tags;
+
+ public GetOverviewOperation() {}
+
+ public GetOverviewOperation(Map<String, String> tags) {
+ this.tags = tags;
+ }
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
-
- overviewInfo = getOverviewInfo(server, getNodeEngine());
+ overviewInfo = getOverviewInfo(server, getNodeEngine(), tags);
}
@Override
@@ -66,17 +76,19 @@ public class GetOverviewOperation extends Operation implements IdentifiedDataSer
return SeaTunnelServer.SERVICE_NAME;
}
- public static OverviewInfo getOverviewInfo(SeaTunnelServer server, NodeEngine nodeEngine) {
+ public static OverviewInfo getOverviewInfo(
+ SeaTunnelServer server, NodeEngine nodeEngine, Map<String, String> tags) {
OverviewInfo overviewInfo = new OverviewInfo();
ResourceManager resourceManager = server.getCoordinatorService().getResourceManager();
- List<SlotProfile> assignedSlots = resourceManager.getAssignedSlots();
+ List<SlotProfile> assignedSlots = resourceManager.getAssignedSlots(tags);
- List<SlotProfile> unassignedSlots = resourceManager.getUnassignedSlots();
+ List<SlotProfile> unassignedSlots = resourceManager.getUnassignedSlots(tags);
IMap<Long, JobState> finishedJob =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE);
overviewInfo.setTotalSlot(assignedSlots.size() + unassignedSlots.size());
overviewInfo.setUnassignedSlot(unassignedSlots.size());
+ overviewInfo.setWorkers(resourceManager.workerCount(tags));
overviewInfo.setRunningJobs(
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO).size());
overviewInfo.setFailedJobs(
@@ -95,7 +107,6 @@ public class GetOverviewOperation extends Operation
implements IdentifiedDataSer
.name()
.equals(JobStatus.CANCELED.toString()))
.count());
- overviewInfo.setWorkers(resourceManager.workerCount());
overviewInfo.setFinishedJobs(
finishedJob.values().stream()
.filter(
@@ -107,4 +118,16 @@ public class GetOverviewOperation extends Operation
implements IdentifiedDataSer
return overviewInfo;
}
+
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ out.writeObject(tags);
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ tags = in.readObject();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 0e89f9cfda..6081b0f2ea 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -65,6 +65,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
@@ -110,7 +111,7 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
} else if (uri.startsWith(RUNNING_THREADS)) {
getRunningThread(httpGetCommand);
} else if (uri.startsWith(OVERVIEW)) {
- overView(httpGetCommand);
+ overView(httpGetCommand, uri);
} else {
original.handle(httpGetCommand);
}
@@ -129,8 +130,20 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
handle(httpGetCommand);
}
- public void overView(HttpGetCommand command) {
-
+ public void overView(HttpGetCommand command, String uri) {
+ uri = StringUtil.stripTrailingSlash(uri);
+ String tagStr;
+ if (uri.contains("?")) {
+ int index = uri.indexOf("?");
+ tagStr = uri.substring(index + 1);
+ } else {
+ tagStr = "";
+ }
+ Map<String, String> tags =
+ Arrays.stream(tagStr.split("&"))
+ .map(variable -> variable.split("=", 2))
+ .filter(pair -> pair.length == 2)
+ .collect(Collectors.toMap(pair -> pair[0], pair -> pair[1]));
Version version = EnvironmentUtil.getVersion();
SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
@@ -141,14 +154,14 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
overviewInfo =
(OverviewInfo)
NodeEngineUtil.sendOperationToMasterNode(
- getNode().nodeEngine, new GetOverviewOperation())
+ getNode().nodeEngine, new GetOverviewOperation(tags))
.join();
overviewInfo.setProjectVersion(version.getProjectVersion());
overviewInfo.setGitCommitAbbrev(version.getGitCommitAbbrev());
} else {
NodeEngineImpl nodeEngine =
this.textCommandService.getNode().getNodeEngine();
- overviewInfo = GetOverviewOperation.getOverviewInfo(seaTunnelServer, nodeEngine);
+ overviewInfo = GetOverviewOperation.getOverviewInfo(seaTunnelServer, nodeEngine, tags);
overviewInfo.setProjectVersion(version.getProjectVersion());
overviewInfo.setGitCommitAbbrev(version.getGitCommitAbbrev());
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
index cbba82dda8..b3df40f6aa 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
@@ -86,7 +86,7 @@ public class FixSlotResourceTest extends
AbstractSeaTunnelServerTest<FixSlotReso
3,
server.getCoordinatorService()
.getResourceManager()
- .getUnassignedSlots()
+ .getUnassignedSlots(null)
.size());
});
resourceProfiles.remove(0);