This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2cc82bd4dc [Feature][RestAPI] Add overview api (#6883)
2cc82bd4dc is described below
commit 2cc82bd4dc491c23e6c60a55a58c64fbaa9a1b95
Author: Guangdong Liu <[email protected]>
AuthorDate: Mon Jun 3 18:40:57 2024 +0800
[Feature][RestAPI] Add overview api (#6883)
---
docs/en/seatunnel-engine/rest-api.md | 20 ++++
docs/zh/seatunnel-engine/rest-api.md | 20 ++++
pom.xml | 7 ++
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 20 ++++
.../src/test/resources/seatunnel.yaml | 3 +-
seatunnel-engine/seatunnel-engine-common/pom.xml | 48 +++++++++
.../apache/seatunnel/engine/common/Constant.java | 2 +
.../engine/common/env/EnvironmentUtil.java | 89 +++++++++++++++++
.../seatunnel/engine/common/env/Version.java | 29 ++++++
.../resources-filtered/zeta.version.properties | 21 +---
.../engine/common/config/EnvironmentUtilTest.java | 40 ++++++++
.../resourcemanager/AbstractResourceManager.java | 12 +++
.../server/resourcemanager/ResourceManager.java | 4 +
.../opeartion/GetOverviewOperation.java | 110 +++++++++++++++++++++
.../resourcemanager/resource/OverviewInfo.java | 35 +++++++
.../seatunnel/engine/server/rest/RestConstant.java | 2 +
.../server/rest/RestHttpGetCommandProcessor.java | 42 +++++++-
.../serializable/ResourceDataSerializerHook.java | 5 +
18 files changed, 488 insertions(+), 21 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api.md
b/docs/en/seatunnel-engine/rest-api.md
index 0a0c160587..adde89d89a 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -35,6 +35,26 @@ network:
## API reference
+### Returns an overview over the Zeta engine cluster.
+
+#### Parameters
+
+#### Responses
+
+```json
+{
+ "projectVersion":"2.3.5-SNAPSHOT",
+ "gitCommitAbbrev":"DeadD0d0",
+ "totalSlot":"0",
+ "unassignedSlot":"0",
+ "runningJobs":"0",
+ "finishedJobs":"0",
+ "failedJobs":"0",
+ "cancelledJobs":"0",
+ "workers":"1"
+}
+```
+
### Returns an overview over all jobs and their current state.
<details>
diff --git a/docs/zh/seatunnel-engine/rest-api.md
b/docs/zh/seatunnel-engine/rest-api.md
index ee9a1511a9..e6aecaeb2f 100644
--- a/docs/zh/seatunnel-engine/rest-api.md
+++ b/docs/zh/seatunnel-engine/rest-api.md
@@ -34,6 +34,26 @@ network:
## API参考
+### 返回Zeta集群的概览
+
+#### 参数
+
+#### 响应
+
+```json
+{
+ "projectVersion":"2.3.5-SNAPSHOT",
+ "gitCommitAbbrev":"DeadD0d0",
+ "totalSlot":"0",
+ "unassignedSlot":"0",
+ "runningJobs":"0",
+ "finishedJobs":"0",
+ "failedJobs":"0",
+ "cancelledJobs":"0",
+ "workers":"1"
+}
+```
+
### 返回所有作业及其当前状态的概览。
<details>
diff --git a/pom.xml b/pom.xml
index acff06bc09..50c0d41200 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,7 @@
<config.version>1.3.3</config.version>
<maven-shade-plugin.version>3.3.0</maven-shade-plugin.version>
<maven-helper-plugin.version>3.2.0</maven-helper-plugin.version>
+
<maven-git-commit-id-plugin.version>4.0.4</maven-git-commit-id-plugin.version>
<flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version>
<maven-license-maven-plugin>1.20</maven-license-maven-plugin>
<log4j-core.version>2.17.1</log4j-core.version>
@@ -704,6 +705,12 @@
<version>${maven-helper-plugin.version}</version>
</plugin>
+ <plugin>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
+ <version>${maven-git-commit-id-plugin.version}</version>
+ </plugin>
+
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>license-maven-plugin</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 3569fb4b11..37a8d51b76 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -218,6 +218,26 @@ public class RestApiIT {
});
}
+ @Test
+ public void testOverview() {
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ given().get(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ + RestConstant.OVERVIEW)
+ .then()
+ .statusCode(200)
+ .body("projectVersion", notNullValue())
+ .body("totalSlot", equalTo("40"))
+ .body("workers", equalTo("2"));
+ });
+ }
+
@Test
public void testGetRunningThreads() {
Arrays.asList(node2, node1)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 7775a483cd..b643fe16b3 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -22,7 +22,8 @@ seatunnel:
queue-type: blockingqueue
print-execution-info-interval: 10
slot-service:
- dynamic-slot: true
+ dynamic-slot: false
+ slot-num: 20
checkpoint:
interval: 300000
timeout: 100000
diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml
b/seatunnel-engine/seatunnel-engine-common/pom.xml
index 824a4aed53..0de0614dee 100644
--- a/seatunnel-engine/seatunnel-engine-common/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-common/pom.xml
@@ -42,4 +42,52 @@
<version>${project.version}</version>
</dependency>
</dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <filtering>false</filtering>
+ <directory>src/main/resources</directory>
+ </resource>
+ <resource>
+ <filtering>true</filtering>
+ <directory>src/main/resources-filtered</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>parse-version</id>
+ <goals>
+ <goal>parse-version</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
+ <configuration>
+ <skipPoms>false</skipPoms>
+ <failOnNoGitDirectory>false</failOnNoGitDirectory>
+
<failOnUnableToExtractRepoInfo>false</failOnUnableToExtractRepoInfo>
+ <gitDescribe>
+ <skip>true</skip>
+ </gitDescribe>
+ </configuration>
+ <executions>
+ <execution>
+ <id>get-the-git-information</id>
+ <goals>
+ <goal>revision</goal>
+ </goals>
+ <phase>validate</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 3dc739168b..fdb2102581 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -60,4 +60,6 @@ public class Constant {
public static final Long IMAP_RUNNING_JOB_METRICS_KEY = 1L;
public static final String IMAP_CONNECTOR_JAR_REF_COUNTERS =
"engine_connectorJarRefCounters";
+
+ public static final String PROP_FILE = "zeta.version.properties";
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/EnvironmentUtil.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/EnvironmentUtil.java
new file mode 100644
index 0000000000..262b741cd8
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/EnvironmentUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.env;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+
+import static org.apache.seatunnel.engine.common.Constant.PROP_FILE;
+
+/**
+ * Reads build metadata (project version, git commit, build/commit time) from the version
+ * properties file on the classpath and exposes it as a {@link Version}.
+ */
+@Slf4j
+public class EnvironmentUtil {
+
+    /** Pattern used to parse the {@code git.build.time} and {@code git.commit.time} entries. */
+    private static final String GIT_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ssZ";
+
+    /**
+     * Looks up {@code key} in {@code properties}.
+     *
+     * @return the configured value, or {@code defaultValue} when the entry is missing, empty, or
+     *     starts with {@code $} (the {@code ${...}} placeholder was left unresolved)
+     */
+    private static String getProperty(Properties properties, String key, String defaultValue) {
+        String value = properties.getProperty(key);
+        // The isEmpty() guard matters: charAt(0) on an empty value would throw
+        // StringIndexOutOfBoundsException.
+        if (value == null || value.isEmpty() || value.charAt(0) == '$') {
+            return defaultValue;
+        }
+        return value;
+    }
+
+    /**
+     * Parses {@code rawTime} with {@link #GIT_TIME_PATTERN} and renders it as an ISO offset
+     * date-time in the system default time zone.
+     *
+     * @return the re-rendered timestamp, or {@code fallback} unchanged when {@code rawTime} cannot
+     *     be parsed or formatted
+     */
+    private static String toSystemZoneTime(String rawTime, String fallback) {
+        try {
+            DateTimeFormatter gitDateTimeFormatter = DateTimeFormatter.ofPattern(GIT_TIME_PATTERN);
+            DateTimeFormatter systemDefault =
+                    DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.systemDefault());
+            return systemDefault.format(gitDateTimeFormatter.parse(rawTime));
+        } catch (java.time.DateTimeException timeException) {
+            // A malformed timestamp must not break callers; keep the fallback instead.
+            log.info(
+                    "Unable to parse time '{}' from version property file: {}",
+                    rawTime,
+                    timeException.getMessage());
+            return fallback;
+        }
+    }
+
+    /**
+     * Builds the version information for this build.
+     *
+     * <p>Starts from the defaults of {@link Version} and overrides each field with the matching
+     * entry of the properties file when that file is on the classpath. A missing or unreadable
+     * file leaves the defaults in place.
+     *
+     * @return a populated {@link Version}, never {@code null}
+     */
+    public static Version getVersion() {
+
+        Version version = new Version();
+        ClassLoader classLoader = EnvironmentUtil.class.getClassLoader();
+
+        try (InputStream propFile = classLoader.getResourceAsStream(PROP_FILE)) {
+
+            if (propFile != null) {
+                Properties properties = new Properties();
+                properties.load(propFile);
+
+                version.setProjectVersion(
+                        getProperty(properties, "project.version", version.getProjectVersion()));
+                version.setGitCommitId(
+                        getProperty(properties, "git.commit.id", version.getGitCommitId()));
+                version.setGitCommitAbbrev(
+                        getProperty(
+                                properties, "git.commit.id.abbrev", version.getGitCommitAbbrev()));
+
+                version.setBuildTime(
+                        toSystemZoneTime(
+                                getProperty(properties, "git.build.time", version.getBuildTime()),
+                                version.getBuildTime()));
+                version.setCommitTime(
+                        toSystemZoneTime(
+                                getProperty(properties, "git.commit.time", version.getCommitTime()),
+                                version.getCommitTime()));
+            }
+
+        } catch (IOException ioException) {
+            log.info("Unable to read version property file: {}", ioException.getMessage());
+        }
+
+        return version;
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/Version.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/Version.java
new file mode 100644
index 0000000000..b04dc96f4b
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/Version.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.env;
+
+import lombok.Data;
+
+@Data
+public class Version { // Build/version metadata; every field starts with a placeholder default.
+ private String projectVersion = "<unknown>"; // overridden from "project.version" by EnvironmentUtil.getVersion()
+ private String gitCommitId = "DecafC0ffeeD0d0F00d"; // overridden from "git.commit.id"
+ private String buildTime = "1970-01-01T00:00:00+0000"; // overridden from "git.build.time"; default matches the yyyy-MM-dd'T'HH:mm:ssZ pattern parsed by EnvironmentUtil
+ private String commitTime = "1970-01-01T00:00:00+0000"; // overridden from "git.commit.time"; same format as buildTime
+ private String gitCommitAbbrev = "DeadD0d0"; // overridden from "git.commit.id.abbrev"
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
b/seatunnel-engine/seatunnel-engine-common/src/main/resources-filtered/zeta.version.properties
similarity index 66%
copy from
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
copy to
seatunnel-engine/seatunnel-engine-common/src/main/resources-filtered/zeta.version.properties
index 7775a483cd..72aa40b794 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/resources-filtered/zeta.version.properties
@@ -15,19 +15,8 @@
# limitations under the License.
#
-seatunnel:
- engine:
- history-job-expire-minutes: 1
- backup-count: 2
- queue-type: blockingqueue
- print-execution-info-interval: 10
- slot-service:
- dynamic-slot: true
- checkpoint:
- interval: 300000
- timeout: 100000
- storage:
- type: localfile
- max-retained: 3
- plugin-config:
- namespace: /tmp/seatunnel/checkpoint_snapshot/
+project.version=${project.version}
+git.commit.id=${git.commit.id}
+git.commit.id.abbrev=${git.commit.id.abbrev}
+git.commit.time=${git.commit.time}
+git.build.time=${git.build.time}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/EnvironmentUtilTest.java
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/EnvironmentUtilTest.java
new file mode 100644
index 0000000000..0e2c57c2f1
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/EnvironmentUtilTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import org.apache.seatunnel.engine.common.env.EnvironmentUtil;
+import org.apache.seatunnel.engine.common.env.Version;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class EnvironmentUtilTest {
+
+    /** Every field of the resolved version must be non-null. */
+    @Test
+    public void testGetVersion() {
+        final Version resolved = EnvironmentUtil.getVersion();
+
+        // Same getters, same order as checking them one by one.
+        final String[] populatedFields = {
+            resolved.getProjectVersion(),
+            resolved.getGitCommitId(),
+            resolved.getGitCommitAbbrev(),
+            resolved.getBuildTime(),
+            resolved.getCommitTime()
+        };
+
+        for (String field : populatedFields) {
+            assertNotNull(field);
+        }
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 2caa6e6816..9d9b9d9a76 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -223,4 +223,16 @@ public abstract class AbstractResourceManager implements
ResourceManager {
.flatMap(workerProfile ->
Arrays.stream(workerProfile.getUnassignedSlots()))
.collect(Collectors.toList());
}
+
+    /** Gathers the assigned slots of every registered worker into one list. */
+    @Override
+    public List<SlotProfile> getAssignedSlots() {
+        return registerWorker.values().stream()
+                .map(profile -> profile.getAssignedSlots())
+                .flatMap(Arrays::stream)
+                .collect(Collectors.toList());
+    }
+
+ @Override
+ public int workerCount() { // Number of entries currently held in registerWorker.
+ return registerWorker.size();
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index ca668482aa..8a04b21e4b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -60,4 +60,8 @@ public interface ResourceManager {
void close();
List<SlotProfile> getUnassignedSlots();
+
+ List<SlotProfile> getAssignedSlots(); // Assigned slots; AbstractResourceManager collects them across all registered workers.
+
+ int workerCount(); // Worker count; AbstractResourceManager returns the size of its registered-worker map.
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
new file mode 100644
index 0000000000..6bc0ef8906
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import
org.apache.seatunnel.engine.server.resourcemanager.resource.OverviewInfo;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
+import com.hazelcast.map.IMap;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.Operation;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@Slf4j
+public class GetOverviewOperation extends Operation implements
IdentifiedDataSerializable {
+
+ private OverviewInfo overviewInfo;
+
+ @Override
+ public void run() throws Exception {
+ SeaTunnelServer server = getService();
+
+ overviewInfo = getOverviewInfo(server, getNodeEngine());
+ }
+
+ @Override
+ public Object getResponse() {
+ return overviewInfo;
+ }
+
+ @Override
+ public int getFactoryId() {
+ return ResourceDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return ResourceDataSerializerHook.REQUEST_SLOT_INFO_TYPE;
+ }
+
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
+ }
+
+ public static OverviewInfo getOverviewInfo(SeaTunnelServer server,
NodeEngine nodeEngine) {
+ OverviewInfo overviewInfo = new OverviewInfo();
+ ResourceManager resourceManager =
server.getCoordinatorService().getResourceManager();
+
+ List<SlotProfile> assignedSlots = resourceManager.getAssignedSlots();
+
+ List<SlotProfile> unassignedSlots =
resourceManager.getUnassignedSlots();
+ IMap<Long, JobState> finishedJob =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE);
+ overviewInfo.setTotalSlot(assignedSlots.size() +
unassignedSlots.size());
+ overviewInfo.setUnassignedSlot(unassignedSlots.size());
+ overviewInfo.setRunningJobs(
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO).size());
+ overviewInfo.setFailedJobs(
+ finishedJob.values().stream()
+ .filter(
+ jobState ->
+ jobState.getJobStatus()
+ .name()
+
.equals(JobStatus.FAILED.toString()))
+ .count());
+ overviewInfo.setCancelledJobs(
+ finishedJob.values().stream()
+ .filter(
+ jobState ->
+ jobState.getJobStatus()
+ .name()
+
.equals(JobStatus.CANCELED.toString()))
+ .count());
+ overviewInfo.setWorkers(resourceManager.workerCount());
+ overviewInfo.setFinishedJobs(
+ finishedJob.values().stream()
+ .filter(
+ jobState ->
+ jobState.getJobStatus()
+ .name()
+
.equals(JobStatus.FINISHED.toString()))
+ .count());
+
+ return overviewInfo;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/OverviewInfo.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/OverviewInfo.java
new file mode 100644
index 0000000000..5047944323
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/OverviewInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.resource;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class OverviewInfo implements Serializable { // Cluster overview returned by the REST overview endpoint.
+ private String projectVersion; // set by the REST handler from Version.getProjectVersion()
+ private String gitCommitAbbrev; // set by the REST handler from Version.getGitCommitAbbrev()
+ private int totalSlot; // assigned + unassigned slots, as computed in GetOverviewOperation
+ private int unassignedSlot; // size of the resource manager's unassigned slot list
+ private long runningJobs; // size of the running-job info map
+ private long finishedJobs; // finished-job-state entries with status FINISHED
+ private long failedJobs; // finished-job-state entries with status FAILED
+ private long cancelledJobs; // finished-job-state entries with status CANCELED
+ private int workers; // value of ResourceManager.workerCount()
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 6daa817a48..7248773703 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -44,6 +44,8 @@ public class RestConstant {
public static final String ERROR_MSG = "errorMsg";
public static final String METRICS = "metrics";
+
+ public static final String OVERVIEW = "/hazelcast/rest/maps/overview";
public static final String RUNNING_JOBS_URL =
"/hazelcast/rest/maps/running-jobs";
@Deprecated public static final String RUNNING_JOB_URL =
"/hazelcast/rest/maps/running-job";
public static final String JOB_INFO_URL = "/hazelcast/rest/maps/job-info";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index b4110f46ff..0e89f9cfda 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.env.EnvironmentUtil;
+import org.apache.seatunnel.engine.common.env.Version;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
@@ -37,6 +39,8 @@ import
org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
import
org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
+import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewOperation;
+import
org.apache.seatunnel.engine.server.resourcemanager.resource.OverviewInfo;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
@@ -53,6 +57,7 @@ import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.Arrays;
import java.util.Comparator;
@@ -64,6 +69,7 @@ import java.util.concurrent.ExecutionException;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.JOB_INFO_URL;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.OVERVIEW;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_THREADS;
@@ -71,12 +77,9 @@ import static
org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITO
public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCommand> {
- private final Log4j2HttpGetCommandProcessor original;
-
private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
-
private static final String SINK_WRITE_COUNT = "SinkWriteCount";
-
+ private final Log4j2HttpGetCommandProcessor original;
private NodeEngine nodeEngine;
public RestHttpGetCommandProcessor(TextCommandService textCommandService) {
@@ -106,6 +109,8 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
getSystemMonitoringInformation(httpGetCommand);
} else if (uri.startsWith(RUNNING_THREADS)) {
getRunningThread(httpGetCommand);
+ } else if (uri.startsWith(OVERVIEW)) {
+ overView(httpGetCommand);
} else {
original.handle(httpGetCommand);
}
@@ -124,6 +129,35 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
handle(httpGetCommand);
}
+    /**
+     * Answers the overview request with slot, job and worker counters plus the build version.
+     *
+     * <p>When {@code getSeaTunnelServer(true)} yields a server, the counters are computed locally;
+     * otherwise a {@link GetOverviewOperation} is sent to the master node and its result is used.
+     *
+     * @param command the HTTP GET command to respond to
+     */
+    public void overView(HttpGetCommand command) {
+        Version version = EnvironmentUtil.getVersion();
+        SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
+
+        OverviewInfo overviewInfo;
+        if (seaTunnelServer != null) {
+            NodeEngineImpl localEngine = this.textCommandService.getNode().getNodeEngine();
+            overviewInfo = GetOverviewOperation.getOverviewInfo(seaTunnelServer, localEngine);
+        } else {
+            // No local server available: ask the master node for the counters.
+            overviewInfo =
+                    (OverviewInfo)
+                            NodeEngineUtil.sendOperationToMasterNode(
+                                            getNode().nodeEngine, new GetOverviewOperation())
+                                    .join();
+        }
+
+        // Version data comes from this node's build in both cases.
+        overviewInfo.setProjectVersion(version.getProjectVersion());
+        overviewInfo.setGitCommitAbbrev(version.getGitCommitAbbrev());
+
+        String overviewJson = JsonUtils.toJsonString(overviewInfo);
+        this.prepareResponse(command, JsonUtil.toJsonObject(JsonUtils.toMap(overviewJson)));
+    }
+
private void getSystemMonitoringInformation(HttpGetCommand command) {
Cluster cluster =
textCommandService.getNode().hazelcastInstance.getCluster();
nodeEngine =
textCommandService.getNode().hazelcastInstance.node.nodeEngine;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index d229e4cd90..09f9a4550c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.serializable;
import
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
@@ -50,6 +51,8 @@ public class ResourceDataSerializerHook implements
DataSerializerHook {
public static final int SYNC_SLOT_SERVICE_STATUS_TYPE = 8;
+ public static final int REQUEST_SLOT_INFO_TYPE = 9;
+
public static final int FACTORY_ID =
FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
@@ -86,6 +89,8 @@ public class ResourceDataSerializerHook implements
DataSerializerHook {
return new SlotAndWorkerProfile();
case SYNC_SLOT_SERVICE_STATUS_TYPE:
return new SyncWorkerProfileOperation();
+ case REQUEST_SLOT_INFO_TYPE:
+ return new GetOverviewOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}