This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new c2799e488 [Feature-3960] Add task distribution for streampark (#4040)
c2799e488 is described below
commit c2799e488c6e4ce115d75bf1a6ee30b82b927e31
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Sep 11 14:03:00 2024 +0800
[Feature-3960] Add task distribution for streampark (#4040)
* init hash
* add test
* spotless
* refine docs
* add TaskManager
* rename
---
.../application-mysql.yml | 2 +-
.../streampark/common/utils/UUIDUtilsTestCase.java | 2 +-
.../main/assembly/script/schema/mysql-schema.sql | 13 +
.../main/assembly/script/schema/pgsql-schema.sql | 18 +
.../base/config/AsyncExecutorPoolConfig.java | 16 +
.../console/base/util/ConsistentHash.java | 85 ++++
.../streampark/console/base/util/Murmur3Hash.java | 552 +++++++++++++++++++++
.../console/core/bean/FlinkTaskItem.java | 46 ++
.../console/core/entity/DistributedTask.java | 45 ++
.../console/core/enums/DistributedTaskEnum.java | 59 +++
.../console/core/mapper/DistributedTaskMapper.java | 25 +
.../core/registry/ConsoleRegistryClient.java | 5 +
.../core/service/DistributedTaskService.java | 80 +++
.../impl/ApplicationActionServiceImpl.java | 33 +-
.../service/impl/DistributedTaskServiceImpl.java | 239 +++++++++
.../console/core/watcher/FlinkAppHttpWatcher.java | 13 +-
.../src/main/resources/db/schema-h2.sql | 14 +
.../mapper/core/DistributedTaskMapper.xml | 22 +
.../console/base/util/ConsistentHashTest.java | 90 ++++
.../core/service/DistributedTaskServiceTest.java | 69 +++
20 files changed, 1420 insertions(+), 8 deletions(-)
diff --git
a/helm/streampark/conf/streampark-console-config/application-mysql.yml
b/helm/streampark/conf/streampark-console-config/application-mysql.yml
index 7c91f7c8c..3625d2180 100755
--- a/helm/streampark/conf/streampark-console-config/application-mysql.yml
+++ b/helm/streampark/conf/streampark-console-config/application-mysql.yml
@@ -18,6 +18,6 @@
spring:
datasource:
username: root
- password: streampark
+ password: root
driver-class-name: com.mysql.cj.jdbc.Driver
url:
jdbc:mysql://localhost:3306/streampark?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
diff --git
a/streampark-common/src/test/java/org/apache/streampark/common/utils/UUIDUtilsTestCase.java
b/streampark-common/src/test/java/org/apache/streampark/common/utils/UUIDUtilsTestCase.java
index b6f61addf..441013471 100644
---
a/streampark-common/src/test/java/org/apache/streampark/common/utils/UUIDUtilsTestCase.java
+++
b/streampark-common/src/test/java/org/apache/streampark/common/utils/UUIDUtilsTestCase.java
@@ -44,7 +44,7 @@ class UUIDUtilsTestCase {
@Test
@SneakyThrows
void testNoDuplicateGenerateUUID() {
- int threadNum = 100;
+ int threadNum = 10;
int uuidNum = 10000;
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 5673d7778..e723e63fc 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -240,6 +240,19 @@ create table `t_flink_sql` (
) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
+-- ----------------------------
+-- Table structure for t_distributed_task
+-- ----------------------------
+drop table if exists `t_distributed_task`;
+create table `t_distributed_task` (
+ `id` bigint not null auto_increment,
+ `action` tinyint not null,
+ `engine_type` tinyint not null,
+ `properties` text collate utf8mb4_general_ci,
+ primary key (`id`) using btree
+) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
+
+
-- ----------------------------
-- Table structure for t_menu
-- ----------------------------
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index f65947a2c..c25619457 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -34,6 +34,7 @@ drop table if exists "public"."t_flink_effective";
drop table if exists "public"."t_flink_config";
drop table if exists "public"."t_flink_cluster";
drop table if exists "public"."t_flink_app";
+drop table if exists "public"."t_distributed_task";
drop table if exists "public"."t_app_build_pipe";
drop table if exists "public"."t_app_backup";
drop table if exists "public"."t_alert_config";
@@ -61,6 +62,7 @@ drop sequence if exists
"public"."streampark_t_flink_effective_id_seq";
drop sequence if exists "public"."streampark_t_flink_config_id_seq";
drop sequence if exists "public"."streampark_t_flink_cluster_id_seq";
drop sequence if exists "public"."streampark_t_flink_app_id_seq";
+drop sequence if exists "public"."streampark_t_distributed_task_id_seq";
drop sequence if exists "public"."streampark_t_app_backup_id_seq";
drop sequence if exists "public"."streampark_t_alert_config_id_seq";
drop sequence if exists "public"."streampark_t_access_token_id_seq";
@@ -494,6 +496,22 @@ create table "public"."t_flink_sql" (
alter table "public"."t_flink_sql" add constraint "t_flink_sql_pkey" primary
key ("id");
+-- ----------------------------
+-- table structure for t_distributed_task
+-- ----------------------------
+create sequence "public"."streampark_t_distributed_task_id_seq"
+ increment 1 start 10000 cache 1 minvalue 10000 maxvalue
9223372036854775807;
+create table "public"."t_distributed_task"
+(
+ "id" int8 not null default
nextval('streampark_t_distributed_task_id_seq'::regclass),
+ "action" int2,
+ "engine_type" int2,
+ "properties" text collate "pg_catalog"."default"
+)
+;
+alter table "public"."t_distributed_task" add constraint
"t_distributed_task_pkey" primary key ("id");
+
+
-- ----------------------------
-- table structure for t_menu
-- ----------------------------
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
index 5d51b3df1..c79e47a8a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
@@ -182,4 +182,20 @@ public class AsyncExecutorPoolConfig extends
AsyncConfigurerSupport {
ThreadUtils.threadFactory("streampark-build-executor-"),
new ThreadPoolExecutor.AbortPolicy());
}
+
+ /**
+ * Create a ThreadPoolTaskExecutor for DistributedTask.
+ *
+ * @return Executor
+ */
+ @Bean("streamparkDistributedTaskExecutor")
+ public Executor distributedTaskExecutor() {
+ return new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors() * 5,
+ Runtime.getRuntime().availableProcessors() * 10,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(1024),
+ ThreadUtils.threadFactory("streampark-distributed-task-"));
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ConsistentHash.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ConsistentHash.java
new file mode 100644
index 000000000..60a34358c
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ConsistentHash.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import java.util.Collection;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentHash<T> {
+
+ // the number of virtual nodes for each server
+ private final int numberOfReplicas = 2 << 16;
+
+ // the hash ring of servers
+ private final SortedMap<Long, T> circle = new TreeMap<>();
+
+ /**
+ * Initialize the ConsistentHash with a collection of servers.
+ * @param servers the collection of servers
+ */
+ public ConsistentHash(Collection<T> servers) {
+ servers.forEach(this::add);
+ }
+
+ /**
+ * Add the virtual nodes of the server to the hash ring.
+ * @param server the server to be added
+ */
+ public void add(T server) {
+ for (int i = 0; i < numberOfReplicas; i++) {
+ circle.put(Murmur3Hash.hash64(server.toString() + i), server);
+ }
+ }
+
+ /**
+ * Remove the virtual nodes of the server from the hash ring.
+ * @param server the server to be removed
+ */
+ public void remove(T server) {
+ for (int i = 0; i < numberOfReplicas; i++) {
+ circle.remove(Murmur3Hash.hash64(server.toString() + i));
+ }
+ }
+
+ /**
+ * Get the server that the key belongs to from the hash ring.
+ * @param key the key
+ * @return the specified server
+ */
+ public T get(Object key) {
+ if (circle.isEmpty()) {
+ return null;
+ }
+ long hash = Murmur3Hash.hash64(key.toString());
+ if (!circle.containsKey(hash)) {
+ SortedMap<Long, T> tailMap = circle.tailMap(hash);
+ hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
+ }
+ return circle.get(hash);
+ }
+
+ /**
+ * Get the size of the hash ring.
+ * @return the size of the hash ring
+ */
+ public long getSize() {
+ return circle.size();
+ }
+
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/Murmur3Hash.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/Murmur3Hash.java
new file mode 100644
index 000000000..6cc185766
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/Murmur3Hash.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+/**
+ * Murmur3 is successor to Murmur2 fast non-crytographic hash algorithms.
+ *
+ * Murmur3 32 and 128 bit variants.
+ * 32-bit Java port of
https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94
+ * 128-bit Java port of
https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#255
+ *
+ * This is a public domain code with no copyrights.
+ * From homepage of MurmurHash (https://code.google.com/p/smhasher/),
+ * "All MurmurHash versions are public domain software, and the author
disclaims all copyright
+ * to their code."
+ */
+public class Murmur3Hash {
+
+ // from 64-bit linear congruential generator
+ public static final long NULL_HASHCODE = 2862933555777941757L;
+
+ // Constants for 32 bit variant
+ private static final int C1_32 = 0xcc9e2d51;
+ private static final int C2_32 = 0x1b873593;
+ private static final int R1_32 = 15;
+ private static final int R2_32 = 13;
+ private static final int M_32 = 5;
+ private static final int N_32 = 0xe6546b64;
+
+ // Constants for 128 bit variant
+ private static final long C1 = 0x87c37b91114253d5L;
+ private static final long C2 = 0x4cf5ad432745937fL;
+ private static final int R1 = 31;
+ private static final int R2 = 27;
+ private static final int R3 = 33;
+ private static final int M = 5;
+ private static final int N1 = 0x52dce729;
+ private static final int N2 = 0x38495ab5;
+
+ public static final int DEFAULT_SEED = 104729;
+
+ public static int hash32(long l0, long l1) {
+ return hash32(l0, l1, DEFAULT_SEED);
+ }
+
+ public static int hash32(long l0) {
+ return hash32(l0, DEFAULT_SEED);
+ }
+
+ /**
+ * Murmur3 32-bit variant.
+ */
+ public static int hash32(long l0, int seed) {
+ int hash = seed;
+ final long r0 = Long.reverseBytes(l0);
+
+ hash = mix32((int) r0, hash);
+ hash = mix32((int) (r0 >>> 32), hash);
+
+ return fmix32(Long.BYTES, hash);
+ }
+
+ /**
+ * Murmur3 32-bit variant.
+ */
+ public static int hash32(long l0, long l1, int seed) {
+ int hash = seed;
+ final long r0 = Long.reverseBytes(l0);
+ final long r1 = Long.reverseBytes(l1);
+
+ hash = mix32((int) r0, hash);
+ hash = mix32((int) (r0 >>> 32), hash);
+ hash = mix32((int) (r1), hash);
+ hash = mix32((int) (r1 >>> 32), hash);
+
+ return fmix32(Long.BYTES * 2, hash);
+ }
+
+ /**
+ * Murmur3 32-bit variant.
+ *
+ * @param data - input byte array
+ * @return - hashcode
+ */
+ public static int hash32(byte[] data) {
+ return hash32(data, 0, data.length, DEFAULT_SEED);
+ }
+
+ /**
+ * Murmur3 32-bit variant.
+ *
+ * @param data - input byte array
+ * @param length - length of array
+ * @return - hashcode
+ */
+ public static int hash32(byte[] data, int length) {
+ return hash32(data, 0, length, DEFAULT_SEED);
+ }
+
+ /**
+ * Murmur3 32-bit variant.
+ *
+ * @param data - input byte array
+ * @param length - length of array
+ * @param seed - seed. (default 0)
+ * @return - hashcode
+ */
+ public static int hash32(byte[] data, int length, int seed) {
+ return hash32(data, 0, length, seed);
+ }
+
+ /**
+ * Murmur3 32-bit variant.
+ *
+ * @param data - input byte array
+ * @param offset - offset of data
+ * @param length - length of array
+ * @param seed - seed. (default 0)
+ * @return - hashcode
+ */
+ public static int hash32(byte[] data, int offset, int length, int seed) {
+ int hash = seed;
+ final int nblocks = length >> 2;
+
+ // body
+ for (int i = 0; i < nblocks; i++) {
+ int i_4 = i << 2;
+ int k = (data[offset + i_4] & 0xff)
+ | ((data[offset + i_4 + 1] & 0xff) << 8)
+ | ((data[offset + i_4 + 2] & 0xff) << 16)
+ | ((data[offset + i_4 + 3] & 0xff) << 24);
+
+ hash = mix32(k, hash);
+ }
+
+ // tail
+ int idx = nblocks << 2;
+ int k1 = 0;
+ switch (length - idx) {
+ case 3:
+ k1 ^= data[offset + idx + 2] << 16;
+ case 2:
+ k1 ^= data[offset + idx + 1] << 8;
+ case 1:
+ k1 ^= data[offset + idx];
+
+ // mix functions
+ k1 *= C1_32;
+ k1 = Integer.rotateLeft(k1, R1_32);
+ k1 *= C2_32;
+ hash ^= k1;
+ }
+
+ return fmix32(length, hash);
+ }
+
+ private static int mix32(int k, int hash) {
+ k *= C1_32;
+ k = Integer.rotateLeft(k, R1_32);
+ k *= C2_32;
+ hash ^= k;
+ return Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
+ }
+
+ private static int fmix32(int length, int hash) {
+ hash ^= length;
+ hash ^= (hash >>> 16);
+ hash *= 0x85ebca6b;
+ hash ^= (hash >>> 13);
+ hash *= 0xc2b2ae35;
+ hash ^= (hash >>> 16);
+
+ return hash;
+ }
+
+ public static long hash64(String data) {
+ return hash64(data.getBytes());
+ }
+
+ /**
+ * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3
128-bit variant.
+ *
+ * @param data - input byte array
+ * @return - hashcode
+ */
+ public static long hash64(byte[] data) {
+ return hash64(data, 0, data.length, DEFAULT_SEED);
+ }
+
+ public static long hash64(long data) {
+ long hash = DEFAULT_SEED;
+ long k = Long.reverseBytes(data);
+ int length = Long.BYTES;
+ // mix functions
+ k *= C1;
+ k = Long.rotateLeft(k, R1);
+ k *= C2;
+ hash ^= k;
+ hash = Long.rotateLeft(hash, R2) * M + N1;
+ // finalization
+ hash ^= length;
+ hash = fmix64(hash);
+ return hash;
+ }
+
+ public static long hash64(int data) {
+ long k1 = Integer.reverseBytes(data) & (-1L >>> 32);
+ int length = Integer.BYTES;
+ long hash = DEFAULT_SEED;
+ k1 *= C1;
+ k1 = Long.rotateLeft(k1, R1);
+ k1 *= C2;
+ hash ^= k1;
+ // finalization
+ hash ^= length;
+ hash = fmix64(hash);
+ return hash;
+ }
+
+ public static long hash64(short data) {
+ long hash = DEFAULT_SEED;
+ long k1 = 0;
+ k1 ^= ((long) data & 0xff) << 8;
+ k1 ^= ((long) ((data & 0xFF00) >> 8) & 0xff);
+ k1 *= C1;
+ k1 = Long.rotateLeft(k1, R1);
+ k1 *= C2;
+ hash ^= k1;
+
+ // finalization
+ hash ^= Short.BYTES;
+ hash = fmix64(hash);
+ return hash;
+ }
+
+ public static long hash64(byte[] data, int offset, int length) {
+ return hash64(data, offset, length, DEFAULT_SEED);
+ }
+
+ /**
+ * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3
128-bit variant.
+ *
+ * @param data - input byte array
+ * @param length - length of array
+ * @param seed - seed. (default is 0)
+ * @return - hashcode
+ */
+ public static long hash64(byte[] data, int offset, int length, int seed) {
+ long hash = seed;
+ final int nblocks = length >> 3;
+
+ // body
+ for (int i = 0; i < nblocks; i++) {
+ final int i8 = i << 3;
+ long k = ((long) data[offset + i8] & 0xff)
+ | (((long) data[offset + i8 + 1] & 0xff) << 8)
+ | (((long) data[offset + i8 + 2] & 0xff) << 16)
+ | (((long) data[offset + i8 + 3] & 0xff) << 24)
+ | (((long) data[offset + i8 + 4] & 0xff) << 32)
+ | (((long) data[offset + i8 + 5] & 0xff) << 40)
+ | (((long) data[offset + i8 + 6] & 0xff) << 48)
+ | (((long) data[offset + i8 + 7] & 0xff) << 56);
+
+ // mix functions
+ k *= C1;
+ k = Long.rotateLeft(k, R1);
+ k *= C2;
+ hash ^= k;
+ hash = Long.rotateLeft(hash, R2) * M + N1;
+ }
+
+ // tail
+ long k1 = 0;
+ int tailStart = nblocks << 3;
+ switch (length - tailStart) {
+ case 7:
+ k1 ^= ((long) data[offset + tailStart + 6] & 0xff) << 48;
+ case 6:
+ k1 ^= ((long) data[offset + tailStart + 5] & 0xff) << 40;
+ case 5:
+ k1 ^= ((long) data[offset + tailStart + 4] & 0xff) << 32;
+ case 4:
+ k1 ^= ((long) data[offset + tailStart + 3] & 0xff) << 24;
+ case 3:
+ k1 ^= ((long) data[offset + tailStart + 2] & 0xff) << 16;
+ case 2:
+ k1 ^= ((long) data[offset + tailStart + 1] & 0xff) << 8;
+ case 1:
+ k1 ^= ((long) data[offset + tailStart] & 0xff);
+ k1 *= C1;
+ k1 = Long.rotateLeft(k1, R1);
+ k1 *= C2;
+ hash ^= k1;
+ }
+
+ // finalization
+ hash ^= length;
+ hash = fmix64(hash);
+
+ return hash;
+ }
+
+ /**
+ * Murmur3 128-bit variant.
+ *
+ * @param data - input byte array
+ * @return - hashcode (2 longs)
+ */
+ public static long[] hash128(byte[] data) {
+ return hash128(data, 0, data.length, DEFAULT_SEED);
+ }
+
+ /**
+ * Murmur3 128-bit variant.
+ *
+ * @param data - input byte array
+ * @param offset - the first element of array
+ * @param length - length of array
+ * @param seed - seed. (default is 0)
+ * @return - hashcode (2 longs)
+ */
+ public static long[] hash128(byte[] data, int offset, int length, int
seed) {
+ long h1 = seed;
+ long h2 = seed;
+ final int nblocks = length >> 4;
+
+ // body
+ for (int i = 0; i < nblocks; i++) {
+ final int i16 = i << 4;
+ long k1 = ((long) data[offset + i16] & 0xff)
+ | (((long) data[offset + i16 + 1] & 0xff) << 8)
+ | (((long) data[offset + i16 + 2] & 0xff) << 16)
+ | (((long) data[offset + i16 + 3] & 0xff) << 24)
+ | (((long) data[offset + i16 + 4] & 0xff) << 32)
+ | (((long) data[offset + i16 + 5] & 0xff) << 40)
+ | (((long) data[offset + i16 + 6] & 0xff) << 48)
+ | (((long) data[offset + i16 + 7] & 0xff) << 56);
+
+ long k2 = ((long) data[offset + i16 + 8] & 0xff)
+ | (((long) data[offset + i16 + 9] & 0xff) << 8)
+ | (((long) data[offset + i16 + 10] & 0xff) << 16)
+ | (((long) data[offset + i16 + 11] & 0xff) << 24)
+ | (((long) data[offset + i16 + 12] & 0xff) << 32)
+ | (((long) data[offset + i16 + 13] & 0xff) << 40)
+ | (((long) data[offset + i16 + 14] & 0xff) << 48)
+ | (((long) data[offset + i16 + 15] & 0xff) << 56);
+
+ // mix functions for k1
+ k1 *= C1;
+ k1 = Long.rotateLeft(k1, R1);
+ k1 *= C2;
+ h1 ^= k1;
+ h1 = Long.rotateLeft(h1, R2);
+ h1 += h2;
+ h1 = h1 * M + N1;
+
+ // mix functions for k2
+ k2 *= C2;
+ k2 = Long.rotateLeft(k2, R3);
+ k2 *= C1;
+ h2 ^= k2;
+ h2 = Long.rotateLeft(h2, R1);
+ h2 += h1;
+ h2 = h2 * M + N2;
+ }
+
+ // tail
+ long k1 = 0;
+ long k2 = 0;
+ int tailStart = nblocks << 4;
+ switch (length - tailStart) {
+ case 15:
+ k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48;
+ case 14:
+ k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40;
+ case 13:
+ k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32;
+ case 12:
+ k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24;
+ case 11:
+ k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16;
+ case 10:
+ k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8;
+ case 9:
+ k2 ^= (long) (data[offset + tailStart + 8] & 0xff);
+ k2 *= C2;
+ k2 = Long.rotateLeft(k2, R3);
+ k2 *= C1;
+ h2 ^= k2;
+
+ case 8:
+ k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56;
+ case 7:
+ k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48;
+ case 6:
+ k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40;
+ case 5:
+ k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32;
+ case 4:
+ k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24;
+ case 3:
+ k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16;
+ case 2:
+ k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8;
+ case 1:
+ k1 ^= (long) (data[offset + tailStart] & 0xff);
+ k1 *= C1;
+ k1 = Long.rotateLeft(k1, R1);
+ k1 *= C2;
+ h1 ^= k1;
+ }
+
+ // finalization
+ h1 ^= length;
+ h2 ^= length;
+
+ h1 += h2;
+ h2 += h1;
+
+ h1 = fmix64(h1);
+ h2 = fmix64(h2);
+
+ h1 += h2;
+ h2 += h1;
+
+ return new long[]{h1, h2};
+ }
+
+ private static long fmix64(long h) {
+ h ^= (h >>> 33);
+ h *= 0xff51afd7ed558ccdL;
+ h ^= (h >>> 33);
+ h *= 0xc4ceb9fe1a85ec53L;
+ h ^= (h >>> 33);
+ return h;
+ }
+
+ public static class IncrementalHash32 {
+
+ byte[] tail = new byte[3];
+ int tailLen;
+ int totalLen;
+ int hash;
+
+ public final void start(int hash) {
+ tailLen = totalLen = 0;
+ this.hash = hash;
+ }
+
+ public final void add(byte[] data, int offset, int length) {
+ if (length == 0)
+ return;
+ totalLen += length;
+ if (tailLen + length < 4) {
+ System.arraycopy(data, offset, tail, tailLen, length);
+ tailLen += length;
+ return;
+ }
+ int offset2 = 0;
+ if (tailLen > 0) {
+ offset2 = (4 - tailLen);
+ int k = -1;
+ switch (tailLen) {
+ case 1:
+ k = orBytes(tail[0], data[offset], data[offset + 1],
data[offset + 2]);
+ break;
+ case 2:
+ k = orBytes(tail[0], tail[1], data[offset],
data[offset + 1]);
+ break;
+ case 3:
+ k = orBytes(tail[0], tail[1], tail[2], data[offset]);
+ break;
+ default:
+ throw new AssertionError(tailLen);
+ }
+ // mix functions
+ k *= C1_32;
+ k = Integer.rotateLeft(k, R1_32);
+ k *= C2_32;
+ hash ^= k;
+ hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
+ }
+ int length2 = length - offset2;
+ offset += offset2;
+ final int nblocks = length2 >> 2;
+
+ for (int i = 0; i < nblocks; i++) {
+ int i_4 = (i << 2) + offset;
+ int k = orBytes(data[i_4], data[i_4 + 1], data[i_4 + 2],
data[i_4 + 3]);
+
+ // mix functions
+ k *= C1_32;
+ k = Integer.rotateLeft(k, R1_32);
+ k *= C2_32;
+ hash ^= k;
+ hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
+ }
+
+ int consumed = (nblocks << 2);
+ tailLen = length2 - consumed;
+ if (consumed == length2)
+ return;
+ System.arraycopy(data, offset + consumed, tail, 0, tailLen);
+ }
+
+ public final int end() {
+ int k1 = 0;
+ switch (tailLen) {
+ case 3:
+ k1 ^= tail[2] << 16;
+ case 2:
+ k1 ^= tail[1] << 8;
+ case 1:
+ k1 ^= tail[0];
+
+ // mix functions
+ k1 *= C1_32;
+ k1 = Integer.rotateLeft(k1, R1_32);
+ k1 *= C2_32;
+ hash ^= k1;
+ }
+
+ // finalization
+ hash ^= totalLen;
+ hash ^= (hash >>> 16);
+ hash *= 0x85ebca6b;
+ hash ^= (hash >>> 13);
+ hash *= 0xc2b2ae35;
+ hash ^= (hash >>> 16);
+ return hash;
+ }
+ }
+
+ private static int orBytes(byte b1, byte b2, byte b3, byte b4) {
+ return (b1 & 0xff) | ((b2 & 0xff) << 8) | ((b3 & 0xff) << 16) | ((b4 &
0xff) << 24);
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkTaskItem.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkTaskItem.java
new file mode 100644
index 000000000..c99cebc72
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkTaskItem.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class FlinkTaskItem implements Serializable {
+
+ /** appId */
+ private Long appId;
+
+ private Boolean autoStart;
+
+ private String args;
+
+ private String dynamicProperties;
+
+ /** running job */
+ private String savepointPath;
+
+ private Boolean restoreOrTriggerSavepoint = false;
+
+ private Boolean drain = false;
+
+ private Boolean nativeFormat = false;
+
+ private Integer restoreMode;
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/DistributedTask.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/DistributedTask.java
new file mode 100644
index 000000000..01ac7b9cc
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/DistributedTask.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+
+@Data
+@TableName("t_distributed_task")
+@Slf4j
+public class DistributedTask implements Serializable {
+
+ @TableId(type = IdType.AUTO)
+ private Long id;
+
+ private DistributedTaskEnum action;
+
+ private EngineTypeEnum engineType;
+
+ private String properties;
+
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
new file mode 100644
index 000000000..e27a9ce95
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.enums;
+
+/**
+ * The DistributedTaskEnum represents the possible actions that can be
performed on a task.
+ */
+public enum DistributedTaskEnum {
+
+ /**
+ * Starts the specified application.
+ */
+ START(0),
+
+ /**
+ * Restarts the given application.
+ */
+ RESTART(1),
+
+ /**
+ * Revokes access for the given application.
+ */
+ REVOKE(2),
+
+ /**
+ * Cancels the given application. Throws an exception if cancellation
fails.
+ */
+ CANCEL(3),
+
+ /**
+ * Forces the given application to stop.
+ */
+ ABORT(4);
+
+ private final int value;
+
+ DistributedTaskEnum(int value) {
+ this.value = value;
+ }
+
+ public int get() {
+ return this.value;
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DistributedTaskMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DistributedTaskMapper.java
new file mode 100644
index 000000000..a9318e93f
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DistributedTaskMapper.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.mapper;
+
+import org.apache.streampark.console.core.entity.DistributedTask;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface DistributedTaskMapper extends BaseMapper<DistributedTask> {
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
index 65fa27527..d04b35104 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.IStoppable;
import org.apache.streampark.common.utils.JSONUtils;
import org.apache.streampark.common.utils.NetworkUtils;
import org.apache.streampark.console.core.config.ConsoleConfig;
+import org.apache.streampark.console.core.service.DistributedTaskService;
import org.apache.streampark.console.core.task.ConsoleHeartBeatTask;
import org.apache.streampark.registry.api.RegistryClient;
import org.apache.streampark.registry.api.RegistryException;
@@ -54,6 +55,9 @@ public class ConsoleRegistryClient implements AutoCloseable {
@Autowired
private ConsoleConnectStrategy consoleConnectStrategy;
+ @Autowired
+ private DistributedTaskService distributedTaskService;
+
private ConsoleHeartBeatTask consoleHeartBeatTask;
public void start() {
@@ -138,6 +142,7 @@ public class ConsoleRegistryClient implements AutoCloseable
{
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
consoleHeartBeatTask.start();
+ distributedTaskService.init(consoleConfig.getConsoleAddress());
log.info("Console node : {} registered to registry center
successfully", consoleConfig.getConsoleAddress());
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
new file mode 100644
index 000000000..7b29ef08c
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.DistributedTask;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.List;
+
+/**
+ * DistributedTaskService is the interface for managing tasks.
+ */
+public interface DistributedTaskService extends IService<DistributedTask> {
+
+ /**
+ * Add the current console server itself to the consistent hash ring.
+ * @param serverName String
+ */
+ void init(String serverName);
+
+ /**
+ * This interface is responsible for polling the database to retrieve task
records and execute the corresponding operations.
+ * @param DistributedTask DistributedTask
+ */
+ void executeDistributedTask(DistributedTask DistributedTask) throws
Exception;
+
+ /**
+ * Through this interface, the watcher obtains the list of tasks that need
to be monitored.
+ * @param applications List<Application>
+ * @return List<Application> List of tasks that need to be monitored
+ */
+ List<Application> getMonitoredTaskList(List<Application> applications);
+
+ /**
+ * This interface handles task redistribution when server nodes are added.
+ * @param server String
+ */
+ void addServerRedistribute(String server);
+
+ /**
+ * This interface handles task redistribution when server nodes are
removed.
+ * @param server String
+ */
+ void removeServerRedistribute(String server);
+
+ /**
+ * Determine whether the task is processed locally.
+ *
+ * @param appId Long
+ * @return boolean
+ */
+ public boolean isLocalProcessing(Long appId);
+
+ /**
+ * Save Distributed Task.
+ *
+ * @param appParam Application
+ * @param autoStart boolean
+ * @param action It may be one of the following values: START, RESTART,
REVOKE, CANCEL, ABORT
+ */
+ public void saveDistributedTask(Application appParam, boolean autoStart,
DistributedTaskEnum action);
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 110dae8b9..ef768f45f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.streampark.console.core.entity.Resource;
import org.apache.streampark.console.core.entity.Savepoint;
import org.apache.streampark.console.core.enums.CheckPointTypeEnum;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.OperationEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
@@ -57,6 +58,7 @@ import
org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationConfigService;
import org.apache.streampark.console.core.service.ApplicationLogService;
+import org.apache.streampark.console.core.service.DistributedTaskService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
@@ -175,6 +177,9 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
@Autowired
private ResourceService resourceService;
+ @Autowired
+ private DistributedTaskService distributedTaskService;
+
@Autowired
private FlinkClusterWatcher flinkClusterWatcher;
@@ -191,6 +196,12 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
ApiAlertException.throwIfNull(
application, String.format("The application id=%s not found,
revoke failed.", appId));
+ // For HA purposes, if the task is not processed locally, save the
Distribution task and return
+ if (!distributedTaskService.isLocalProcessing(appId)) {
+ distributedTaskService.saveDistributedTask(application, false,
DistributedTaskEnum.REVOKE);
+ return;
+ }
+
// 1) delete files that have been published to workspace
application.getFsOperator().delete(application.getAppHome());
@@ -213,15 +224,25 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
@Override
public void restart(Application appParam) throws Exception {
+ // For HA purposes, if the task is not processed locally, save the
Distribution task and return
+ if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+ distributedTaskService.saveDistributedTask(appParam, false,
DistributedTaskEnum.RESTART);
+ return;
+ }
this.cancel(appParam);
this.start(appParam, false);
}
@Override
public void abort(Long id) {
+ Application application = this.baseMapper.selectApp(id);
+ // For HA purposes, if the task is not processed locally, save the
Distribution task and return
+ if (!distributedTaskService.isLocalProcessing(id)) {
+ distributedTaskService.saveDistributedTask(application, false,
DistributedTaskEnum.ABORT);
+ return;
+ }
CompletableFuture<SubmitResponse> startFuture =
startFutureMap.remove(id);
CompletableFuture<CancelResponse> cancelFuture =
cancelFutureMap.remove(id);
- Application application = this.baseMapper.selectApp(id);
if (application.isKubernetesModeJob()) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(),
application.getJobId());
@@ -239,6 +260,11 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
@Override
public void cancel(Application appParam) throws Exception {
+ // For HA purposes, if the task is not processed locally, save the
Distribution task and return
+ if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+ distributedTaskService.saveDistributedTask(appParam, false,
DistributedTaskEnum.CANCEL);
+ return;
+ }
FlinkAppHttpWatcher.setOptionState(appParam.getId(),
OptionStateEnum.CANCELLING);
Application application = getById(appParam.getId());
application.setState(FlinkAppStateEnum.CANCELLING.getValue());
@@ -373,6 +399,11 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
@Override
public void start(Application appParam, boolean auto) throws Exception {
+ // For HA purposes, if the task is not processed locally, save the
Distribution task and return
+ if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
+ distributedTaskService.saveDistributedTask(appParam, auto,
DistributedTaskEnum.START);
+ return;
+ }
// 1) check application
final Application application = getById(appParam.getId());
AssertUtils.notNull(application);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
new file mode 100644
index 000000000..10aa929ec
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.base.util.ConsistentHash;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.bean.FlinkTaskItem;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.DistributedTask;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
+import org.apache.streampark.console.core.mapper.DistributedTaskMapper;
+import org.apache.streampark.console.core.service.DistributedTaskService;
+import
org.apache.streampark.console.core.service.application.ApplicationActionService;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+@Transactional(propagation = Propagation.SUPPORTS, readOnly = true,
rollbackFor = Exception.class)
+public class DistributedTaskServiceImpl extends
ServiceImpl<DistributedTaskMapper, DistributedTask>
+ implements
+ DistributedTaskService {
+
+ /**
+ * Server name
+ */
+ private String serverId;
+
+ /**
+ * Consistent hash algorithm for task distribution
+ */
+ private final ConsistentHash<String> consistentHash = new
ConsistentHash<>(Collections.emptyList());
+
+ @Qualifier("streamparkDistributedTaskExecutor")
+ @Autowired
+ private Executor taskExecutor;
+
+ @Autowired
+ private ApplicationActionService applicationActionService;
+
+ /**
+ * Task execution status
+ */
+ private final ConcurrentHashMap<Long, Boolean> runningTasks = new
ConcurrentHashMap<>();
+
+ /**
+ * Add the current console server itself to the consistent hash ring.
+ */
+ public void init(String serverName) {
+ this.serverId = serverName;
+ consistentHash.add(serverName);
+ }
+
+ @Scheduled(fixedDelay = 50)
+ public void pollDistributedTask() {
+ List<DistributedTask> distributedTaskList = this.list();
+ for (DistributedTask DistributedTask : distributedTaskList) {
+ long taskId = DistributedTask.getId();
+ if (DistributedTask.getEngineType() != EngineTypeEnum.FLINK ||
!isLocalProcessing(taskId)) {
+ continue;
+ }
+ if (runningTasks.putIfAbsent(taskId, true) == null) {
+ taskExecutor.execute(() -> {
+ try {
+ // Execute Distributed task
+ executeDistributedTask(DistributedTask);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ } finally {
+ runningTasks.remove(DistributedTask.getId());
+ this.removeById(DistributedTask.getId());
+ }
+ });
+ }
+ }
+ }
+
+ /**
+ * This interface is responsible for polling the database to retrieve task
records and execute the corresponding operations.
+ * @param DistributedTask DistributedTask
+ */
+ @Override
+ public void executeDistributedTask(DistributedTask DistributedTask) throws
Exception {
+ // Execute Distributed task
+ log.info("Execute Distributed task: {}", DistributedTask);
+ FlinkTaskItem flinkTaskItem = getFlinkTaskItem(DistributedTask);
+ Application appParam = getAppByFlinkTaskItem(flinkTaskItem);
+ switch (DistributedTask.getAction()) {
+ case START:
+ applicationActionService.start(appParam,
flinkTaskItem.getAutoStart());
+ break;
+ case RESTART:
+ applicationActionService.restart(appParam);
+ break;
+ case REVOKE:
+ applicationActionService.revoke(appParam.getId());
+ break;
+ case CANCEL:
+ applicationActionService.cancel(appParam);
+ break;
+ case ABORT:
+ applicationActionService.abort(appParam.getId());
+ break;
+ default:
+ log.error("Unsupported task: {}", DistributedTask.getAction());
+ }
+ }
+
+ /**
+ * Through this interface, the watcher obtains the list of tasks that need
to be monitored.
+ * @param applications List<Application>
+ * @return List<Application> List of tasks that need to be monitored
+ */
+ @Override
+ public List<Application> getMonitoredTaskList(List<Application>
applications) {
+ return applications.stream()
+ .filter(application -> isLocalProcessing(application.getId()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * This interface handles task redistribution when server nodes are added.
+ *
+ * @param server String
+ */
+ @Override
+ public void addServerRedistribute(String server) {
+
+ }
+
+ /**
+ * This interface handles task redistribution when server nodes are
removed.
+ *
+ * @param server String
+ */
+ @Override
+ public void removeServerRedistribute(String server) {
+
+ }
+
+ /**
+ * Determine whether the task is processed locally.
+ *
+ * @param appId Long
+ * @return boolean
+ */
+ @Override
+ public boolean isLocalProcessing(Long appId) {
+ return consistentHash.get(appId).equals(serverId);
+ }
+
+ /**
+ * Save Distributed task.
+ *
+ * @param appParam Application
+ * @param autoStart boolean
+ * @param action It may be one of the following values: START, RESTART,
REVOKE, CANCEL, ABORT
+ */
+ @Override
+ public void saveDistributedTask(Application appParam, boolean autoStart,
DistributedTaskEnum action) {
+ try {
+ DistributedTask DistributedTask =
getDistributedTaskByApp(appParam, autoStart, action);
+ this.save(DistributedTask);
+ } catch (JsonProcessingException e) {
+ log.error("Failed to save Distributed task: {}", e.getMessage());
+ }
+ }
+
+ public DistributedTask getDistributedTaskByApp(Application appParam,
boolean autoStart,
+ DistributedTaskEnum action)
throws JsonProcessingException {
+ FlinkTaskItem flinkTaskItem = new FlinkTaskItem();
+ flinkTaskItem.setAppId(appParam.getId());
+ flinkTaskItem.setAutoStart(autoStart);
+ flinkTaskItem.setArgs(appParam.getArgs());
+ flinkTaskItem.setDynamicProperties(appParam.getDynamicProperties());
+ flinkTaskItem.setSavepointPath(appParam.getSavepointPath());
+
flinkTaskItem.setRestoreOrTriggerSavepoint(appParam.getRestoreOrTriggerSavepoint());
+ flinkTaskItem.setDrain(appParam.getDrain());
+ flinkTaskItem.setNativeFormat(appParam.getNativeFormat());
+ flinkTaskItem.setRestoreMode(appParam.getRestoreMode());
+
+ DistributedTask distributedTask = new DistributedTask();
+ distributedTask.setAction(action);
+ distributedTask.setEngineType(EngineTypeEnum.FLINK);
+ distributedTask.setProperties(JacksonUtils.write(flinkTaskItem));
+ return distributedTask;
+ }
+
+ public FlinkTaskItem getFlinkTaskItem(DistributedTask DistributedTask)
throws JsonProcessingException {
+ return JacksonUtils.read(DistributedTask.getProperties(),
FlinkTaskItem.class);
+ }
+
+ public Application getAppByFlinkTaskItem(FlinkTaskItem flinkTaskItem) {
+ Application appParam = new Application();
+ appParam.setId(flinkTaskItem.getAppId());
+ appParam.setArgs(flinkTaskItem.getArgs());
+ appParam.setDynamicProperties(flinkTaskItem.getDynamicProperties());
+ appParam.setSavepointPath(flinkTaskItem.getSavepointPath());
+
appParam.setRestoreOrTriggerSavepoint(flinkTaskItem.getRestoreOrTriggerSavepoint());
+ appParam.setDrain(flinkTaskItem.getDrain());
+ appParam.setNativeFormat(flinkTaskItem.getNativeFormat());
+ appParam.setRestoreMode(flinkTaskItem.getRestoreMode());
+ return appParam;
+ }
+
+ public long getConsistentHashSize() {
+ return consistentHash.getSize();
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 8436cc1c5..b15c9ab31 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -35,6 +35,7 @@ import
org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.DistributedTaskService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.alert.AlertService;
@@ -98,6 +99,9 @@ public class FlinkAppHttpWatcher {
@Autowired
private SavepointService savepointService;
+ @Autowired
+ private DistributedTaskService distributedTaskService;
+
// track interval every 5 seconds
public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
@@ -176,11 +180,10 @@ public class FlinkAppHttpWatcher {
@PostConstruct
public void init() {
WATCHING_APPS.clear();
- List<Application> applications =
- applicationManageService.list(
- new LambdaQueryWrapper<Application>()
- .eq(Application::getTracking, 1)
- .notIn(Application::getExecutionMode,
FlinkExecutionMode.getKubernetesMode()));
+ List<Application> applications =
distributedTaskService.getMonitoredTaskList(applicationManageService.list(
+ new LambdaQueryWrapper<Application>()
+ .eq(Application::getTracking, 1)
+ .notIn(Application::getExecutionMode,
FlinkExecutionMode.getKubernetesMode())));
applications.forEach(
(app) -> {
WATCHING_APPS.put(app.getId(), app);
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 3a163c637..00c847d72 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -217,6 +217,20 @@ create table if not exists `t_flink_sql` (
primary key(`id`)
);
+
+-- ----------------------------
+-- Table structure for t_distributed_task
+-- ----------------------------
+create table if not exists `t_distributed_task`
+(
+ `id` bigint generated by default as identity not
null,
+ `action` tinyint not null,
+ `engine_type` tinyint not null,
+ `properties` text,
+ primary key (`id`)
+);
+
+
-- ----------------------------
-- Table of t_resource
-- ----------------------------
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/DistributedTaskMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/DistributedTaskMapper.xml
new file mode 100644
index 000000000..9359de503
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/DistributedTaskMapper.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!DOCTYPE mapper
+ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+ "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper
namespace="org.apache.streampark.console.core.mapper.DistributedTaskMapper">
+</mapper>
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ConsistentHashTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ConsistentHashTest.java
new file mode 100644
index 000000000..c2a5913dc
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ConsistentHashTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class ConsistentHashTest {
+
+ Long startTime;
+
+ List<String> servers;
+
+ ConsistentHash<String> hash;
+
+ Integer jobNum = 300000;
+
+ List<Integer> jobIds = new ArrayList<>();
+
+ @BeforeEach
+ public void init() {
+ startTime = System.currentTimeMillis();
+ servers = new ArrayList<>(Arrays.asList("Server-A", "Server-B",
"Server-C"));
+ hash = new ConsistentHash<>(servers);
+ for (int i = 0; i < jobNum; i++) {
+ jobIds.add(i);
+ }
+ }
+
+ @Test
+ public void initWatch() {
+ Map<String, Long> counts = jobIds.stream()
+ .map(hash::get)
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+ counts.forEach((k, v) -> {
+ log.info("node:{}, initWatch count:{}", k, v);
+ });
+ log.info("time: {}ms", System.currentTimeMillis() - startTime);
+ }
+
+ @Test
+ public void addServer() {
+ servers.add("Server-D");
+ hash.add("Server-D");
+ Map<String, Long> counts = jobIds.stream()
+ .map(hash::get)
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+ counts.forEach((k, v) -> {
+ log.info("node:{}, addServer count:{}", k, v);
+ });
+ log.info("time: {}ms", System.currentTimeMillis() - startTime);
+ }
+
+ @Test
+ public void removeServer() {
+ servers.remove("Server-C");
+ hash.remove("Server-C");
+ Map<String, Long> counts = jobIds.stream()
+ .map(hash::get)
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+ counts.forEach((k, v) -> {
+ log.info("node:{}, removeServer count:{}", k, v);
+ });
+ log.info("time: {}ms", System.currentTimeMillis() - startTime);
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
new file mode 100644
index 000000000..3c7d47837
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.bean.FlinkTaskItem;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.DistributedTask;
+import org.apache.streampark.console.core.enums.DistributedTaskEnum;
+import
org.apache.streampark.console.core.service.impl.DistributedTaskServiceImpl;
+
+import com.fasterxml.jackson.core.JacksonException;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+class DistributedTaskServiceTest {
+
+ private final DistributedTaskServiceImpl distributionTaskService = new
DistributedTaskServiceImpl();
+
+ private final String serverName = "testServer";
+
+ // the number of virtual nodes for each server
+ private final int numberOfReplicas = 2 << 16;
+
+ @Test
+ void testInit() {
+ distributionTaskService.init(serverName);
+ assert (distributionTaskService.getConsistentHashSize() ==
numberOfReplicas);
+ }
+
+ @Test
+ void testIsLocalProcessing() {
+ distributionTaskService.init(serverName);
+ for (long i = 0; i < numberOfReplicas; i++) {
+ assert (distributionTaskService.isLocalProcessing(i));
+ }
+ }
+
+ @Test
+ void testGetTaskAndApp() {
+ Application application = new Application();
+ application.setId(0L);
+ try {
+ DistributedTask DistributedTask =
+ distributionTaskService.getDistributedTaskByApp(application,
false, DistributedTaskEnum.START);
+ FlinkTaskItem flinkTaskItem =
distributionTaskService.getFlinkTaskItem(DistributedTask);
+ Application newApplication =
distributionTaskService.getAppByFlinkTaskItem(flinkTaskItem);
+ assert (application.equals(newApplication));
+ } catch (JacksonException e) {
+ log.error("testGetTaskAndApp failed:", e);
+ }
+ }
+
+}