This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new c2799e488 [Feature-3960] Add task distribution for streampark (#4040)
c2799e488 is described below

commit c2799e488c6e4ce115d75bf1a6ee30b82b927e31
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Sep 11 14:03:00 2024 +0800

    [Feature-3960] Add task distribution for streampark (#4040)
    
    * init hash
    
    * add test
    
    * spotless
    
    * refine docs
    
    * add TaskManager
    
    * rename
---
 .../application-mysql.yml                          |   2 +-
 .../streampark/common/utils/UUIDUtilsTestCase.java |   2 +-
 .../main/assembly/script/schema/mysql-schema.sql   |  13 +
 .../main/assembly/script/schema/pgsql-schema.sql   |  18 +
 .../base/config/AsyncExecutorPoolConfig.java       |  16 +
 .../console/base/util/ConsistentHash.java          |  85 ++++
 .../streampark/console/base/util/Murmur3Hash.java  | 552 +++++++++++++++++++++
 .../console/core/bean/FlinkTaskItem.java           |  46 ++
 .../console/core/entity/DistributedTask.java       |  45 ++
 .../console/core/enums/DistributedTaskEnum.java    |  59 +++
 .../console/core/mapper/DistributedTaskMapper.java |  25 +
 .../core/registry/ConsoleRegistryClient.java       |   5 +
 .../core/service/DistributedTaskService.java       |  80 +++
 .../impl/ApplicationActionServiceImpl.java         |  33 +-
 .../service/impl/DistributedTaskServiceImpl.java   | 239 +++++++++
 .../console/core/watcher/FlinkAppHttpWatcher.java  |  13 +-
 .../src/main/resources/db/schema-h2.sql            |  14 +
 .../mapper/core/DistributedTaskMapper.xml          |  22 +
 .../console/base/util/ConsistentHashTest.java      |  90 ++++
 .../core/service/DistributedTaskServiceTest.java   |  69 +++
 20 files changed, 1420 insertions(+), 8 deletions(-)

diff --git 
a/helm/streampark/conf/streampark-console-config/application-mysql.yml 
b/helm/streampark/conf/streampark-console-config/application-mysql.yml
index 7c91f7c8c..3625d2180 100755
--- a/helm/streampark/conf/streampark-console-config/application-mysql.yml
+++ b/helm/streampark/conf/streampark-console-config/application-mysql.yml
@@ -18,6 +18,6 @@
 spring:
   datasource:
     username: root
-    password: streampark
+    password: root
     driver-class-name: com.mysql.cj.jdbc.Driver
     url: 
jdbc:mysql://localhost:3306/streampark?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
diff --git 
a/streampark-common/src/test/java/org/apache/streampark/common/utils/UUIDUtilsTestCase.java
 
b/streampark-common/src/test/java/org/apache/streampark/common/utils/UUIDUtilsTestCase.java
index b6f61addf..441013471 100644
--- 
a/streampark-common/src/test/java/org/apache/streampark/common/utils/UUIDUtilsTestCase.java
+++ 
b/streampark-common/src/test/java/org/apache/streampark/common/utils/UUIDUtilsTestCase.java
@@ -44,7 +44,7 @@ class UUIDUtilsTestCase {
     @Test
     @SneakyThrows
     void testNoDuplicateGenerateUUID() {
-        int threadNum = 100;
+        int threadNum = 10;
         int uuidNum = 10000;
 
         CountDownLatch countDownLatch = new CountDownLatch(threadNum);
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 5673d7778..e723e63fc 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -240,6 +240,19 @@ create table `t_flink_sql` (
 ) engine=innodb auto_increment=100000 default charset=utf8mb4 
collate=utf8mb4_general_ci;
 
 
+-- ----------------------------
+-- Table structure for t_distributed_task
+-- ----------------------------
+drop table if exists `t_distributed_task`;
+create table `t_distributed_task` (
+  `id` bigint not null auto_increment,
+  `action` tinyint not null,
+  `engine_type` tinyint not null,
+  `properties` text collate utf8mb4_general_ci,
+  primary key (`id`) using btree
+) engine=innodb auto_increment=100000 default charset=utf8mb4 
collate=utf8mb4_general_ci;
+
+
 -- ----------------------------
 -- Table structure for t_menu
 -- ----------------------------
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index f65947a2c..c25619457 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -34,6 +34,7 @@ drop table if exists "public"."t_flink_effective";
 drop table if exists "public"."t_flink_config";
 drop table if exists "public"."t_flink_cluster";
 drop table if exists "public"."t_flink_app";
+drop table if exists "public"."t_distributed_task";
 drop table if exists "public"."t_app_build_pipe";
 drop table if exists "public"."t_app_backup";
 drop table if exists "public"."t_alert_config";
@@ -61,6 +62,7 @@ drop sequence if exists 
"public"."streampark_t_flink_effective_id_seq";
 drop sequence if exists "public"."streampark_t_flink_config_id_seq";
 drop sequence if exists "public"."streampark_t_flink_cluster_id_seq";
 drop sequence if exists "public"."streampark_t_flink_app_id_seq";
+drop sequence if exists "public"."streampark_t_distributed_task_id_seq";
 drop sequence if exists "public"."streampark_t_app_backup_id_seq";
 drop sequence if exists "public"."streampark_t_alert_config_id_seq";
 drop sequence if exists "public"."streampark_t_access_token_id_seq";
@@ -494,6 +496,22 @@ create table "public"."t_flink_sql" (
 alter table "public"."t_flink_sql" add constraint "t_flink_sql_pkey" primary 
key ("id");
 
 
+-- ----------------------------
+-- table structure for t_distributed_task
+-- ----------------------------
+create sequence "public"."streampark_t_distributed_task_id_seq"
+    increment 1 start 10000 cache 1 minvalue 10000 maxvalue 
9223372036854775807;
+create table "public"."t_distributed_task"
+(
+    "id"                           int8 not null default 
nextval('streampark_t_distributed_task_id_seq'::regclass),
+    "action"                       int2,
+    "engine_type"                  int2,
+    "properties"                   text collate "pg_catalog"."default"
+)
+;
+alter table "public"."t_distributed_task" add constraint 
"t_distributed_task_pkey" primary key ("id");
+
+
 -- ----------------------------
 -- table structure for t_menu
 -- ----------------------------
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
index 5d51b3df1..c79e47a8a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
@@ -182,4 +182,20 @@ public class AsyncExecutorPoolConfig extends 
AsyncConfigurerSupport {
             ThreadUtils.threadFactory("streampark-build-executor-"),
             new ThreadPoolExecutor.AbortPolicy());
     }
+
+    /**
+     * Create a ThreadPoolTaskExecutor for DistributedTask.
+     *
+     * @return Executor
+     */
+    @Bean("streamparkDistributedTaskExecutor")
+    public Executor distributedTaskExecutor() {
+        return new ThreadPoolExecutor(
+            Runtime.getRuntime().availableProcessors() * 5,
+            Runtime.getRuntime().availableProcessors() * 10,
+            60L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(1024),
+            ThreadUtils.threadFactory("streampark-distributed-task-"));
+    }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ConsistentHash.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ConsistentHash.java
new file mode 100644
index 000000000..60a34358c
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ConsistentHash.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import java.util.Collection;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentHash<T> {
+
+    // the number of virtual nodes for each server
+    private final int numberOfReplicas = 2 << 16;
+
+    // the hash ring of servers
+    private final SortedMap<Long, T> circle = new TreeMap<>();
+
+    /**
+     * Initialize the ConsistentHash with a collection of servers.
+     * @param servers the collection of servers
+     */
+    public ConsistentHash(Collection<T> servers) {
+        servers.forEach(this::add);
+    }
+
+    /**
+     * Add the virtual nodes of the server to the hash ring.
+     * @param server the server to be added
+     */
+    public void add(T server) {
+        for (int i = 0; i < numberOfReplicas; i++) {
+            circle.put(Murmur3Hash.hash64(server.toString() + i), server);
+        }
+    }
+
+    /**
+     * Remove the virtual nodes of the server from the hash ring.
+     * @param server the server to be removed
+     */
+    public void remove(T server) {
+        for (int i = 0; i < numberOfReplicas; i++) {
+            circle.remove(Murmur3Hash.hash64(server.toString() + i));
+        }
+    }
+
+    /**
+     * Get the server that the key belongs to from the hash ring.
+     * @param key the key
+     * @return the specified server
+     */
+    public T get(Object key) {
+        if (circle.isEmpty()) {
+            return null;
+        }
+        long hash = Murmur3Hash.hash64(key.toString());
+        if (!circle.containsKey(hash)) {
+            SortedMap<Long, T> tailMap = circle.tailMap(hash);
+            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
+        }
+        return circle.get(hash);
+    }
+
+    /**
+     * Get the size of the hash ring.
+     * @return the size of the hash ring
+     */
+    public long getSize() {
+        return circle.size();
+    }
+
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/Murmur3Hash.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/Murmur3Hash.java
new file mode 100644
index 000000000..6cc185766
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/Murmur3Hash.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
/**
 * Murmur3 is successor to Murmur2 fast non-cryptographic hash algorithms.
 *
 * Murmur3 32 and 128 bit variants.
 * 32-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94
 * 128-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#255
 *
 * This is a public domain code with no copyrights.
 * From homepage of MurmurHash (https://code.google.com/p/smhasher/),
 * "All MurmurHash versions are public domain software, and the author disclaims all copyright
 * to their code."
 */
public class Murmur3Hash {

    // from 64-bit linear congruential generator
    public static final long NULL_HASHCODE = 2862933555777941757L;

    // Constants for 32 bit variant
    private static final int C1_32 = 0xcc9e2d51;
    private static final int C2_32 = 0x1b873593;
    private static final int R1_32 = 15;
    private static final int R2_32 = 13;
    private static final int M_32 = 5;
    private static final int N_32 = 0xe6546b64;

    // Constants for 128 bit variant
    private static final long C1 = 0x87c37b91114253d5L;
    private static final long C2 = 0x4cf5ad432745937fL;
    private static final int R1 = 31;
    private static final int R2 = 27;
    private static final int R3 = 33;
    private static final int M = 5;
    private static final int N1 = 0x52dce729;
    private static final int N2 = 0x38495ab5;

    public static final int DEFAULT_SEED = 104729;

    /** Static utility class; not instantiable. */
    private Murmur3Hash() {
    }

    public static int hash32(long l0, long l1) {
        return hash32(l0, l1, DEFAULT_SEED);
    }

    public static int hash32(long l0) {
        return hash32(l0, DEFAULT_SEED);
    }

    /**
     * Murmur3 32-bit variant of a single long.
     *
     * @param l0   - input long
     * @param seed - seed
     * @return - hashcode
     */
    public static int hash32(long l0, int seed) {
        int hash = seed;
        // byte-reverse so the result matches hashing the long's little-endian bytes
        final long r0 = Long.reverseBytes(l0);

        hash = mix32((int) r0, hash);
        hash = mix32((int) (r0 >>> 32), hash);

        return fmix32(Long.BYTES, hash);
    }

    /**
     * Murmur3 32-bit variant of two longs.
     *
     * @param l0   - first input long
     * @param l1   - second input long
     * @param seed - seed
     * @return - hashcode
     */
    public static int hash32(long l0, long l1, int seed) {
        int hash = seed;
        final long r0 = Long.reverseBytes(l0);
        final long r1 = Long.reverseBytes(l1);

        hash = mix32((int) r0, hash);
        hash = mix32((int) (r0 >>> 32), hash);
        hash = mix32((int) (r1), hash);
        hash = mix32((int) (r1 >>> 32), hash);

        return fmix32(Long.BYTES * 2, hash);
    }

    /**
     * Murmur3 32-bit variant.
     *
     * @param data - input byte array
     * @return - hashcode
     */
    public static int hash32(byte[] data) {
        return hash32(data, 0, data.length, DEFAULT_SEED);
    }

    /**
     * Murmur3 32-bit variant.
     *
     * @param data - input byte array
     * @param length - length of array
     * @return - hashcode
     */
    public static int hash32(byte[] data, int length) {
        return hash32(data, 0, length, DEFAULT_SEED);
    }

    /**
     * Murmur3 32-bit variant.
     *
     * @param data   - input byte array
     * @param length - length of array
     * @param seed   - seed. (default 0)
     * @return - hashcode
     */
    public static int hash32(byte[] data, int length, int seed) {
        return hash32(data, 0, length, seed);
    }

    /**
     * Murmur3 32-bit variant.
     *
     * @param data   - input byte array
     * @param offset - offset of data
     * @param length - length of array
     * @param seed   - seed. (default 0)
     * @return - hashcode
     */
    public static int hash32(byte[] data, int offset, int length, int seed) {
        int hash = seed;
        final int nblocks = length >> 2;

        // body: consume the input four bytes (one little-endian int) at a time
        for (int i = 0; i < nblocks; i++) {
            int i_4 = i << 2;
            int k = (data[offset + i_4] & 0xff)
                | ((data[offset + i_4 + 1] & 0xff) << 8)
                | ((data[offset + i_4 + 2] & 0xff) << 16)
                | ((data[offset + i_4 + 3] & 0xff) << 24);

            hash = mix32(k, hash);
        }

        // tail: 0-3 leftover bytes; intentional fall-through folds each byte into k1
        int idx = nblocks << 2;
        int k1 = 0;
        switch (length - idx) {
            case 3:
                k1 ^= data[offset + idx + 2] << 16;
                // fall through
            case 2:
                k1 ^= data[offset + idx + 1] << 8;
                // fall through
            case 1:
                k1 ^= data[offset + idx];

                // mix functions
                k1 *= C1_32;
                k1 = Integer.rotateLeft(k1, R1_32);
                k1 *= C2_32;
                hash ^= k1;
        }

        return fmix32(length, hash);
    }

    private static int mix32(int k, int hash) {
        k *= C1_32;
        k = Integer.rotateLeft(k, R1_32);
        k *= C2_32;
        hash ^= k;
        return Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
    }

    // final avalanche step: mixes the total length in and spreads every input bit
    private static int fmix32(int length, int hash) {
        hash ^= length;
        hash ^= (hash >>> 16);
        hash *= 0x85ebca6b;
        hash ^= (hash >>> 13);
        hash *= 0xc2b2ae35;
        hash ^= (hash >>> 16);

        return hash;
    }

    /**
     * Murmur3 64-bit variant of a string's UTF-8 bytes.
     *
     * <p>The charset is pinned to UTF-8 instead of the platform default: these hashes
     * are compared across JVMs (e.g. every server must build an identical
     * consistent-hash ring), so the byte representation must not depend on each
     * host's {@code file.encoding}.
     *
     * @param data - input string
     * @return - hashcode
     */
    public static long hash64(String data) {
        return hash64(data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
     *
     * @param data - input byte array
     * @return - hashcode
     */
    public static long hash64(byte[] data) {
        return hash64(data, 0, data.length, DEFAULT_SEED);
    }

    /**
     * Murmur3 64-bit variant of a single long.
     *
     * @param data - input long
     * @return - hashcode
     */
    public static long hash64(long data) {
        long hash = DEFAULT_SEED;
        // byte-reverse so the result matches hashing the long's little-endian bytes
        long k = Long.reverseBytes(data);
        int length = Long.BYTES;
        // mix functions
        k *= C1;
        k = Long.rotateLeft(k, R1);
        k *= C2;
        hash ^= k;
        hash = Long.rotateLeft(hash, R2) * M + N1;
        // finalization
        hash ^= length;
        hash = fmix64(hash);
        return hash;
    }

    /**
     * Murmur3 64-bit variant of a single int.
     *
     * @param data - input int
     * @return - hashcode
     */
    public static long hash64(int data) {
        // zero-extend the byte-reversed int into the low 32 bits of a long
        long k1 = Integer.reverseBytes(data) & (-1L >>> 32);
        int length = Integer.BYTES;
        long hash = DEFAULT_SEED;
        k1 *= C1;
        k1 = Long.rotateLeft(k1, R1);
        k1 *= C2;
        hash ^= k1;
        // finalization
        hash ^= length;
        hash = fmix64(hash);
        return hash;
    }

    /**
     * Murmur3 64-bit variant of a single short (treated as two little-endian bytes).
     *
     * @param data - input short
     * @return - hashcode
     */
    public static long hash64(short data) {
        long hash = DEFAULT_SEED;
        long k1 = 0;
        k1 ^= ((long) data & 0xff) << 8;
        k1 ^= ((long) ((data & 0xFF00) >> 8) & 0xff);
        k1 *= C1;
        k1 = Long.rotateLeft(k1, R1);
        k1 *= C2;
        hash ^= k1;

        // finalization
        hash ^= Short.BYTES;
        hash = fmix64(hash);
        return hash;
    }

    public static long hash64(byte[] data, int offset, int length) {
        return hash64(data, offset, length, DEFAULT_SEED);
    }

    /**
     * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
     *
     * @param data   - input byte array
     * @param offset - offset of data
     * @param length - length of array
     * @param seed   - seed. (default is 0)
     * @return - hashcode
     */
    public static long hash64(byte[] data, int offset, int length, int seed) {
        long hash = seed;
        final int nblocks = length >> 3;

        // body: consume the input eight bytes (one little-endian long) at a time
        for (int i = 0; i < nblocks; i++) {
            final int i8 = i << 3;
            long k = ((long) data[offset + i8] & 0xff)
                | (((long) data[offset + i8 + 1] & 0xff) << 8)
                | (((long) data[offset + i8 + 2] & 0xff) << 16)
                | (((long) data[offset + i8 + 3] & 0xff) << 24)
                | (((long) data[offset + i8 + 4] & 0xff) << 32)
                | (((long) data[offset + i8 + 5] & 0xff) << 40)
                | (((long) data[offset + i8 + 6] & 0xff) << 48)
                | (((long) data[offset + i8 + 7] & 0xff) << 56);

            // mix functions
            k *= C1;
            k = Long.rotateLeft(k, R1);
            k *= C2;
            hash ^= k;
            hash = Long.rotateLeft(hash, R2) * M + N1;
        }

        // tail: 0-7 leftover bytes; intentional fall-through folds each byte into k1
        long k1 = 0;
        int tailStart = nblocks << 3;
        switch (length - tailStart) {
            case 7:
                k1 ^= ((long) data[offset + tailStart + 6] & 0xff) << 48;
                // fall through
            case 6:
                k1 ^= ((long) data[offset + tailStart + 5] & 0xff) << 40;
                // fall through
            case 5:
                k1 ^= ((long) data[offset + tailStart + 4] & 0xff) << 32;
                // fall through
            case 4:
                k1 ^= ((long) data[offset + tailStart + 3] & 0xff) << 24;
                // fall through
            case 3:
                k1 ^= ((long) data[offset + tailStart + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 ^= ((long) data[offset + tailStart + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 ^= ((long) data[offset + tailStart] & 0xff);
                k1 *= C1;
                k1 = Long.rotateLeft(k1, R1);
                k1 *= C2;
                hash ^= k1;
        }

        // finalization
        hash ^= length;
        hash = fmix64(hash);

        return hash;
    }

    /**
     * Murmur3 128-bit variant.
     *
     * @param data - input byte array
     * @return - hashcode (2 longs)
     */
    public static long[] hash128(byte[] data) {
        return hash128(data, 0, data.length, DEFAULT_SEED);
    }

    /**
     * Murmur3 128-bit variant.
     *
     * @param data   - input byte array
     * @param offset - the first element of array
     * @param length - length of array
     * @param seed   - seed. (default is 0)
     * @return - hashcode (2 longs)
     */
    public static long[] hash128(byte[] data, int offset, int length, int seed) {
        long h1 = seed;
        long h2 = seed;
        final int nblocks = length >> 4;

        // body: consume the input sixteen bytes (two little-endian longs) at a time
        for (int i = 0; i < nblocks; i++) {
            final int i16 = i << 4;
            long k1 = ((long) data[offset + i16] & 0xff)
                | (((long) data[offset + i16 + 1] & 0xff) << 8)
                | (((long) data[offset + i16 + 2] & 0xff) << 16)
                | (((long) data[offset + i16 + 3] & 0xff) << 24)
                | (((long) data[offset + i16 + 4] & 0xff) << 32)
                | (((long) data[offset + i16 + 5] & 0xff) << 40)
                | (((long) data[offset + i16 + 6] & 0xff) << 48)
                | (((long) data[offset + i16 + 7] & 0xff) << 56);

            long k2 = ((long) data[offset + i16 + 8] & 0xff)
                | (((long) data[offset + i16 + 9] & 0xff) << 8)
                | (((long) data[offset + i16 + 10] & 0xff) << 16)
                | (((long) data[offset + i16 + 11] & 0xff) << 24)
                | (((long) data[offset + i16 + 12] & 0xff) << 32)
                | (((long) data[offset + i16 + 13] & 0xff) << 40)
                | (((long) data[offset + i16 + 14] & 0xff) << 48)
                | (((long) data[offset + i16 + 15] & 0xff) << 56);

            // mix functions for k1
            k1 *= C1;
            k1 = Long.rotateLeft(k1, R1);
            k1 *= C2;
            h1 ^= k1;
            h1 = Long.rotateLeft(h1, R2);
            h1 += h2;
            h1 = h1 * M + N1;

            // mix functions for k2
            k2 *= C2;
            k2 = Long.rotateLeft(k2, R3);
            k2 *= C1;
            h2 ^= k2;
            h2 = Long.rotateLeft(h2, R1);
            h2 += h1;
            h2 = h2 * M + N2;
        }

        // tail: 0-15 leftover bytes; intentional fall-through — bytes 8-14 fold
        // into k2, bytes 0-7 fold into k1
        long k1 = 0;
        long k2 = 0;
        int tailStart = nblocks << 4;
        switch (length - tailStart) {
            case 15:
                k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48;
                // fall through
            case 14:
                k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40;
                // fall through
            case 13:
                k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32;
                // fall through
            case 12:
                k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24;
                // fall through
            case 11:
                k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16;
                // fall through
            case 10:
                k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8;
                // fall through
            case 9:
                k2 ^= (long) (data[offset + tailStart + 8] & 0xff);
                k2 *= C2;
                k2 = Long.rotateLeft(k2, R3);
                k2 *= C1;
                h2 ^= k2;
                // fall through

            case 8:
                k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56;
                // fall through
            case 7:
                k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48;
                // fall through
            case 6:
                k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40;
                // fall through
            case 5:
                k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32;
                // fall through
            case 4:
                k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24;
                // fall through
            case 3:
                k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 ^= (long) (data[offset + tailStart] & 0xff);
                k1 *= C1;
                k1 = Long.rotateLeft(k1, R1);
                k1 *= C2;
                h1 ^= k1;
        }

        // finalization
        h1 ^= length;
        h2 ^= length;

        h1 += h2;
        h2 += h1;

        h1 = fmix64(h1);
        h2 = fmix64(h2);

        h1 += h2;
        h2 += h1;

        return new long[]{h1, h2};
    }

    // final avalanche step for the 64/128-bit variants
    private static long fmix64(long h) {
        h ^= (h >>> 33);
        h *= 0xff51afd7ed558ccdL;
        h ^= (h >>> 33);
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= (h >>> 33);
        return h;
    }

    /**
     * Incremental (streaming) form of the 32-bit hash: feed chunks with
     * {@link #add(byte[], int, int)} and read the result from {@link #end()}.
     * Produces the same value as {@code hash32(data, 0, length, seed)} over the
     * concatenation of all added chunks.
     */
    public static class IncrementalHash32 {

        // up to 3 bytes carried over between add() calls until a 4-byte block completes
        byte[] tail = new byte[3];
        int tailLen;
        int totalLen;
        int hash;

        /**
         * Reset the state and begin a new hash with the given seed.
         *
         * @param hash the seed value
         */
        public final void start(int hash) {
            tailLen = totalLen = 0;
            this.hash = hash;
        }

        /**
         * Mix the next chunk of bytes into the running hash.
         *
         * @param data   input byte array
         * @param offset offset of the chunk
         * @param length length of the chunk
         */
        public final void add(byte[] data, int offset, int length) {
            if (length == 0) {
                return;
            }
            totalLen += length;
            // still no complete 4-byte block: just buffer the bytes
            if (tailLen + length < 4) {
                System.arraycopy(data, offset, tail, tailLen, length);
                tailLen += length;
                return;
            }
            int offset2 = 0;
            if (tailLen > 0) {
                // complete the buffered partial block with the first new bytes
                offset2 = (4 - tailLen);
                int k = -1;
                switch (tailLen) {
                    case 1:
                        k = orBytes(tail[0], data[offset], data[offset + 1], data[offset + 2]);
                        break;
                    case 2:
                        k = orBytes(tail[0], tail[1], data[offset], data[offset + 1]);
                        break;
                    case 3:
                        k = orBytes(tail[0], tail[1], tail[2], data[offset]);
                        break;
                    default:
                        throw new AssertionError(tailLen);
                }
                // mix functions
                k *= C1_32;
                k = Integer.rotateLeft(k, R1_32);
                k *= C2_32;
                hash ^= k;
                hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
            }
            int length2 = length - offset2;
            offset += offset2;
            final int nblocks = length2 >> 2;

            for (int i = 0; i < nblocks; i++) {
                int i_4 = (i << 2) + offset;
                int k = orBytes(data[i_4], data[i_4 + 1], data[i_4 + 2], data[i_4 + 3]);

                // mix functions
                k *= C1_32;
                k = Integer.rotateLeft(k, R1_32);
                k *= C2_32;
                hash ^= k;
                hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
            }

            // buffer the 0-3 leftover bytes for the next add()/end()
            int consumed = (nblocks << 2);
            tailLen = length2 - consumed;
            if (consumed == length2) {
                return;
            }
            System.arraycopy(data, offset + consumed, tail, 0, tailLen);
        }

        /**
         * Finish the hash: fold in any buffered tail bytes and apply finalization.
         *
         * @return the final 32-bit hash
         */
        public final int end() {
            // intentional fall-through folds each buffered byte into k1
            int k1 = 0;
            switch (tailLen) {
                case 3:
                    k1 ^= tail[2] << 16;
                    // fall through
                case 2:
                    k1 ^= tail[1] << 8;
                    // fall through
                case 1:
                    k1 ^= tail[0];

                    // mix functions
                    k1 *= C1_32;
                    k1 = Integer.rotateLeft(k1, R1_32);
                    k1 *= C2_32;
                    hash ^= k1;
            }

            // finalization
            hash ^= totalLen;
            hash ^= (hash >>> 16);
            hash *= 0x85ebca6b;
            hash ^= (hash >>> 13);
            hash *= 0xc2b2ae35;
            hash ^= (hash >>> 16);
            return hash;
        }
    }

    // assemble four bytes into a little-endian int
    private static int orBytes(byte b1, byte b2, byte b3, byte b4) {
        return (b1 & 0xff) | ((b2 & 0xff) << 8) | ((b3 & 0xff) << 16) | ((b4 & 0xff) << 24);
    }
}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkTaskItem.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkTaskItem.java
new file mode 100644
index 000000000..c99cebc72
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkTaskItem.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class FlinkTaskItem implements Serializable {
+
+    /** appId */
+    private Long appId;
+
+    private Boolean autoStart;
+
+    private String args;
+
+    private String dynamicProperties;
+
+    /** running job */
+    private String savepointPath;
+
+    private Boolean restoreOrTriggerSavepoint = false;
+
+    private Boolean drain = false;
+
+    private Boolean nativeFormat = false;
+
+    private Integer restoreMode;
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/DistributedTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/DistributedTask.java
new file mode 100644
index 000000000..01ac7b9cc
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/DistributedTask.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+
/**
 * Entity mapped to the {@code t_distributed_task} table. One row represents a
 * pending distributed operation waiting to be polled and executed by a console node.
 */
@Data
@TableName("t_distributed_task")
@Slf4j
public class DistributedTask implements Serializable {

    /** Auto-increment primary key. */
    @TableId(type = IdType.AUTO)
    private Long id;

    /** Action to perform: START, RESTART, REVOKE, CANCEL or ABORT. */
    private DistributedTaskEnum action;

    /** Engine that owns the task; only FLINK rows are executed by the poller. */
    private EngineTypeEnum engineType;

    /** JSON-serialized payload (a FlinkTaskItem for Flink tasks). */
    private String properties;

}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
new file mode 100644
index 000000000..e27a9ce95
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.enums;
+
/**
 * The DistributedTaskEnum represents the possible actions that can be
 * performed on a task. The integer code is persisted (as a tinyint) in the
 * {@code t_distributed_task} table, so values must stay stable.
 */
public enum DistributedTaskEnum {

    /** Starts the specified application. */
    START(0),

    /** Restarts the given application. */
    RESTART(1),

    /** Revokes access for the given application. */
    REVOKE(2),

    /** Cancels the given application. Throws an exception if cancellation fails. */
    CANCEL(3),

    /** Forces the given application to stop. */
    ABORT(4);

    /** Stable integer code stored in the database. */
    private final int value;

    DistributedTaskEnum(int value) {
        this.value = value;
    }

    /** Returns the stable integer code of this action. */
    public int get() {
        return this.value;
    }

    /**
     * Looks up the enum constant for a persisted integer code.
     *
     * @param value the integer code read from storage
     * @return the matching DistributedTaskEnum
     * @throws IllegalArgumentException if no constant has the given code
     */
    public static DistributedTaskEnum of(int value) {
        for (DistributedTaskEnum action : values()) {
            if (action.value == value) {
                return action;
            }
        }
        throw new IllegalArgumentException("Unknown DistributedTaskEnum value: " + value);
    }
}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DistributedTaskMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DistributedTaskMapper.java
new file mode 100644
index 000000000..a9318e93f
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DistributedTaskMapper.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.mapper;
+
+import org.apache.streampark.console.core.entity.DistributedTask;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
/**
 * MyBatis-Plus mapper for {@link DistributedTask}; all CRUD is inherited from
 * BaseMapper — no custom SQL statements are defined yet.
 */
public interface DistributedTaskMapper extends BaseMapper<DistributedTask> {
}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
index 65fa27527..d04b35104 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.IStoppable;
 import org.apache.streampark.common.utils.JSONUtils;
 import org.apache.streampark.common.utils.NetworkUtils;
 import org.apache.streampark.console.core.config.ConsoleConfig;
+import org.apache.streampark.console.core.service.DistributedTaskService;
 import org.apache.streampark.console.core.task.ConsoleHeartBeatTask;
 import org.apache.streampark.registry.api.RegistryClient;
 import org.apache.streampark.registry.api.RegistryException;
@@ -54,6 +55,9 @@ public class ConsoleRegistryClient implements AutoCloseable {
     @Autowired
     private ConsoleConnectStrategy consoleConnectStrategy;
 
+    @Autowired
+    private DistributedTaskService distributedTaskService;
+
     private ConsoleHeartBeatTask consoleHeartBeatTask;
 
     public void start() {
@@ -138,6 +142,7 @@ public class ConsoleRegistryClient implements AutoCloseable 
{
         ThreadUtils.sleep(SLEEP_TIME_MILLIS);
 
         consoleHeartBeatTask.start();
+        distributedTaskService.init(consoleConfig.getConsoleAddress());
         log.info("Console node : {} registered to registry center 
successfully", consoleConfig.getConsoleAddress());
 
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
new file mode 100644
index 000000000..7b29ef08c
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.DistributedTask;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.List;
+
+/**
+ * DistributedTaskService is the interface for managing tasks.
+ */
+public interface DistributedTaskService extends IService<DistributedTask> {
+
+    /**
+     * Add the current console server itself to the consistent hash ring.
+     * @param serverName String
+     */
+    void init(String serverName);
+
+    /**
+     * This interface is responsible for polling the database to retrieve task 
records and execute the corresponding operations.
+     * @param DistributedTask DistributedTask
+     */
+    void executeDistributedTask(DistributedTask DistributedTask) throws 
Exception;
+
+    /**
+     * Through this interface, the watcher obtains the list of tasks that need 
to be monitored.
+     * @param applications List<Application>
+     * @return List<Application> List of tasks that need to be monitored
+     */
+    List<Application> getMonitoredTaskList(List<Application> applications);
+
+    /**
+     * This interface handles task redistribution when server nodes are added.
+     * @param server String
+     */
+    void addServerRedistribute(String server);
+
+    /**
+     * This interface handles task redistribution when server nodes are 
removed.
+     * @param server String
+     */
+    void removeServerRedistribute(String server);
+
+    /**
+     * Determine whether the task is processed locally.
+     *
+     * @param appId Long
+     * @return boolean
+     */
+    public boolean isLocalProcessing(Long appId);
+
+    /**
+     * Save Distributed Task.
+     *
+     * @param appParam  Application
+     * @param autoStart boolean
+     * @param action It may be one of the following values: START, RESTART, 
REVOKE, CANCEL, ABORT
+     */
+    public void saveDistributedTask(Application appParam, boolean autoStart, 
DistributedTaskEnum action);
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 110dae8b9..ef768f45f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.streampark.console.core.entity.Resource;
 import org.apache.streampark.console.core.entity.Savepoint;
 import org.apache.streampark.console.core.enums.CheckPointTypeEnum;
 import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
 import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
 import org.apache.streampark.console.core.enums.OperationEnum;
 import org.apache.streampark.console.core.enums.OptionStateEnum;
@@ -57,6 +58,7 @@ import 
org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.ApplicationBackUpService;
 import org.apache.streampark.console.core.service.ApplicationConfigService;
 import org.apache.streampark.console.core.service.ApplicationLogService;
+import org.apache.streampark.console.core.service.DistributedTaskService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
@@ -175,6 +177,9 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
     @Autowired
     private ResourceService resourceService;
 
+    @Autowired
+    private DistributedTaskService distributedTaskService;
+
     @Autowired
     private FlinkClusterWatcher flinkClusterWatcher;
 
@@ -191,6 +196,12 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
         ApiAlertException.throwIfNull(
             application, String.format("The application id=%s not found, 
revoke failed.", appId));
 
+        // For HA purposes, if the task is not processed locally, save the 
Distribution task and return
+        if (!distributedTaskService.isLocalProcessing(appId)) {
+            distributedTaskService.saveDistributedTask(application, false, 
DistributedTaskEnum.REVOKE);
+            return;
+        }
+
         // 1) delete files that have been published to workspace
         application.getFsOperator().delete(application.getAppHome());
 
@@ -213,15 +224,25 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
    @Override
    public void restart(Application appParam) throws Exception {
        // For HA purposes, if the task is not processed locally, save the Distributed task and return
        if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
            distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.RESTART);
            return;
        }
        // Restart = cancel the running job, then a non-auto start of the same application
        this.cancel(appParam);
        this.start(appParam, false);
    }
 
     @Override
     public void abort(Long id) {
+        Application application = this.baseMapper.selectApp(id);
+        // For HA purposes, if the task is not processed locally, save the 
Distribution task and return
+        if (!distributedTaskService.isLocalProcessing(id)) {
+            distributedTaskService.saveDistributedTask(application, false, 
DistributedTaskEnum.ABORT);
+            return;
+        }
         CompletableFuture<SubmitResponse> startFuture = 
startFutureMap.remove(id);
         CompletableFuture<CancelResponse> cancelFuture = 
cancelFutureMap.remove(id);
-        Application application = this.baseMapper.selectApp(id);
         if (application.isKubernetesModeJob()) {
             KubernetesDeploymentHelper.watchPodTerminatedLog(
                 application.getK8sNamespace(), application.getJobName(), 
application.getJobId());
@@ -239,6 +260,11 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
     @Override
     public void cancel(Application appParam) throws Exception {
+        // For HA purposes, if the task is not processed locally, save the 
Distribution task and return
+        if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+            distributedTaskService.saveDistributedTask(appParam, false, 
DistributedTaskEnum.CANCEL);
+            return;
+        }
         FlinkAppHttpWatcher.setOptionState(appParam.getId(), 
OptionStateEnum.CANCELLING);
         Application application = getById(appParam.getId());
         application.setState(FlinkAppStateEnum.CANCELLING.getValue());
@@ -373,6 +399,11 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
     @Override
     public void start(Application appParam, boolean auto) throws Exception {
+        // For HA purposes, if the task is not processed locally, save the 
Distribution task and return
+        if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+            distributedTaskService.saveDistributedTask(appParam, auto, 
DistributedTaskEnum.START);
+            return;
+        }
         // 1) check application
         final Application application = getById(appParam.getId());
         AssertUtils.notNull(application);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
new file mode 100644
index 000000000..10aa929ec
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.base.util.ConsistentHash;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.bean.FlinkTaskItem;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.DistributedTask;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
+import org.apache.streampark.console.core.mapper.DistributedTaskMapper;
+import org.apache.streampark.console.core.service.DistributedTaskService;
+import 
org.apache.streampark.console.core.service.application.ApplicationActionService;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, 
rollbackFor = Exception.class)
+public class DistributedTaskServiceImpl extends 
ServiceImpl<DistributedTaskMapper, DistributedTask>
+    implements
+        DistributedTaskService {
+
+    /**
+     * Server name
+     */
+    private String serverId;
+
+    /**
+     * Consistent hash algorithm for task distribution
+     */
+    private final ConsistentHash<String> consistentHash = new 
ConsistentHash<>(Collections.emptyList());
+
+    @Qualifier("streamparkDistributedTaskExecutor")
+    @Autowired
+    private Executor taskExecutor;
+
+    @Autowired
+    private ApplicationActionService applicationActionService;
+
+    /**
+     * Task execution status
+     */
+    private final ConcurrentHashMap<Long, Boolean> runningTasks = new 
ConcurrentHashMap<>();
+
+    /**
+     * Add the current console server itself to the consistent hash ring.
+     */
+    public void init(String serverName) {
+        this.serverId = serverName;
+        consistentHash.add(serverName);
+    }
+
+    @Scheduled(fixedDelay = 50)
+    public void pollDistributedTask() {
+        List<DistributedTask> distributedTaskList = this.list();
+        for (DistributedTask DistributedTask : distributedTaskList) {
+            long taskId = DistributedTask.getId();
+            if (DistributedTask.getEngineType() != EngineTypeEnum.FLINK || 
!isLocalProcessing(taskId)) {
+                continue;
+            }
+            if (runningTasks.putIfAbsent(taskId, true) == null) {
+                taskExecutor.execute(() -> {
+                    try {
+                        // Execute Distributed task
+                        executeDistributedTask(DistributedTask);
+                    } catch (Exception e) {
+                        log.error(e.getMessage(), e);
+                    } finally {
+                        runningTasks.remove(DistributedTask.getId());
+                        this.removeById(DistributedTask.getId());
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * This interface is responsible for polling the database to retrieve task 
records and execute the corresponding operations.
+     * @param DistributedTask DistributedTask
+     */
+    @Override
+    public void executeDistributedTask(DistributedTask DistributedTask) throws 
Exception {
+        // Execute Distributed task
+        log.info("Execute Distributed task: {}", DistributedTask);
+        FlinkTaskItem flinkTaskItem = getFlinkTaskItem(DistributedTask);
+        Application appParam = getAppByFlinkTaskItem(flinkTaskItem);
+        switch (DistributedTask.getAction()) {
+            case START:
+                applicationActionService.start(appParam, 
flinkTaskItem.getAutoStart());
+                break;
+            case RESTART:
+                applicationActionService.restart(appParam);
+                break;
+            case REVOKE:
+                applicationActionService.revoke(appParam.getId());
+                break;
+            case CANCEL:
+                applicationActionService.cancel(appParam);
+                break;
+            case ABORT:
+                applicationActionService.abort(appParam.getId());
+                break;
+            default:
+                log.error("Unsupported task: {}", DistributedTask.getAction());
+        }
+    }
+
+    /**
+     * Through this interface, the watcher obtains the list of tasks that need 
to be monitored.
+     * @param applications List<Application>
+     * @return List<Application> List of tasks that need to be monitored
+     */
+    @Override
+    public List<Application> getMonitoredTaskList(List<Application> 
applications) {
+        return applications.stream()
+            .filter(application -> isLocalProcessing(application.getId()))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * This interface handles task redistribution when server nodes are added.
+     *
+     * @param server String
+     */
+    @Override
+    public void addServerRedistribute(String server) {
+
+    }
+
+    /**
+     * This interface handles task redistribution when server nodes are 
removed.
+     *
+     * @param server String
+     */
+    @Override
+    public void removeServerRedistribute(String server) {
+
+    }
+
+    /**
+     * Determine whether the task is processed locally.
+     *
+     * @param appId Long
+     * @return boolean
+     */
+    @Override
+    public boolean isLocalProcessing(Long appId) {
+        return consistentHash.get(appId).equals(serverId);
+    }
+
+    /**
+     * Save Distributed task.
+     *
+     * @param appParam  Application
+     * @param autoStart boolean
+     * @param action It may be one of the following values: START, RESTART, 
REVOKE, CANCEL, ABORT
+     */
+    @Override
+    public void saveDistributedTask(Application appParam, boolean autoStart, 
DistributedTaskEnum action) {
+        try {
+            DistributedTask DistributedTask = 
getDistributedTaskByApp(appParam, autoStart, action);
+            this.save(DistributedTask);
+        } catch (JsonProcessingException e) {
+            log.error("Failed to save Distributed task: {}", e.getMessage());
+        }
+    }
+
+    public DistributedTask getDistributedTaskByApp(Application appParam, 
boolean autoStart,
+                                                   DistributedTaskEnum action) 
throws JsonProcessingException {
+        FlinkTaskItem flinkTaskItem = new FlinkTaskItem();
+        flinkTaskItem.setAppId(appParam.getId());
+        flinkTaskItem.setAutoStart(autoStart);
+        flinkTaskItem.setArgs(appParam.getArgs());
+        flinkTaskItem.setDynamicProperties(appParam.getDynamicProperties());
+        flinkTaskItem.setSavepointPath(appParam.getSavepointPath());
+        
flinkTaskItem.setRestoreOrTriggerSavepoint(appParam.getRestoreOrTriggerSavepoint());
+        flinkTaskItem.setDrain(appParam.getDrain());
+        flinkTaskItem.setNativeFormat(appParam.getNativeFormat());
+        flinkTaskItem.setRestoreMode(appParam.getRestoreMode());
+
+        DistributedTask distributedTask = new DistributedTask();
+        distributedTask.setAction(action);
+        distributedTask.setEngineType(EngineTypeEnum.FLINK);
+        distributedTask.setProperties(JacksonUtils.write(flinkTaskItem));
+        return distributedTask;
+    }
+
+    public FlinkTaskItem getFlinkTaskItem(DistributedTask DistributedTask) 
throws JsonProcessingException {
+        return JacksonUtils.read(DistributedTask.getProperties(), 
FlinkTaskItem.class);
+    }
+
+    public Application getAppByFlinkTaskItem(FlinkTaskItem flinkTaskItem) {
+        Application appParam = new Application();
+        appParam.setId(flinkTaskItem.getAppId());
+        appParam.setArgs(flinkTaskItem.getArgs());
+        appParam.setDynamicProperties(flinkTaskItem.getDynamicProperties());
+        appParam.setSavepointPath(flinkTaskItem.getSavepointPath());
+        
appParam.setRestoreOrTriggerSavepoint(flinkTaskItem.getRestoreOrTriggerSavepoint());
+        appParam.setDrain(flinkTaskItem.getDrain());
+        appParam.setNativeFormat(flinkTaskItem.getNativeFormat());
+        appParam.setRestoreMode(flinkTaskItem.getRestoreMode());
+        return appParam;
+    }
+
+    public long getConsistentHashSize() {
+        return consistentHash.getSize();
+    }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 8436cc1c5..b15c9ab31 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -35,6 +35,7 @@ import 
org.apache.streampark.console.core.metrics.flink.CheckPoints;
 import org.apache.streampark.console.core.metrics.flink.JobsOverview;
 import org.apache.streampark.console.core.metrics.flink.Overview;
 import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.DistributedTaskService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.SavepointService;
 import org.apache.streampark.console.core.service.alert.AlertService;
@@ -98,6 +99,9 @@ public class FlinkAppHttpWatcher {
     @Autowired
     private SavepointService savepointService;
 
+    @Autowired
+    private DistributedTaskService distributedTaskService;
+
     // track interval every 5 seconds
     public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
 
@@ -176,11 +180,10 @@ public class FlinkAppHttpWatcher {
     @PostConstruct
     public void init() {
         WATCHING_APPS.clear();
-        List<Application> applications =
-            applicationManageService.list(
-                new LambdaQueryWrapper<Application>()
-                    .eq(Application::getTracking, 1)
-                    .notIn(Application::getExecutionMode, 
FlinkExecutionMode.getKubernetesMode()));
+        List<Application> applications = 
distributedTaskService.getMonitoredTaskList(applicationManageService.list(
+            new LambdaQueryWrapper<Application>()
+                .eq(Application::getTracking, 1)
+                .notIn(Application::getExecutionMode, 
FlinkExecutionMode.getKubernetesMode())));
         applications.forEach(
             (app) -> {
                 WATCHING_APPS.put(app.getId(), app);
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 3a163c637..00c847d72 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -217,6 +217,20 @@ create table if not exists `t_flink_sql` (
   primary key(`id`)
 );
 
+
-- ----------------------------
-- Table structure for t_distributed_task
-- Each row is one pending distributed operation polled by console nodes.
-- ----------------------------
create table if not exists `t_distributed_task`
(
    `id`                           bigint generated by default as identity not null, -- auto-increment primary key
    `action`                       tinyint not null, -- DistributedTaskEnum code: 0 START, 1 RESTART, 2 REVOKE, 3 CANCEL, 4 ABORT
    `engine_type`                  tinyint not null, -- EngineTypeEnum code (FLINK rows are executed by the poller)
    `properties`                   text, -- JSON-serialized task payload (FlinkTaskItem)
    primary key (`id`)
);
+
+
 -- ----------------------------
 -- Table of t_resource
 -- ----------------------------
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/DistributedTaskMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/DistributedTaskMapper.xml
new file mode 100644
index 000000000..9359de503
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/DistributedTaskMapper.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE mapper
+        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
<mapper namespace="org.apache.streampark.console.core.mapper.DistributedTaskMapper">
    <!-- All CRUD for t_distributed_task is inherited from MyBatis-Plus BaseMapper; no custom statements yet. -->
</mapper>
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ConsistentHashTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ConsistentHashTest.java
new file mode 100644
index 000000000..c2a5913dc
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ConsistentHashTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class ConsistentHashTest {
+
+    Long startTime;
+
+    List<String> servers;
+
+    ConsistentHash<String> hash;
+
+    Integer jobNum = 300000;
+
+    List<Integer> jobIds = new ArrayList<>();
+
+    @BeforeEach
+    public void init() {
+        startTime = System.currentTimeMillis();
+        servers = new ArrayList<>(Arrays.asList("Server-A", "Server-B", 
"Server-C"));
+        hash = new ConsistentHash<>(servers);
+        for (int i = 0; i < jobNum; i++) {
+            jobIds.add(i);
+        }
+    }
+
+    @Test
+    public void initWatch() {
+        Map<String, Long> counts = jobIds.stream()
+            .map(hash::get)
+            .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+        counts.forEach((k, v) -> {
+            log.info("node:{}, initWatch count:{}", k, v);
+        });
+        log.info("time: {}ms", System.currentTimeMillis() - startTime);
+    }
+
+    @Test
+    public void addServer() {
+        servers.add("Server-D");
+        hash.add("Server-D");
+        Map<String, Long> counts = jobIds.stream()
+            .map(hash::get)
+            .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+        counts.forEach((k, v) -> {
+            log.info("node:{}, addServer count:{}", k, v);
+        });
+        log.info("time: {}ms", System.currentTimeMillis() - startTime);
+    }
+
+    @Test
+    public void removeServer() {
+        servers.remove("Server-C");
+        hash.remove("Server-C");
+        Map<String, Long> counts = jobIds.stream()
+            .map(hash::get)
+            .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+        counts.forEach((k, v) -> {
+            log.info("node:{}, removeServer count:{}", k, v);
+        });
+        log.info("time: {}ms", System.currentTimeMillis() - startTime);
+    }
+}
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
new file mode 100644
index 000000000..3c7d47837
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.bean.FlinkTaskItem;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.DistributedTask;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
+import 
org.apache.streampark.console.core.service.impl.DistributedTaskServiceImpl;
+
+import com.fasterxml.jackson.core.JacksonException;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+class DistributedTaskServiceTest {
+
+    private final DistributedTaskServiceImpl distributionTaskService = new 
DistributedTaskServiceImpl();
+
+    private final String serverName = "testServer";
+
+    // the number of virtual nodes for each server
+    private final int numberOfReplicas = 2 << 16;
+
+    @Test
+    void testInit() {
+        distributionTaskService.init(serverName);
+        assert (distributionTaskService.getConsistentHashSize() == 
numberOfReplicas);
+    }
+
+    @Test
+    void testIsLocalProcessing() {
+        distributionTaskService.init(serverName);
+        for (long i = 0; i < numberOfReplicas; i++) {
+            assert (distributionTaskService.isLocalProcessing(i));
+        }
+    }
+
+    @Test
+    void testGetTaskAndApp() {
+        Application application = new Application();
+        application.setId(0L);
+        try {
+            DistributedTask DistributedTask =
+                distributionTaskService.getDistributedTaskByApp(application, 
false, DistributedTaskEnum.START);
+            FlinkTaskItem flinkTaskItem = 
distributionTaskService.getFlinkTaskItem(DistributedTask);
+            Application newApplication = 
distributionTaskService.getAppByFlinkTaskItem(flinkTaskItem);
+            assert (application.equals(newApplication));
+        } catch (JacksonException e) {
+            log.error("testGetTaskAndApp failed:", e);
+        }
+    }
+
+}

Reply via email to