This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 681e88da88 [INLONG-7056][Sort] Adjust sort resources according to data scale (#10916)
681e88da88 is described below
commit 681e88da886c31f6d368a0cda3ff14d251099c63
Author: PeterZh6 <[email protected]>
AuthorDate: Sat Aug 31 16:46:53 2024 +0800
[INLONG-7056][Sort] Adjust sort resources according to data scale (#10916)
---
.../inlong/audit/consts}/OpenApiConstants.java | 6 +-
.../apache/inlong/audit/cache/AbstractCache.java | 8 +-
.../apache/inlong/audit/cache/RealTimeQuery.java | 4 +-
.../apache/inlong/audit/service/ApiService.java | 102 +++++-----
.../plugin/flink/FlinkParallelismOptimizer.java | 218 +++++++++++++++++++++
.../inlong/manager/plugin/flink/FlinkService.java | 22 ++-
.../manager/plugin/flink/dto/FlinkConfig.java | 5 +
.../manager/plugin/flink/enums/Constants.java | 4 +
.../ApplicationContextProvider.java} | 34 ++--
.../inlong/manager/plugin/util/FlinkUtils.java | 3 +
.../main/resources/flink-sort-plugin.properties | 4 +
11 files changed, 331 insertions(+), 79 deletions(-)
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
similarity index 95%
rename from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
rename to
inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
index a727eba46b..cdb2a05fda 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.consts;
/**
* Open api constants
@@ -54,8 +54,8 @@ public class OpenApiConstants {
public static final String PARAMS_END_TIME = "endTime";
public static final String PARAMS_AUDIT_ID = "auditId";
public static final String PARAMS_AUDIT_TAG = "auditTag";
- public static final String PARAMS_INLONG_GROUP_Id = "inlongGroupId";
- public static final String PARAMS_INLONG_STREAM_Id = "inlongStreamId";
+ public static final String PARAMS_INLONG_GROUP_ID = "inlongGroupId";
+ public static final String PARAMS_INLONG_STREAM_ID = "inlongStreamId";
public static final String PARAMS_IP = "ip";
public static final String PARAMS_AUDIT_CYCLE = "auditCycle";
public static final String KEY_HTTP_BODY_SUCCESS = "success";
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
index 766f64f70d..8463501ffd 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
@@ -37,11 +37,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_CACHE_MAX_SIZE;
/**
* Abstract cache.
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
index 45984e203b..21991ed564 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
@@ -51,14 +51,14 @@ import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETE
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
import static
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.KEY_SOURCE_QUERY_IDS_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.KEY_SOURCE_QUERY_IPS_SQL;
import static
org.apache.inlong.audit.config.SqlConstants.KEY_SOURCE_QUERY_MINUTE_SQL;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
/**
* Real time query data from audit source.
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
index fa1c56ab23..3fc095f643 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -48,42 +48,42 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_DAY_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_AUDIT_PROXY_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IDS_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IPS_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
-import static
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
-import static
org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE;
-import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_AUDIT_PROXY_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IDS_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
-import static
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
-import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_COMPONENT;
-import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE;
-import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID;
-import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG;
-import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_END_TIME;
-import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_GROUP_Id;
-import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_STREAM_Id;
-import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_IP;
-import static
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_START_TIME;
-import static
org.apache.inlong.audit.config.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_DAY_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_AUDIT_PROXY_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_IDS_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_IPS_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_HOUR_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.HTTP_RESPOND_CODE;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_BACKLOG_SIZE;
+import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_DAY_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_AUDIT_PROXY_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_IDS_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_IPS_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_HOUR_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_MINUTES_PATH;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_COMPONENT;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_CYCLE;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_ID;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_TAG;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_END_TIME;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_GROUP_ID;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_STREAM_ID;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_IP;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_START_TIME;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
import static org.apache.inlong.audit.entities.ApiType.DAY;
import static org.apache.inlong.audit.entities.ApiType.GET_AUDIT_PROXY;
import static org.apache.inlong.audit.entities.ApiType.GET_IDS;
@@ -208,14 +208,14 @@ public class ApiService {
return params.containsKey(PARAMS_START_TIME)
&& params.containsKey(PARAMS_END_TIME)
&& params.containsKey(PARAMS_AUDIT_ID)
- && params.containsKey(PARAMS_INLONG_GROUP_Id)
- && params.containsKey(PARAMS_INLONG_STREAM_Id);
+ && params.containsKey(PARAMS_INLONG_GROUP_ID)
+ && params.containsKey(PARAMS_INLONG_STREAM_ID);
case MINUTES:
return params.containsKey(PARAMS_START_TIME)
&& params.containsKey(PARAMS_END_TIME)
&& params.containsKey(PARAMS_AUDIT_ID)
- && params.containsKey(PARAMS_INLONG_GROUP_Id)
- && params.containsKey(PARAMS_INLONG_STREAM_Id)
+ && params.containsKey(PARAMS_INLONG_GROUP_ID)
+ && params.containsKey(PARAMS_INLONG_STREAM_ID)
&& params.containsKey(PARAMS_AUDIT_CYCLE);
case GET_IDS:
return params.containsKey(PARAMS_START_TIME)
@@ -249,8 +249,8 @@ public class ApiService {
case HOUR:
statData =
HourCache.getInstance().getData(params.get(PARAMS_START_TIME),
params.get(PARAMS_END_TIME),
- params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
+ params.get(PARAMS_INLONG_GROUP_ID),
+ params.get(PARAMS_INLONG_STREAM_ID),
params.get(PARAMS_AUDIT_ID),
params.get(PARAMS_AUDIT_TAG));
responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
@@ -259,8 +259,8 @@ public class ApiService {
statData = DayCache.getInstance().getData(
params.get(PARAMS_START_TIME),
params.get(PARAMS_END_TIME),
- params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
+ params.get(PARAMS_INLONG_GROUP_ID),
+ params.get(PARAMS_INLONG_STREAM_ID),
params.get(PARAMS_AUDIT_ID));
responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
break;
@@ -276,8 +276,8 @@ public class ApiService {
statData = RealTimeQuery.getInstance().queryIpsById(
params.get(PARAMS_START_TIME),
params.get(PARAMS_END_TIME),
- params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
+ params.get(PARAMS_INLONG_GROUP_ID),
+ params.get(PARAMS_INLONG_STREAM_ID),
params.get(PARAMS_AUDIT_ID));
responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
break;
@@ -303,22 +303,22 @@ public class ApiService {
case MINUTE:
statData =
RealTimeQuery.getInstance().queryLogTs(params.get(PARAMS_START_TIME),
params.get(PARAMS_END_TIME),
- params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
+ params.get(PARAMS_INLONG_GROUP_ID),
+ params.get(PARAMS_INLONG_STREAM_ID),
params.get(PARAMS_AUDIT_ID));
break;
case MINUTE_10:
statData =
TenMinutesCache.getInstance().getData(params.get(PARAMS_START_TIME),
params.get(PARAMS_END_TIME),
- params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
params.get(PARAMS_AUDIT_ID),
+ params.get(PARAMS_INLONG_GROUP_ID),
+ params.get(PARAMS_INLONG_STREAM_ID),
params.get(PARAMS_AUDIT_ID),
params.get(PARAMS_AUDIT_TAG));
break;
case MINUTE_30:
statData =
HalfHourCache.getInstance().getData(params.get(PARAMS_START_TIME),
params.get(PARAMS_END_TIME),
- params.get(PARAMS_INLONG_GROUP_Id),
- params.get(PARAMS_INLONG_STREAM_Id),
params.get(PARAMS_AUDIT_ID),
+ params.get(PARAMS_INLONG_GROUP_ID),
+ params.get(PARAMS_INLONG_STREAM_ID),
params.get(PARAMS_AUDIT_ID),
params.get(PARAMS_AUDIT_TAG));
break;
default:
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java
new file mode 100644
index 0000000000..121a489655
--- /dev/null
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on
the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+ @Value("${audit.query.url:http://127.0.0.1:10080}")
+ public String auditQueryUrl;
+
+ private static final int MAX_PARALLELISM = 2048;
+ private long maximumMessagePerSecondPerCore = 1000L;
+ private static final long DEFAULT_ERROR_DATA_VOLUME = 0L;
+ private static final FlowType DEFAULT_FLOWTYPE = FlowType.OUTPUT;
+ private static final String DEFAULT_AUDIT_TYPE = "DataProxy";
+ private static final String AUDIT_CYCLE_REALTIME = "1";
+ // maxmimum data scale counting range in hours
+ private static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1;
+ // sample time format: 2024-08-23T22:47:38.866
+ private static final String AUDIT_QUERY_DATE_TIME_FORMAT =
"yyyy-MM-dd'T'HH:mm:ss.SSS";
+
+ private static final String LOGTS_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ private static final String TIMEZONE_REGEX = "([+-])(\\d):";
+
+ /**
+ * Calculate recommended parallelism based on maximum message per second
per core
+ *
+ * @return Recommended parallelism
+ */
+ public int calculateRecommendedParallelism(List<InlongStreamInfo>
streamInfos) {
+ long averageDataVolume;
+ InlongStreamInfo streamInfo = streamInfos.get(0);
+ try {
+ averageDataVolume = getAverageDataVolume(streamInfo);
+ log.info("Retrieved data volume: {}", averageDataVolume);
+ } catch (Exception e) {
+ log.error("Error retrieving data volume: {}", e.getMessage(), e);
+ averageDataVolume = DEFAULT_ERROR_DATA_VOLUME;
+ }
+ int newParallelism = (int) (averageDataVolume /
maximumMessagePerSecondPerCore);
+ // Ensure parallelism is at most MAX_PARALLELISM
+ newParallelism = Math.min(newParallelism, MAX_PARALLELISM);
+ log.info("Calculated parallelism: {} for data volume: {}",
newParallelism, averageDataVolume);
+ return newParallelism;
+ }
+
+ /**
+ * Initialize maximum message per second per core based on configuration
+ *
+ * @param maximumMessagePerSecondPerCore The maximum messages per second
per core
+ */
+ public void setMaximumMessagePerSecondPerCore(Integer
maximumMessagePerSecondPerCore) {
+ if (maximumMessagePerSecondPerCore == null ||
maximumMessagePerSecondPerCore <= 0) {
+ log.error(
+ "Illegal flink.max.msg.rate.percore property, must be
nonnull and positive, using default value: {}",
+ this.maximumMessagePerSecondPerCore);
+ } else {
+ this.maximumMessagePerSecondPerCore =
maximumMessagePerSecondPerCore;
+ }
+ }
+
+ /**
+ * Get average data volume on the scale specified by
DATA_SCALE_COUNTING_RANGE_IN_HOURS
+ *
+ * @param streamInfo inlong stream info
+ * @return The average data count per hour
+ */
+ private long getAverageDataVolume(InlongStreamInfo streamInfo) {
+ // Since the audit module uses local time, we need to use
ZonedDateTime to get the current time
+ String dataTimeZone =
streamInfo.getSourceList().get(0).getDataTimeZone();
+
+ // This regex pattern matches a time zone offset in the format of
"GMT+/-X:00"
+ // where X is a single digit (e.g., "GMT+8:00"). The pattern captures
the "+" or "-" sign
+ // and the single digit, then it replaces the single digit with two
digits by adding a "0" in front of it.
+ // For example, "GMT+8:00" becomes "GMT+08:00" in order to match
standard offset-based ZoneId.
+ dataTimeZone = dataTimeZone.replaceAll(TIMEZONE_REGEX, "$10$2:");
+ ZoneId dataZone = ZoneId.of(dataTimeZone);
+
+ ZonedDateTime endTime = ZonedDateTime.now(dataZone);
+ ZonedDateTime startTime =
endTime.minusHours(DATA_SCALE_COUNTING_RANGE_IN_HOURS);
+ DateTimeFormatter formatter =
DateTimeFormatter.ofPattern(AUDIT_QUERY_DATE_TIME_FORMAT);
+
+ // counting data volume on with DATA_PROXY_OUTPUT auditId
+ int auditId = AuditIdEnum.getAuditId(DEFAULT_AUDIT_TYPE,
DEFAULT_FLOWTYPE).getValue();
+ StringJoiner urlParameters = new StringJoiner(AMPERSAND)
+ .add(PARAMS_START_TIME + EQUAL + startTime.format(formatter))
+ .add(PARAMS_END_TIME + EQUAL + endTime.format(formatter))
+ .add(PARAMS_INLONG_GROUP_ID + EQUAL +
streamInfo.getInlongGroupId())
+ .add(PARAMS_INLONG_STREAM_ID + EQUAL +
streamInfo.getInlongStreamId())
+ .add(PARAMS_AUDIT_ID + EQUAL + auditId)
+ .add(PARAMS_AUDIT_CYCLE + EQUAL + AUDIT_CYCLE_REALTIME);
+
+ String url = auditQueryUrl + DEFAULT_API_MINUTES_PATH + QUESTION_MARK
+ urlParameters;
+
+ return getAverageDataVolumeFromAuditInfo(url);
+ }
+
+ /**
+ * Request audit data from inlong audit API, parse the response and return
the total count in the given time range.
+ *
+ * @param url The URL to request data from
+ * @return The total count of the audit data
+ */
+ private long getAverageDataVolumeFromAuditInfo(String url) {
+ log.debug("Requesting audit data from URL: {}", url);
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+ HttpGet httpGet = new HttpGet(url);
+ try (CloseableHttpResponse response = httpClient.execute(httpGet))
{
+ return parseResponseAndCalculateAverageDataVolume(response);
+ } catch (IOException e) {
+ log.error("Error executing HTTP request to audit API: {}",
url, e);
+ }
+ } catch (IOException e) {
+ log.error("Error creating or closing HTTP client: {}", url, e);
+ }
+ return DEFAULT_ERROR_DATA_VOLUME;
+ }
+
+ /**
+ * Parse the HTTP response and calculate the total count from the audit
data.
+ *
+ * @param response The HTTP response
+ * @return The total count of the audit data
+ * @throws IOException If an I/O error occurs
+ */
+ private long
parseResponseAndCalculateAverageDataVolume(CloseableHttpResponse response)
throws IOException {
+ HttpEntity entity = response.getEntity();
+ if (entity == null) {
+ log.warn("Empty response entity from audit API, returning default
count.");
+ return DEFAULT_ERROR_DATA_VOLUME;
+ }
+
+ String responseString = EntityUtils.toString(entity);
+ log.debug("Flink dynamic parallelism optimizer got response from audit
API: {}", responseString);
+
+ JsonObject jsonObject =
JsonParser.parseString(responseString).getAsJsonObject();
+ AuditInfo[] auditInfoArray = new
Gson().fromJson(jsonObject.getAsJsonArray("data"), AuditInfo[].class);
+
+ ZonedDateTime minLogTs = null;
+ ZonedDateTime maxLogTs = null;
+ DateTimeFormatter logTsFormatter =
+
DateTimeFormatter.ofPattern(LOGTS_DATE_TIME_FORMAT).withZone(ZoneId.systemDefault());
+ long totalCount = 0L;
+ for (AuditInfo auditData : auditInfoArray) {
+ if (auditData != null) {
+ ZonedDateTime logTs =
ZonedDateTime.parse(auditData.getLogTs(), logTsFormatter);
+ if (minLogTs == null || logTs.isBefore(minLogTs)) {
+ minLogTs = logTs;
+ }
+ if (maxLogTs == null || logTs.isAfter(maxLogTs)) {
+ maxLogTs = logTs;
+ }
+ log.debug("parsed AuditInfo, Count: {}, Size: {}",
auditData.getCount(), auditData.getSize());
+ totalCount += auditData.getCount();
+ } else {
+ log.error("Null AuditInfo found in response data.");
+ }
+ }
+
+ if (minLogTs != null && maxLogTs != null) {
+ long timeDifferenceInSeconds = maxLogTs.toEpochSecond() -
minLogTs.toEpochSecond();
+ log.info("Time difference in seconds: {}",
timeDifferenceInSeconds);
+ if (timeDifferenceInSeconds > 0) {
+ return totalCount / timeDifferenceInSeconds;
+ }
+ }
+ return DEFAULT_ERROR_DATA_VOLUME;
+ }
+}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index f3809ce8ae..b9d10acf5b 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.plugin.util.ApplicationContextProvider;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import lombok.extern.slf4j.Slf4j;
@@ -62,13 +63,15 @@ public class FlinkService {
private static final Pattern IP_PORT_PATTERN =
Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
private final FlinkConfig flinkConfig;
- private final Integer parallelism;
+ private Integer parallelism;
private final String savepointDirectory;
// map endpoint to Configuration
private final Map<String, Configuration> configurations = new HashMap<>();
// map Configuration to FlinkClientService
private final Map<Configuration, FlinkClientService> flinkClientServices =
new HashMap<>();
+ private final FlinkParallelismOptimizer flinkParallelismOptimizer;
+
/**
* Constructor of FlinkService.
*/
@@ -76,6 +79,8 @@ public class FlinkService {
flinkConfig = FlinkUtils.getFlinkConfigFromFile();
parallelism = flinkConfig.getParallelism();
savepointDirectory = flinkConfig.getSavepointDirectory();
+ // let spring inject the bean
+ flinkParallelismOptimizer =
ApplicationContextProvider.getContext().getBean(FlinkParallelismOptimizer.class);
}
private static class FlinkServiceHolder {
@@ -213,6 +218,21 @@ public class FlinkService {
}).filter(Objects::nonNull).collect(Collectors.toList());
Configuration configuration =
getFlinkConfiguration(flinkInfo.getEndpoint());
+ log.debug("flink info: {}", flinkInfo);
+ if (flinkConfig.getDynamicParallelismEnable()) {
+
flinkParallelismOptimizer.setMaximumMessagePerSecondPerCore(flinkConfig.getMaxMsgRatePerCore());
+ // get stream info list for auditing
+ int recommendedParallelism =
+
flinkParallelismOptimizer.calculateRecommendedParallelism(flinkInfo.getInlongStreamInfoList());
+ // Ensure parallelism is at least the default value
+ recommendedParallelism = recommendedParallelism < parallelism ?
parallelism : recommendedParallelism;
+
+ if (recommendedParallelism != parallelism) {
+ log.info("switched to recommended parallelism: {}",
recommendedParallelism);
+ parallelism = recommendedParallelism;
+ }
+ }
+ log.info("current parallelism: {}", parallelism);
PackagedProgram program = PackagedProgram.newBuilder()
.setConfiguration(configuration)
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
index 7d2948510c..0e16977b70 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
@@ -40,4 +40,9 @@ public class FlinkConfig {
// flink version
private String version;
+ // max msg rate per core
+ private Integer maxMsgRatePerCore;
+
+ // whether to enable dynamic parallelism
+ private Boolean dynamicParallelismEnable;
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index b5628e8664..af6d7b7ca6 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -43,6 +43,10 @@ public class Constants {
public static final String FLINK_VERSION = "flink.version";
+ public static final String FLINK_MAX_MSG_RATE_PERCORE =
"flink.max.msg.rate.percore";
+
+ public static final String FLINK_DYNAMIC_PARALLELISM_ENABLE =
"flink.dynamic.parallelism.enable";
+
// dataflow
public static final String SOURCE_INFO = "source_info";
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/ApplicationContextProvider.java
similarity index 54%
copy from
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
copy to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/ApplicationContextProvider.java
index 7d2948510c..fe6afddb8c 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/ApplicationContextProvider.java
@@ -15,29 +15,27 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.plugin.flink.dto;
+package org.apache.inlong.manager.plugin.util;
-import lombok.Data;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
/**
- * Flink config, including address, port, job manager port, etc.
+ * Get the Spring ApplicationContext
+ * instantiate class with Spring in non-Spring managed environment
*/
-@Data
-public class FlinkConfig {
+@Component
+public class ApplicationContextProvider implements ApplicationContextAware {
- private String address;
+ private static ApplicationContext context;
- private Integer port;
-
- private Integer jobManagerPort;
-
- private String savepointDirectory;
-
- private Integer parallelism;
-
- private boolean drain;
-
- // flink version
- private String version;
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) {
+ context = applicationContext;
+ }
+ public static ApplicationContext getContext() {
+ return context;
+ }
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 1110ef45d8..3797e7dbd6 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -218,6 +218,9 @@ public class FlinkUtils {
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
flinkConfig.setVersion(properties.getProperty(FLINK_VERSION));
+
flinkConfig.setDynamicParallelismEnable(Boolean.parseBoolean(properties.getProperty(
+ Constants.FLINK_DYNAMIC_PARALLELISM_ENABLE)));
+
flinkConfig.setMaxMsgRatePerCore(Integer.valueOf(properties.getProperty(Constants.FLINK_MAX_MSG_RATE_PERCORE)));
return flinkConfig;
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
index ff34f913f8..bc8b126311 100644
---
a/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
+++
b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
@@ -35,3 +35,7 @@ flink.savepoint.directory=file:///data/inlong-sort/savepoints
flink.parallelism=1
# flink stop request drain
flink.drain=false
+# max msg rate per core
+flink.max.msg.rate.percore=1000
+# switch on or off dynamic parallelism based on data scale
+flink.dynamic.parallelism.enable=true