This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 681e88da88 [INLONG-7056][Sort] Adjust sort resources according to data scale (#10916)
681e88da88 is described below

commit 681e88da886c31f6d368a0cda3ff14d251099c63
Author: PeterZh6 <[email protected]>
AuthorDate: Sat Aug 31 16:46:53 2024 +0800

    [INLONG-7056][Sort] Adjust sort resources according to data scale (#10916)
---
 .../inlong/audit/consts}/OpenApiConstants.java     |   6 +-
 .../apache/inlong/audit/cache/AbstractCache.java   |   8 +-
 .../apache/inlong/audit/cache/RealTimeQuery.java   |   4 +-
 .../apache/inlong/audit/service/ApiService.java    | 102 +++++-----
 .../plugin/flink/FlinkParallelismOptimizer.java    | 218 +++++++++++++++++++++
 .../inlong/manager/plugin/flink/FlinkService.java  |  22 ++-
 .../manager/plugin/flink/dto/FlinkConfig.java      |   5 +
 .../manager/plugin/flink/enums/Constants.java      |   4 +
 .../ApplicationContextProvider.java}               |  34 ++--
 .../inlong/manager/plugin/util/FlinkUtils.java     |   3 +
 .../main/resources/flink-sort-plugin.properties    |   4 +
 11 files changed, 331 insertions(+), 79 deletions(-)

diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
similarity index 95%
rename from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
rename to 
inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
index a727eba46b..cdb2a05fda 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
+++ 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.consts;
 
 /**
  * Open api constants
@@ -54,8 +54,8 @@ public class OpenApiConstants {
     public static final String PARAMS_END_TIME = "endTime";
     public static final String PARAMS_AUDIT_ID = "auditId";
     public static final String PARAMS_AUDIT_TAG = "auditTag";
-    public static final String PARAMS_INLONG_GROUP_Id = "inlongGroupId";
-    public static final String PARAMS_INLONG_STREAM_Id = "inlongStreamId";
+    public static final String PARAMS_INLONG_GROUP_ID = "inlongGroupId";
+    public static final String PARAMS_INLONG_STREAM_ID = "inlongStreamId";
     public static final String PARAMS_IP = "ip";
     public static final String PARAMS_AUDIT_CYCLE = "auditCycle";
     public static final String KEY_HTTP_BODY_SUCCESS = "success";
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
index 766f64f70d..8463501ffd 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
@@ -37,11 +37,11 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
 import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
 
 /**
  * Abstract cache.
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
index 45984e203b..21991ed564 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
@@ -51,14 +51,14 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETE
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.KEY_SOURCE_QUERY_IDS_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.KEY_SOURCE_QUERY_IPS_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.KEY_SOURCE_QUERY_MINUTE_SQL;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
 
 /**
  * Real time query data from audit source.
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
index fa1c56ab23..3fc095f643 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -48,42 +48,42 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_DAY_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_AUDIT_PROXY_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IDS_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IPS_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE;
-import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_AUDIT_PROXY_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IDS_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_COMPONENT;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE;
-import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID;
-import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG;
-import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_END_TIME;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_GROUP_Id;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_STREAM_Id;
-import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_IP;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_START_TIME;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
 import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_DAY_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_AUDIT_PROXY_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_IDS_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_IPS_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_HOUR_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.HTTP_RESPOND_CODE;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_BACKLOG_SIZE;
+import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_DAY_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_AUDIT_PROXY_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_IDS_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_IPS_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_HOUR_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_MINUTES_PATH;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_COMPONENT;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_CYCLE;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_ID;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_TAG;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_END_TIME;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_GROUP_ID;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_STREAM_ID;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_IP;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_START_TIME;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
 import static org.apache.inlong.audit.entities.ApiType.DAY;
 import static org.apache.inlong.audit.entities.ApiType.GET_AUDIT_PROXY;
 import static org.apache.inlong.audit.entities.ApiType.GET_IDS;
@@ -208,14 +208,14 @@ public class ApiService {
                     return params.containsKey(PARAMS_START_TIME)
                             && params.containsKey(PARAMS_END_TIME)
                             && params.containsKey(PARAMS_AUDIT_ID)
-                            && params.containsKey(PARAMS_INLONG_GROUP_Id)
-                            && params.containsKey(PARAMS_INLONG_STREAM_Id);
+                            && params.containsKey(PARAMS_INLONG_GROUP_ID)
+                            && params.containsKey(PARAMS_INLONG_STREAM_ID);
                 case MINUTES:
                     return params.containsKey(PARAMS_START_TIME)
                             && params.containsKey(PARAMS_END_TIME)
                             && params.containsKey(PARAMS_AUDIT_ID)
-                            && params.containsKey(PARAMS_INLONG_GROUP_Id)
-                            && params.containsKey(PARAMS_INLONG_STREAM_Id)
+                            && params.containsKey(PARAMS_INLONG_GROUP_ID)
+                            && params.containsKey(PARAMS_INLONG_STREAM_ID)
                             && params.containsKey(PARAMS_AUDIT_CYCLE);
                 case GET_IDS:
                     return params.containsKey(PARAMS_START_TIME)
@@ -249,8 +249,8 @@ public class ApiService {
                     case HOUR:
                         statData = 
HourCache.getInstance().getData(params.get(PARAMS_START_TIME),
                                 params.get(PARAMS_END_TIME),
-                                params.get(PARAMS_INLONG_GROUP_Id),
-                                params.get(PARAMS_INLONG_STREAM_Id),
+                                params.get(PARAMS_INLONG_GROUP_ID),
+                                params.get(PARAMS_INLONG_STREAM_ID),
                                 params.get(PARAMS_AUDIT_ID),
                                 params.get(PARAMS_AUDIT_TAG));
                         responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
@@ -259,8 +259,8 @@ public class ApiService {
                         statData = DayCache.getInstance().getData(
                                 params.get(PARAMS_START_TIME),
                                 params.get(PARAMS_END_TIME),
-                                params.get(PARAMS_INLONG_GROUP_Id),
-                                params.get(PARAMS_INLONG_STREAM_Id),
+                                params.get(PARAMS_INLONG_GROUP_ID),
+                                params.get(PARAMS_INLONG_STREAM_ID),
                                 params.get(PARAMS_AUDIT_ID));
                         responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
                         break;
@@ -276,8 +276,8 @@ public class ApiService {
                         statData = RealTimeQuery.getInstance().queryIpsById(
                                 params.get(PARAMS_START_TIME),
                                 params.get(PARAMS_END_TIME),
-                                params.get(PARAMS_INLONG_GROUP_Id),
-                                params.get(PARAMS_INLONG_STREAM_Id),
+                                params.get(PARAMS_INLONG_GROUP_ID),
+                                params.get(PARAMS_INLONG_STREAM_ID),
                                 params.get(PARAMS_AUDIT_ID));
                         responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
                         break;
@@ -303,22 +303,22 @@ public class ApiService {
                 case MINUTE:
                     statData = 
RealTimeQuery.getInstance().queryLogTs(params.get(PARAMS_START_TIME),
                             params.get(PARAMS_END_TIME),
-                            params.get(PARAMS_INLONG_GROUP_Id),
-                            params.get(PARAMS_INLONG_STREAM_Id),
+                            params.get(PARAMS_INLONG_GROUP_ID),
+                            params.get(PARAMS_INLONG_STREAM_ID),
                             params.get(PARAMS_AUDIT_ID));
                     break;
                 case MINUTE_10:
                     statData = 
TenMinutesCache.getInstance().getData(params.get(PARAMS_START_TIME),
                             params.get(PARAMS_END_TIME),
-                            params.get(PARAMS_INLONG_GROUP_Id),
-                            params.get(PARAMS_INLONG_STREAM_Id), 
params.get(PARAMS_AUDIT_ID),
+                            params.get(PARAMS_INLONG_GROUP_ID),
+                            params.get(PARAMS_INLONG_STREAM_ID), 
params.get(PARAMS_AUDIT_ID),
                             params.get(PARAMS_AUDIT_TAG));
                     break;
                 case MINUTE_30:
                     statData = 
HalfHourCache.getInstance().getData(params.get(PARAMS_START_TIME),
                             params.get(PARAMS_END_TIME),
-                            params.get(PARAMS_INLONG_GROUP_Id),
-                            params.get(PARAMS_INLONG_STREAM_Id), 
params.get(PARAMS_AUDIT_ID),
+                            params.get(PARAMS_INLONG_GROUP_ID),
+                            params.get(PARAMS_INLONG_STREAM_ID), 
params.get(PARAMS_AUDIT_ID),
                             params.get(PARAMS_AUDIT_TAG));
                     break;
                 default:
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java
new file mode 100644
index 0000000000..121a489655
--- /dev/null
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.regex.Pattern;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_CYCLE;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_ID;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_END_TIME;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_GROUP_ID;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_STREAM_ID;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_START_TIME;
+import static org.apache.inlong.manager.common.consts.InlongConstants.AMPERSAND;
+import static org.apache.inlong.manager.common.consts.InlongConstants.EQUAL;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUESTION_MARK;
+
+/**
+ * Calculates a recommended Flink parallelism from the recent data volume of an InLong stream.
+ * <p>
+ * The data volume is the average number of messages per second over the last
+ * DATA_SCALE_COUNTING_RANGE_IN_HOURS hour(s), computed from minute-level audit data retrieved
+ * from the InLong audit open API. The recommended parallelism is that volume divided by the
+ * maximum number of messages a single core is expected to process per second, capped at
+ * MAX_PARALLELISM.
+ * <p>
+ * This is a Spring singleton: the per-core rate set through
+ * {@link #setMaximumMessagePerSecondPerCore(Integer)} is shared by all callers.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+    @Value("${audit.query.url:http://127.0.0.1:10080}")
+    public String auditQueryUrl;
+
+    private static final int MAX_PARALLELISM = 2048;
+    private static final long DEFAULT_ERROR_DATA_VOLUME = 0L;
+    private static final FlowType DEFAULT_FLOWTYPE = FlowType.OUTPUT;
+    private static final String DEFAULT_AUDIT_TYPE = "DataProxy";
+    // audit cycle "1" requests minute-level audit buckets
+    private static final String AUDIT_CYCLE_REALTIME = "1";
+    // length in seconds of the window covered by one minute-level audit bucket
+    private static final long AUDIT_CYCLE_REALTIME_SECONDS = 60L;
+    // maximum data scale counting range in hours
+    private static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1;
+    // audit query time format, sample: 2024-08-23T22:47:38.866
+    private static final DateTimeFormatter AUDIT_QUERY_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
+    // format of the logTs field in audit query responses
+    private static final DateTimeFormatter LOGTS_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
+    // matches a single-digit hour offset such as the "+8:" in "GMT+8:00"
+    private static final Pattern SINGLE_DIGIT_HOUR_OFFSET = Pattern.compile("(?<sign>[+-])(?<hour>\\d):");
+    private static final Gson GSON = new Gson();
+
+    private long maximumMessagePerSecondPerCore = 1000L;
+
+    /**
+     * Calculate the recommended parallelism based on the maximum messages per second per core.
+     *
+     * @param streamInfos streams of the job; only the first one is used for the audit query
+     * @return recommended parallelism in [0, MAX_PARALLELISM]; 0 when the data volume cannot be
+     *         retrieved or is below the per-core rate, so callers should apply their own lower bound
+     */
+    public int calculateRecommendedParallelism(List<InlongStreamInfo> streamInfos) {
+        long averageDataVolume = DEFAULT_ERROR_DATA_VOLUME;
+        if (streamInfos == null || streamInfos.isEmpty()) {
+            log.warn("No stream info available, cannot retrieve data volume");
+        } else {
+            try {
+                averageDataVolume = getAverageDataVolume(streamInfos.get(0));
+                log.info("Retrieved data volume: {}", averageDataVolume);
+            } catch (Exception e) {
+                // best effort: fall back to the default volume rather than failing the job submission
+                log.error("Error retrieving data volume: {}", e.getMessage(), e);
+            }
+        }
+        // clamp in the long domain so that a huge volume cannot overflow the int cast
+        long rawParallelism = averageDataVolume / maximumMessagePerSecondPerCore;
+        int newParallelism = (int) Math.max(0L, Math.min(rawParallelism, MAX_PARALLELISM));
+        log.info("Calculated parallelism: {} for data volume: {}", newParallelism, averageDataVolume);
+        return newParallelism;
+    }
+
+    /**
+     * Set the maximum messages per second a single core is expected to process.
+     * Null or non-positive values are rejected and the current value is kept.
+     *
+     * @param maximumMessagePerSecondPerCore The maximum messages per second per core
+     */
+    public void setMaximumMessagePerSecondPerCore(Integer maximumMessagePerSecondPerCore) {
+        if (maximumMessagePerSecondPerCore == null || maximumMessagePerSecondPerCore <= 0) {
+            log.error(
+                    "Illegal flink.max.msg.rate.percore property, must be nonnull and positive, using default value: {}",
+                    this.maximumMessagePerSecondPerCore);
+        } else {
+            this.maximumMessagePerSecondPerCore = maximumMessagePerSecondPerCore;
+        }
+    }
+
+    /**
+     * Get the average data volume of a stream over the last DATA_SCALE_COUNTING_RANGE_IN_HOURS hour(s).
+     *
+     * @param streamInfo inlong stream info
+     * @return the average number of messages per second, or DEFAULT_ERROR_DATA_VOLUME on failure
+     */
+    private long getAverageDataVolume(InlongStreamInfo streamInfo) {
+        // the audit module uses local time, so the query window is expressed in the data time zone
+        ZoneId dataZone = resolveDataZone(streamInfo);
+        ZonedDateTime endTime = ZonedDateTime.now(dataZone);
+        ZonedDateTime startTime = endTime.minusHours(DATA_SCALE_COUNTING_RANGE_IN_HOURS);
+
+        // count the data volume with the DataProxy OUTPUT audit id
+        int auditId = AuditIdEnum.getAuditId(DEFAULT_AUDIT_TYPE, DEFAULT_FLOWTYPE).getValue();
+        StringJoiner urlParameters = new StringJoiner(AMPERSAND)
+                .add(PARAMS_START_TIME + EQUAL + startTime.format(AUDIT_QUERY_FORMATTER))
+                .add(PARAMS_END_TIME + EQUAL + endTime.format(AUDIT_QUERY_FORMATTER))
+                .add(PARAMS_INLONG_GROUP_ID + EQUAL + streamInfo.getInlongGroupId())
+                .add(PARAMS_INLONG_STREAM_ID + EQUAL + streamInfo.getInlongStreamId())
+                .add(PARAMS_AUDIT_ID + EQUAL + auditId)
+                .add(PARAMS_AUDIT_CYCLE + EQUAL + AUDIT_CYCLE_REALTIME);
+
+        String url = auditQueryUrl + DEFAULT_API_MINUTES_PATH + QUESTION_MARK + urlParameters;
+        return getAverageDataVolumeFromAuditInfo(url);
+    }
+
+    /**
+     * Resolve the data time zone of the stream's first source.
+     * Falls back to the system default zone when the stream has no source or no time zone.
+     *
+     * @param streamInfo inlong stream info
+     * @return the zone in which the audit query window is expressed
+     */
+    private static ZoneId resolveDataZone(InlongStreamInfo streamInfo) {
+        if (streamInfo.getSourceList() == null || streamInfo.getSourceList().isEmpty()) {
+            log.warn("Stream has no source, using system default time zone for the audit query");
+            return ZoneId.systemDefault();
+        }
+        String dataTimeZone = streamInfo.getSourceList().get(0).getDataTimeZone();
+        if (dataTimeZone == null || dataTimeZone.trim().isEmpty()) {
+            log.warn("Source has no data time zone, using system default time zone for the audit query");
+            return ZoneId.systemDefault();
+        }
+        // pad single-digit hour offsets, e.g. "GMT+8:00" becomes "GMT+08:00",
+        // so that the value parses as a standard offset-based ZoneId
+        return ZoneId.of(SINGLE_DIGIT_HOUR_OFFSET.matcher(dataTimeZone.trim()).replaceAll("${sign}0${hour}:"));
+    }
+
+    /**
+     * Request audit data from the inlong audit API and compute the average data volume.
+     *
+     * @param url The URL to request data from
+     * @return the average number of messages per second, or DEFAULT_ERROR_DATA_VOLUME on HTTP failure
+     */
+    private long getAverageDataVolumeFromAuditInfo(String url) {
+        log.debug("Requesting audit data from URL: {}", url);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet(url);
+            try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+                return parseResponseAndCalculateAverageDataVolume(response);
+            } catch (IOException e) {
+                log.error("Error executing HTTP request to audit API: {}", url, e);
+            }
+        } catch (IOException e) {
+            log.error("Error creating or closing HTTP client: {}", url, e);
+        }
+        return DEFAULT_ERROR_DATA_VOLUME;
+    }
+
+    /**
+     * Parse the HTTP response and calculate the average data volume from the audit data.
+     * <p>
+     * Each audit record is a minute-level bucket, so the records cover the time from the earliest
+     * logTs up to one cycle after the latest logTs.
+     *
+     * @param response The HTTP response
+     * @return the average number of messages per second, or DEFAULT_ERROR_DATA_VOLUME if the
+     *         response carries no usable audit data
+     * @throws IOException If an I/O error occurs while reading the response body
+     */
+    private long parseResponseAndCalculateAverageDataVolume(CloseableHttpResponse response) throws IOException {
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (statusCode != HttpStatus.SC_OK) {
+            log.warn("Audit API returned HTTP status {}, returning default count.", statusCode);
+            return DEFAULT_ERROR_DATA_VOLUME;
+        }
+        HttpEntity entity = response.getEntity();
+        if (entity == null) {
+            log.warn("Empty response entity from audit API, returning default count.");
+            return DEFAULT_ERROR_DATA_VOLUME;
+        }
+
+        String responseString = EntityUtils.toString(entity);
+        log.debug("Flink dynamic parallelism optimizer got response from audit API: {}", responseString);
+
+        JsonObject jsonObject = JsonParser.parseString(responseString).getAsJsonObject();
+        JsonElement data = jsonObject.get("data");
+        if (data == null || !data.isJsonArray()) {
+            log.warn("No audit data array in response from audit API, returning default count.");
+            return DEFAULT_ERROR_DATA_VOLUME;
+        }
+        AuditInfo[] auditInfoArray = GSON.fromJson(data, AuditInfo[].class);
+
+        ZonedDateTime minLogTs = null;
+        ZonedDateTime maxLogTs = null;
+        long totalCount = 0L;
+        for (AuditInfo auditData : auditInfoArray) {
+            if (auditData == null) {
+                log.error("Null AuditInfo found in response data.");
+                continue;
+            }
+            if (auditData.getLogTs() == null) {
+                log.warn("AuditInfo without logTs found in response data, skipping it.");
+                continue;
+            }
+            ZonedDateTime logTs = ZonedDateTime.parse(auditData.getLogTs(), LOGTS_FORMATTER);
+            if (minLogTs == null || logTs.isBefore(minLogTs)) {
+                minLogTs = logTs;
+            }
+            if (maxLogTs == null || logTs.isAfter(maxLogTs)) {
+                maxLogTs = logTs;
+            }
+            log.debug("parsed AuditInfo, Count: {}, Size: {}", auditData.getCount(), auditData.getSize());
+            totalCount += auditData.getCount();
+        }
+
+        if (minLogTs == null || maxLogTs == null) {
+            return DEFAULT_ERROR_DATA_VOLUME;
+        }
+        // n buckets from minLogTs to maxLogTs cover (maxLogTs - minLogTs) plus one full cycle;
+        // this also gives a single bucket a non-zero span instead of discarding its count
+        long timeSpanInSeconds = maxLogTs.toEpochSecond() - minLogTs.toEpochSecond() + AUDIT_CYCLE_REALTIME_SECONDS;
+        log.info("Time span in seconds: {}", timeSpanInSeconds);
+        return totalCount / timeSpanInSeconds;
+    }
+}
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index f3809ce8ae..b9d10acf5b 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.plugin.util.ApplicationContextProvider;
 import org.apache.inlong.manager.plugin.util.FlinkUtils;
 
 import lombok.extern.slf4j.Slf4j;
@@ -62,13 +63,15 @@ public class FlinkService {
     private static final Pattern IP_PORT_PATTERN = 
Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
 
     private final FlinkConfig flinkConfig;
-    private final Integer parallelism;
+    private Integer parallelism;
     private final String savepointDirectory;
     // map endpoint to Configuration
     private final Map<String, Configuration> configurations = new HashMap<>();
     // map Configuration to FlinkClientService
     private final Map<Configuration, FlinkClientService> flinkClientServices = 
new HashMap<>();
 
+    private final FlinkParallelismOptimizer flinkParallelismOptimizer;
+
     /**
      * Constructor of FlinkService.
      */
@@ -76,6 +79,8 @@ public class FlinkService {
         flinkConfig = FlinkUtils.getFlinkConfigFromFile();
         parallelism = flinkConfig.getParallelism();
         savepointDirectory = flinkConfig.getSavepointDirectory();
+        // let spring inject the bean
+        flinkParallelismOptimizer = 
ApplicationContextProvider.getContext().getBean(FlinkParallelismOptimizer.class);
     }
 
     private static class FlinkServiceHolder {
@@ -213,6 +218,21 @@ public class FlinkService {
         }).filter(Objects::nonNull).collect(Collectors.toList());
 
         Configuration configuration = 
getFlinkConfiguration(flinkInfo.getEndpoint());
+        log.debug("flink info: {}", flinkInfo);
+        if (flinkConfig.getDynamicParallelismEnable()) {
+            
flinkParallelismOptimizer.setMaximumMessagePerSecondPerCore(flinkConfig.getMaxMsgRatePerCore());
+            // get stream info list for auditing
+            int recommendedParallelism =
+                    
flinkParallelismOptimizer.calculateRecommendedParallelism(flinkInfo.getInlongStreamInfoList());
+            // Ensure parallelism is at least the default value
+            recommendedParallelism = recommendedParallelism < parallelism ? 
parallelism : recommendedParallelism;
+
+            if (recommendedParallelism != parallelism) {
+                log.info("switched to recommended parallelism: {}", 
recommendedParallelism);
+                parallelism = recommendedParallelism;
+            }
+        }
+        log.info("current parallelism: {}", parallelism);
 
         PackagedProgram program = PackagedProgram.newBuilder()
                 .setConfiguration(configuration)
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
index 7d2948510c..0e16977b70 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
@@ -40,4 +40,9 @@ public class FlinkConfig {
     // flink version
     private String version;
 
+    // maximum number of messages per second a single core is expected to process
+    private Integer maxMsgRatePerCore;
+
+    // whether to enable dynamic parallelism
+    private Boolean dynamicParallelismEnable;
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index b5628e8664..af6d7b7ca6 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -43,6 +43,10 @@ public class Constants {
 
     public static final String FLINK_VERSION = "flink.version";
 
+    public static final String FLINK_MAX_MSG_RATE_PERCORE = 
"flink.max.msg.rate.percore";
+
+    public static final String FLINK_DYNAMIC_PARALLELISM_ENABLE = 
"flink.dynamic.parallelism.enable";
+
     // dataflow
     public static final String SOURCE_INFO = "source_info";
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/ApplicationContextProvider.java
similarity index 54%
copy from 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
copy to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/ApplicationContextProvider.java
index 7d2948510c..fe6afddb8c 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/ApplicationContextProvider.java
@@ -15,29 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.plugin.flink.dto;
+package org.apache.inlong.manager.plugin.util;
 
-import lombok.Data;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
 
 /**
- * Flink config, including address, port, job manager port, etc.
+ * Exposes the Spring {@link ApplicationContext} via static {@link #getContext()} for code not managed by Spring.
+ * Spring stores it through {@link #setApplicationContext}; before that, {@link #getContext()} returns {@code null}.
  */
-@Data
-public class FlinkConfig {
+@Component
+public class ApplicationContextProvider implements ApplicationContextAware {
 
-    private String address;
+    private static ApplicationContext context;
 
-    private Integer port;
-
-    private Integer jobManagerPort;
-
-    private String savepointDirectory;
-
-    private Integer parallelism;
-
-    private boolean drain;
-
-    // flink version
-    private String version;
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) {
+        context = applicationContext;
+    }
 
+    public static ApplicationContext getContext() {
+        return context;
+    }
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 1110ef45d8..3797e7dbd6 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -218,6 +218,9 @@ public class FlinkUtils {
         
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
         
flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
         flinkConfig.setVersion(properties.getProperty(FLINK_VERSION));
+        
flinkConfig.setDynamicParallelismEnable(Boolean.parseBoolean(properties.getProperty(
+                Constants.FLINK_DYNAMIC_PARALLELISM_ENABLE)));
+        
flinkConfig.setMaxMsgRatePerCore(Integer.valueOf(properties.getProperty(Constants.FLINK_MAX_MSG_RATE_PERCORE)));
         return flinkConfig;
     }
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
 
b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
index ff34f913f8..bc8b126311 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
+++ 
b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
@@ -35,3 +35,7 @@ flink.savepoint.directory=file:///data/inlong-sort/savepoints
 flink.parallelism=1
 # flink stop request drain
 flink.drain=false
+# maximum messages per second a single core is expected to process; used to compute the recommended parallelism
+flink.max.msg.rate.percore=1000
+# whether to adjust job parallelism dynamically based on data scale (never set below flink.parallelism)
+flink.dynamic.parallelism.enable=true


Reply via email to