This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 4e511b880eb57c30a0908fb7abd9e5276a9c2219 Author: healchow <[email protected]> AuthorDate: Thu Nov 10 19:03:00 2022 +0800 [INLONG-6502][Manager] Optimize some log and code formats (#6503) --- .../manager/common/consts/InlongConstants.java | 8 ++ .../inlong/manager/common/enums/ClusterStatus.java | 15 +-- .../inlong/manager/common/enums/GroupStatus.java | 8 +- .../inlong/manager/dao/RestTemplateConfig.java | 5 +- .../manager/plugin/flink/TaskRunService.java | 50 ++++---- .../plugin/listener/DeleteSortListener.java | 24 ++-- .../plugin/listener/DeleteStreamListener.java | 27 ++--- .../inlong/manager/plugin/util/FlinkUtils.java | 2 +- .../manager/pojo/sink/mysql/MySQLSinkDTO.java | 13 +- .../manager/service/core/SortConfigLoader.java | 32 +++-- .../manager/service/core/impl/AbstractService.java | 119 ------------------- .../service/group/InlongGroupProcessService.java | 23 ++-- .../service/heartbeat/HeartbeatManager.java | 2 +- .../listener/queue/QueueResourceListener.java | 26 ++-- .../service/listener/sort/SortConfigListener.java | 6 +- .../service/operationlog/OperationLogPool.java | 29 ++--- .../queue/kafka/KafkaResourceOperators.java | 23 ++-- .../resource/queue/pulsar/PulsarOperator.java | 2 +- .../queue/pulsar/PulsarResourceOperator.java | 131 ++++++++++----------- .../resource/sink/es/ElasticsearchConfig.java | 16 ++- .../resource/sink/mysql/MySQLJdbcUtils.java | 88 +++++++------- .../resource/sink/mysql/MySQLResourceOperator.java | 18 +-- .../resource/sort/DefaultSortConfigOperator.java | 13 +- .../service/source/AbstractSourceOperator.java | 4 +- .../service/source/SourceSnapshotOperator.java | 21 ++-- .../source/pulsar/PulsarSourceOperator.java | 1 + .../service/stream/InlongStreamProcessService.java | 26 ++-- .../service/stream/InlongStreamServiceImpl.java | 5 +- .../inlong/manager/service/RestTemplateConfig.java | 5 +- .../resources/application-unit-test.properties | 6 +- .../manager/web/auth/openapi/OpenAPIFilter.java | 2 +- .../manager/web/auth/web/AuthenticationFilter.java | 2 +- .../manager/web/config/RestTemplateConfig.java | 10 +- .../src/main/resources/application-prod.properties | 3 +- .../src/main/resources/application-test.properties | 5 +- .../event/process/ProcessEventListener.java | 13 +- .../workflow/processor/ServiceTaskProcessor.java | 18 +-- .../workflow/processor/StartEventProcessor.java | 24 ++-- 38 files changed, 365 insertions(+), 460 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index 28c2435d2..5d0b0a13d 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -22,6 +22,14 @@ package org.apache.inlong.manager.common.consts; */ public class InlongConstants { + /** + * Thread pool related config. + */ + public static final int CORE_POOL_SIZE = 10; + public static final int MAX_POOL_SIZE = 20; + public static final long ALIVE_TIME_MS = 100L; + public static final int QUEUE_SIZE = 10000; + /** * Group config */ diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java index 0cccd1589..e6f3b0d3a 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java @@ -17,8 +17,6 @@ package org.apache.inlong.manager.common.enums; -import org.apache.inlong.manager.common.exceptions.WorkflowException; - /** * Status enum of cluster */ @@ -28,7 +26,7 @@ public enum ClusterStatus { INITING(2); - int status; + final int status; ClusterStatus(int status) { this.status = status; @@ -37,15 +35,4 @@ public enum ClusterStatus { public int getStatus() { return status; } - - public static ClusterStatus fromStatus(int status) { - switch (status) { - case 1: - return NORMAL; - case 2: - return INITING; - default: - throw new WorkflowException("unknown status: " + status); - } - } } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java index dafcfe747..e60b898d5 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java @@ -49,7 +49,7 @@ public enum GroupStatus { DELETING(41, "deleting"), DELETED(40, "deleted"), - // GROUP_FINISH is used for batch task. + // FINISH is used for batch task. FINISH(131, "finish"); private static final Map<GroupStatus, Set<GroupStatus>> GROUP_STATE_AUTOMATON = Maps.newHashMap(); @@ -60,8 +60,7 @@ public enum GroupStatus { static { GROUP_STATE_AUTOMATON.put(DRAFT, Sets.newHashSet(DRAFT, TO_BE_SUBMIT, DELETING)); GROUP_STATE_AUTOMATON.put(TO_BE_SUBMIT, Sets.newHashSet(TO_BE_SUBMIT, TO_BE_APPROVAL, DELETING)); - GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL, - Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED, DELETING)); + GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL, Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED)); GROUP_STATE_AUTOMATON.put(APPROVE_REJECTED, Sets.newHashSet(APPROVE_REJECTED, TO_BE_APPROVAL, DELETING)); GROUP_STATE_AUTOMATON.put(APPROVE_PASSED, Sets.newHashSet(APPROVE_PASSED, CONFIG_ING, DELETING)); @@ -149,6 +148,7 @@ public enum GroupStatus { @Override public String toString() { - return name().toLowerCase(Locale.ROOT).replace("group_", ""); + return name().toLowerCase(Locale.ROOT); } + } diff --git a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java index 4a5e7b804..3cbb57840 100644 --- a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java +++ b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java @@ -29,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.protocol.HttpContext; +import org.apache.inlong.common.constant.ProtocolType; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -79,8 +80,8 @@ public class RestTemplateConfig { public PoolingHttpClientConnectionManager httpClientConnectionManager() { // Support HTTP, HTTPS Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create() - .register("http", PlainConnectionSocketFactory.getSocketFactory()) - .register("https", SSLConnectionSocketFactory.getSocketFactory()) + .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory()) + .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory()) .build(); PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager( registry); diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java index 51109e2c7..bbc9c64fd 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java @@ -17,71 +17,61 @@ package org.apache.inlong.manager.plugin.flink; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; +import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS; +import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE; + /** * Task run service. */ public class TaskRunService { - private static final ExecutorService executorService; - - private static final int CORE_POOL_SIZE = 16; - private static final int MAXIMUM_POOL_SIZE = 32; - private static final int QUEUE_SIZE = 10000; - private static final long KEEP_ALIVE_TIME = 0L; - - static { - executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, - KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(QUEUE_SIZE)); - } + private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( + CORE_POOL_SIZE, + MAX_POOL_SIZE, + ALIVE_TIME_MS, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(QUEUE_SIZE), + new ThreadFactoryBuilder().setNameFormat("inlong-plugin-%s").build(), + new CallerRunsPolicy()); /** * execute - * - * @param runnable */ public static void execute(Runnable runnable) { - executorService.execute(runnable); + EXECUTOR_SERVICE.execute(runnable); } /** * submit - * - * @param runnable - * @return */ public static Future<?> submit(Runnable runnable) { - return executorService.submit(runnable); + return EXECUTOR_SERVICE.submit(runnable); } /** * submit - * - * @param runnable - * @param defaultValue - * @param <T> - * @return */ public static <T> Future<T> submit(Runnable runnable, T defaultValue) { - return executorService.submit(runnable, defaultValue); + return EXECUTOR_SERVICE.submit(runnable, defaultValue); } /** * submit - * - * @param callable - * @param <T> - * @return */ public static <T> Future<T> submit(Callable<T> callable) { - return executorService.submit(callable); + return EXECUTOR_SERVICE.submit(callable); } } diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java index 6e65f30dc..7eae0e9ca 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java @@ -42,7 +42,7 @@ import java.util.Map; import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg; /** - * Listener of delete sort. + * Listener of delete Sort task or config. */ @Slf4j public class DeleteSortListener implements SortOperateListener { @@ -57,17 +57,17 @@ public class DeleteSortListener implements SortOperateListener { ProcessForm processForm = context.getProcessForm(); String groupId = processForm.getInlongGroupId(); if (!(processForm instanceof GroupResourceProcessForm)) { - log.info("not add delete group listener, not GroupResourceProcessForm for groupId [{}]", groupId); + log.info("not add delete group listener, not GroupResourceProcessForm for groupId={}", groupId); return false; } GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm; if (groupProcessForm.getGroupOperateType() != GroupOperateType.DELETE) { - log.info("not add delete group listener, as the operate was not DELETE for groupId [{}]", groupId); + log.info("not add delete group listener, as the operate was not DELETE for groupId={}", groupId); return false; } - log.info("add delete group listener for groupId [{}]", groupId); + log.info("add delete group listener for groupId={}", groupId); return true; } @@ -76,7 +76,7 @@ public class DeleteSortListener implements SortOperateListener { ProcessForm processForm = context.getProcessForm(); String groupId = processForm.getInlongGroupId(); if (!(processForm instanceof GroupResourceProcessForm)) { - String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId); + String message = String.format("process form was not GroupResourceProcessForm for groupId=%s", groupId); log.error(message); return ListenerResult.fail(message); } @@ -90,10 +90,8 @@ public class DeleteSortListener implements SortOperateListener { extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue())); String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); if (StringUtils.isEmpty(sortExt)) { - String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty", - groupId); - log.error(message); - return ListenerResult.fail(message); + log.warn("no need to delete sort for groupId={}, as the sort properties is empty", groupId); + return ListenerResult.success(); } Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( @@ -102,7 +100,7 @@ public class DeleteSortListener implements SortOperateListener { kvConf.putAll(result); String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); if (StringUtils.isBlank(jobId)) { - String message = String.format("sort job id is empty for groupId [%s]", groupId); + String message = String.format("sort job id is empty for groupId=%s", groupId); return ListenerResult.fail(message); } @@ -115,16 +113,16 @@ public class DeleteSortListener implements SortOperateListener { FlinkOperation flinkOperation = new FlinkOperation(flinkService); try { flinkOperation.delete(flinkInfo); - log.info("job delete success for [{}]", jobId); + log.info("job delete success for jobId={}", jobId); return ListenerResult.success(); } catch (Exception e) { flinkInfo.setException(true); flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); flinkOperation.pollJobStatus(flinkInfo); - String message = String.format("delete sort failed for groupId [%s] ", groupId); + String message = String.format("delete sort failed for groupId=%s", groupId); log.error(message, e); - return ListenerResult.fail(message + e.getMessage()); + return ListenerResult.fail(message + ": " + e.getMessage()); } } diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java index 7c50d9738..64006410b 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java @@ -59,19 +59,19 @@ public class DeleteStreamListener implements SortOperateListener { ProcessForm processForm = context.getProcessForm(); String groupId = processForm.getInlongGroupId(); if (!(processForm instanceof StreamResourceProcessForm)) { - log.info("not add delete stream listener, not StreamResourceProcessForm for groupId [{}]", groupId); + log.info("not add delete stream listener, not StreamResourceProcessForm for groupId={}", groupId); return false; } StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm; String streamId = streamProcessForm.getStreamInfo().getInlongStreamId(); if (streamProcessForm.getGroupOperateType() != GroupOperateType.DELETE) { - log.info("not add delete stream listener, as the operate was not DELETE for groupId [{}] streamId [{}]", + log.info("not add delete stream listener, as the operate was not DELETE for groupId={} streamId={}", groupId, streamId); return false; } - log.info("add delete stream listener for groupId [{}] streamId [{}]", groupId, streamId); + log.info("add delete stream listener for groupId={} streamId={}", groupId, streamId); return true; } @@ -81,26 +81,23 @@ public class DeleteStreamListener implements SortOperateListener { StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm; InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo(); List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList(); - log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList); + log.info("inlong group: {} ext info: {}", groupInfo.getInlongGroupId(), groupExtList); + InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo(); List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList(); - log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList); + log.info("inlong stream: {} ext info: {}", streamInfo.getInlongStreamId(), streamExtList); Map<String, String> kvConf = new HashMap<>(); groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue())); - streamExtList.forEach(extInfo -> { - kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue()); - }); + streamExtList.forEach(extInfo -> kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue())); final String groupId = streamInfo.getInlongGroupId(); final String streamId = streamInfo.getInlongStreamId(); String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); if (StringUtils.isEmpty(sortExt)) { - String message = String.format( - "delete sort failed for groupId [%s] and streamId [%s], as the sort properties is empty", + log.warn("no need to delete sort for groupId={} streamId={}, as the sort properties is empty", groupId, streamId); - log.error(message); - return ListenerResult.fail(message); + return ListenerResult.success(); } Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( @@ -109,7 +106,7 @@ public class DeleteStreamListener implements SortOperateListener { kvConf.putAll(result); String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); if (StringUtils.isBlank(jobId)) { - String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId); + String message = String.format("sort job id is empty for groupId=%s streamId=%s", groupId, streamId); return ListenerResult.fail(message); } @@ -122,14 +119,14 @@ public class DeleteStreamListener implements SortOperateListener { FlinkOperation flinkOperation = new FlinkOperation(flinkService); try { flinkOperation.delete(flinkInfo); - log.info("job delete success for [{}]", jobId); + log.info("job delete success for jobId={}", jobId); return ListenerResult.success(); } catch (Exception e) { flinkInfo.setException(true); flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); flinkOperation.pollJobStatus(flinkInfo); - String message = String.format("delete sort failed for groupId [%s] streamId [%s]", groupId, streamId); + String message = String.format("delete sort failed for groupId=%s streamId=%s", groupId, streamId); log.error(message, e); return ListenerResult.fail(message + e.getMessage()); } diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java index 25052df49..c3daefecf 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java @@ -101,7 +101,7 @@ public class FlinkUtils { File baseDir = new File(baseDirName); if (!baseDir.exists() || !baseDir.isDirectory()) { - log.error("baseDirName find fail :{}", baseDirName); + log.error("baseDirName find fail: {}", baseDirName); return result; } String tempName; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java index 21604f777..6363a8ec1 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java @@ -145,26 +145,25 @@ public class MySQLSinkDTO { } String connUri = jdbcUrl.substring(pos1 + 1); - int pos; if (connUri.startsWith("//")) { - if ((pos = connUri.indexOf('/', 2)) != -1) { + int pos = connUri.indexOf('/', 2); + if (pos != -1) { database = connUri.substring(pos + 1); } } else { database = connUri; } + if (Strings.isNullOrEmpty(database)) { + throw new IllegalArgumentException("Invalid JDBC URL: " + jdbcUrl); + } + if (database.contains("?")) { database = database.substring(0, database.indexOf("?")); } - if (database.contains(";")) { database = database.substring(0, database.indexOf(";")); } - - if (Strings.isNullOrEmpty(database)) { - throw new IllegalArgumentException("Invalid JDBC url."); - } return database; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java index 6b0a02fc5..7f5469aa6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java @@ -34,64 +34,76 @@ import java.util.List; * Loader for sort service to load configs thought Cursor */ public interface SortConfigLoader { + /** * Load all clusters by cursor - * @return List of clusters, including MQ cluster and DataNode cluster. + * + * @return list of clusters, including MQ cluster and DataProxy cluster */ List<SortSourceClusterInfo> loadAllClusters(); /** * Load stream sinks by cursor - * @return List of Stream sinks. + * + * @return list of stream sinks */ List<SortSourceStreamSinkInfo> loadAllStreamSinks(); /** * Load groups by cursor - * @return List of group info + * + * @return list of group info */ List<SortSourceGroupInfo> loadAllGroup(); /** - * Load group backup info by cursor - * @param keyName Key name - * @return List of group backup info + * Load backup group info by cursor + * + * # @param keyName key name + * + * @return list of backup group info */ List<InlongGroupExtEntity> loadGroupBackupInfo(String keyName); /** - * Load stream backup info by cursor - * @param keyName Key name - * @return List of stream backup info + * Load backup stream info by cursor + * + * @param keyName key name + * @return list of backup stream info */ List<InlongStreamExtEntity> loadStreamBackupInfo(String keyName); /** * Load all inlong stream info by cursor - * @return List of stream info + * + * @return list of stream info */ List<SortSourceStreamInfo> loadAllStreams(); /** * Load all inlong stream sink entity by cursor + * * @return List of stream sink entity */ List<StreamSinkEntity> loadAllStreamSinkEntity(); /** * Load all task info + * * @return List of tasks */ List<SortTaskInfo> loadAllTask(); /** * Load all data node entity + * * @return List of data node */ List<DataNodeEntity> loadAllDataNodeEntity(); /** * Load all fields info + * * @return List of fields info */ List<SortFieldInfo> loadAllFields(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java deleted file mode 100644 index 3b819f898..000000000 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.service.core.impl; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.commons.collections.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Value; - -public abstract class AbstractService<T> implements AutoCloseable, InitializingBean { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractService.class); - - - @Value("${msg.to.db.batch.size:10}") - private int batchSize = 10; - - @Value("${msg.to.db.queue.size:10000}") - private int queueSize = 10000; - - @Value("${msg.to.db.core.pool.size:2}") - private int corePoolSize = 2; - - @Value("${msg.to.db.max.pool.size:2}") - private int maximumPoolSize = 2; - - @Value("${msg.to.db.queue.pool.size:10}") - private int syncSendQueueSize = 10; - - private volatile boolean isClose = false; - - private LinkedBlockingQueue<T> dataQueue; - - private ThreadPoolExecutor pool; - - /** - * batch insert entities - * - * @param entryList entryList - * @return boolean true/false - */ - abstract boolean batchInsertEntities(List<T> entryList); - - /** - * put Data - * - * @param data data - * @return boolean true/false - */ - public boolean putData(T data) { - if (dataQueue == null) { - return false; - } - return dataQueue.offer(data); - } - - @Override - public void close() { - isClose = true; - } - - @Override - public void afterPropertiesSet() { - dataQueue = new LinkedBlockingQueue(queueSize); - pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, - 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(syncSendQueueSize), - Executors.defaultThreadFactory()); - for (int i = 0; i < corePoolSize; i++) { - pool.execute(new Task()); - } - } - - class Task implements Runnable { - @Override - public void run() { - while (!isClose) { - try { - List<T> entryList = new ArrayList<>(); - int count = 0; - while (count < batchSize) { - T data = dataQueue.poll(1, TimeUnit.MILLISECONDS); - if (data != null) { - entryList.add(data); - } - count++; - } - if (CollectionUtils.isNotEmpty(entryList)) { - batchInsertEntities(entryList); - } - } catch (Exception e) { - LOGGER.error("batchInsertEntities has exception = {}", e); - } - } - } - } -} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java index 8d950b706..de044f0aa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java @@ -48,6 +48,11 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; +import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS; +import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE; + /** * Operation to the inlong group process */ @@ -56,12 +61,12 @@ public class InlongGroupProcessService { private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupProcessService.class); - private final ExecutorService executorService = new ThreadPoolExecutor( - 20, - 40, - 0L, + private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( + CORE_POOL_SIZE, + MAX_POOL_SIZE, + ALIVE_TIME_MS, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), + new LinkedBlockingQueue<>(QUEUE_SIZE), new ThreadFactoryBuilder().setNameFormat("inlong-group-process-%s").build(), new CallerRunsPolicy()); @@ -107,7 +112,7 @@ public class InlongGroupProcessService { groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator); InlongGroupInfo groupInfo = groupService.get(groupId); GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.SUSPEND); - executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form)); + EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form)); LOGGER.info("success to suspend process asynchronously for groupId={} by operator={}", groupId, operator); return groupId; @@ -147,7 +152,7 @@ public class InlongGroupProcessService { groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator); InlongGroupInfo groupInfo = groupService.get(groupId); GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.RESTART); - executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form)); + EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form)); LOGGER.info("success to restart process asynchronously for groupId={} by operator={}", groupId, operator); return groupId; @@ -181,7 +186,7 @@ public class InlongGroupProcessService { */ public String deleteProcessAsync(String groupId, String operator) { LOGGER.info("begin to delete process asynchronously for groupId={} by operator={}", groupId, operator); - executorService.execute(() -> { + EXECUTOR_SERVICE.execute(() -> { try { invokeDeleteProcess(groupId, operator); } catch (Exception ex) { @@ -255,7 +260,7 @@ public class InlongGroupProcessService { List<WorkflowProcessEntity> entities = workflowQueryService.listProcessEntity(processQuery); entities.sort(Comparator.comparingInt(WorkflowProcessEntity::getId)); WorkflowProcessEntity lastProcess = entities.get(entities.size() - 1); - executorService.execute(() -> { + EXECUTOR_SERVICE.execute(() -> { workflowService.continueProcess(lastProcess.getId(), operator, "Reset group status"); }); return true; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java index 010afee02..16a96a8f1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java @@ -79,7 +79,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager { } }).build(); - // The expire time of cluster info cache must be greater than heartbeat cache + // The expiry time of cluster info cache must be greater than heartbeat cache // because the eviction handler needs to query cluster info cache clusterInfoCache = Caffeine.newBuilder() .expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java index 390c11b48..152832ba1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java @@ -48,6 +48,10 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; +import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS; +import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE; import static org.apache.inlong.manager.common.enums.GroupOperateType.INIT; import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_RESOURCE; @@ -59,21 +63,23 @@ import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_R @Service public class QueueResourceListener implements QueueOperateListener { + private static final Integer TIMEOUT_SECONDS = 180; + private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( - 20, - 40, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(), + CORE_POOL_SIZE, + MAX_POOL_SIZE, + ALIVE_TIME_MS, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(QUEUE_SIZE), + new ThreadFactoryBuilder().setNameFormat("inlong-mq-process-%s").build(), new CallerRunsPolicy()); @Autowired private InlongGroupService groupService; @Autowired - private QueueResourceOperatorFactory queueOperatorFactory; - @Autowired private WorkflowService workflowService; + @Autowired + private QueueResourceOperatorFactory queueOperatorFactory; @Override public TaskEvent event() { @@ -152,7 +158,9 @@ public class QueueResourceListener implements QueueOperateListener { } }); try { - future.get(180, TimeUnit.SECONDS); + // wait for the current process complete before starting the next stream, + // otherwise, an exception is thrown and the next stream process will not be started. + future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Exception e) { String msg = "failed to execute stream process in asynchronously "; log.error(msg, e); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java index 08a310ddb..39a3b78dc 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java @@ -77,15 +77,17 @@ public class SortConfigListener implements SortOperateListener { GroupOperateType operateType = form.getGroupOperateType(); if (operateType == GroupOperateType.SUSPEND || operateType == GroupOperateType.DELETE) { - LOGGER.info("not build sort config for groupId={}, as the group operate type={}", groupId, operateType); + LOGGER.info("no need to build sort config for groupId={} as the operate type is {}", groupId, operateType); return ListenerResult.success(); } + InlongGroupInfo groupInfo = form.getGroupInfo(); List<InlongStreamInfo> streamInfos = form.getStreamInfos(); if (CollectionUtils.isEmpty(streamInfos)) { - LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId); + LOGGER.warn("no need to build sort config for groupId={}, as not found any stream", groupId); return ListenerResult.success(); } + int sinkCount = streamInfos.stream() .map(stream -> stream.getSinkList() == null ? 0 : stream.getSinkList().size()) .reduce(0, Integer::sum); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java index 85b8e7eb7..cf7bd4828 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java @@ -36,6 +36,9 @@ import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; +import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS; +import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE; + /** * Operation log thread pool */ @@ -43,24 +46,23 @@ import java.util.stream.IntStream; @Component public class OperationLogPool { - private static final int BUFFER_SIZE = 500; - private static final int MAX_WAIT_TIME_SECOND = 30; - private static final int MAX_QUEUE_SIZE = 10000; + private static final int TIMEOUT_SECOND = 30; private static final int THREAD_NUM = 3; - private static final ArrayBlockingQueue<OperationLogEntity> OPERATION_POOL = new ArrayBlockingQueue<>( - MAX_QUEUE_SIZE); + private static final int BUFFER_SIZE = 500; + + private static final ArrayBlockingQueue<OperationLogEntity> OPERATION_POOL = new ArrayBlockingQueue<>(QUEUE_SIZE); - private final ExecutorService executorService = new ThreadPoolExecutor( + private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( THREAD_NUM, THREAD_NUM, - 0L, + ALIVE_TIME_MS, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setNameFormat("async-operation-log-%s").build(), + new LinkedBlockingQueue<>(QUEUE_SIZE), + new ThreadFactoryBuilder().setNameFormat("inlong-operation-log-%s").build(), new CallerRunsPolicy()); @Autowired - private OperationLogEntityMapper operationLogEntityMapper; + private OperationLogEntityMapper operationLogMapper; public static void publish(OperationLogEntity operation) { if (!OPERATION_POOL.offer(operation)) { @@ -71,7 +73,7 @@ public class OperationLogPool { @PostConstruct public void init() { IntStream.range(0, THREAD_NUM).forEach( - i -> executorService.submit(this::saveOperationLog) + i -> EXECUTOR_SERVICE.submit(this::saveOperationLog) ); } @@ -79,14 +81,13 @@ public class OperationLogPool { List<OperationLogEntity> buffer = new ArrayList<>(BUFFER_SIZE); while (true) { buffer.clear(); - int size = 0; try { - size = Queues.drain(OPERATION_POOL, buffer, BUFFER_SIZE, MAX_WAIT_TIME_SECOND, TimeUnit.SECONDS); + int size = Queues.drain(OPERATION_POOL, buffer, BUFFER_SIZE, TIMEOUT_SECOND, TimeUnit.SECONDS); if (buffer.isEmpty()) { continue; } long startTime = System.currentTimeMillis(); - operationLogEntityMapper.insertBatch(buffer); + operationLogMapper.insertBatch(buffer); log.info("receive {} logs and saved cost {} ms", size, System.currentTimeMillis() - startTime); } catch (InterruptedException e) { log.error("save operation log interrupted", e); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java index 2f65aa735..2157acb1d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java @@ -51,14 +51,14 @@ public class KafkaResourceOperators implements QueueResourceOperator { */ private static final String KAFKA_CONSUMER_GROUP = "%s_%s_consumer_group"; - @Autowired - private InlongClusterService clusterService; @Autowired private KafkaOperator kafkaOperator; @Autowired private InlongStreamService streamService; @Autowired private InlongConsumeService consumeService; + @Autowired + private InlongClusterService clusterService; @Override public boolean accept(String mqType) { @@ -67,7 +67,8 @@ public class KafkaResourceOperators implements QueueResourceOperator { @Override public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) { - log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId()); + log.info("skip to create kafka topic for groupId={}, just create in each inlong stream", + groupInfo.getInlongGroupId()); } @Override @@ -84,7 +85,7 @@ public class KafkaResourceOperators implements QueueResourceOperator { return; } for (InlongStreamBriefInfo streamInfo : streamInfoList) { - this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId()); + this.deleteKafkaTopic(groupInfo, streamInfo.getMqResource()); } } catch (Exception e) { log.error("failed to delete kafka resource for groupId=" + groupId, e); @@ -94,8 +95,7 @@ public class KafkaResourceOperators implements QueueResourceOperator { } @Override - public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, - String operator) { + public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) { Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null"); Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null"); Preconditions.checkNotNull(operator, "operator cannot be null"); @@ -117,8 +117,7 @@ public class KafkaResourceOperators implements QueueResourceOperator { } @Override - public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, - String operator) { + public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) { Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null"); Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null"); @@ -141,7 +140,7 @@ public class KafkaResourceOperators implements QueueResourceOperator { * Create Kafka Topic and Subscription, and save the consumer group info. */ private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String topicName) throws Exception { - // 1. create kafka topic + // create Kafka topic ClusterInfo clusterInfo = clusterService.getOne(kafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA); kafkaOperator.createTopic(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName); @@ -153,7 +152,8 @@ public class KafkaResourceOperators implements QueueResourceOperator { } // Kafka consumers do not need to register in advance - // 2. insert the consumer group info + + // save the consumer group info for the Kafka topic String consumeGroup = String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName); Integer id = consumeService.saveBySystem(kafkaInfo, topicName, consumeGroup); log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}", @@ -163,9 +163,8 @@ public class KafkaResourceOperators implements QueueResourceOperator { /** * Delete Kafka Topic and Subscription, and delete the consumer group info. */ - private void deleteKafkaTopic(InlongGroupInfo groupInfo, String streamId) { + private void deleteKafkaTopic(InlongGroupInfo groupInfo, String topicName) { ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA); - String topicName = groupInfo.getInlongGroupId() + "_" + streamId; kafkaOperator.forceDeleteTopic((KafkaClusterInfo) clusterInfo, topicName); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java index 3ebeeca37..b82074d74 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java @@ -353,7 +353,7 @@ public class PulsarOperator { LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e); if (count == RETRY_TIMES) { LOGGER.error("after {} times retry, still check subscription exception for topic {}", count, topic); - throw new BusinessException("check if the subscription exists error"); + throw new BusinessException("check if the subscription exists error: " + e.getMessage()); } } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java index 2e379b285..69a38ec6e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java @@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; @@ -44,7 +45,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; -import java.util.stream.Collectors; /** * Operator for create Pulsar Tenant, Namespace, Topic and Subscription @@ -80,72 +80,67 @@ public class PulsarResourceOperator implements QueueResourceOperator { Preconditions.checkNotNull(operator, "operator cannot be null"); String groupId = groupInfo.getInlongGroupId(); - log.info("begin to create pulsar resource for groupId={}", groupId); + String clusterTag = groupInfo.getInlongClusterTag(); + log.info("begin to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag); // get pulsar cluster via the inlong cluster tag from the inlong group - String clusterTag = groupInfo.getInlongClusterTag(); - List<PulsarClusterInfo> pulsarClusters = - clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR).stream() - .map(clusterInfo -> (PulsarClusterInfo) clusterInfo) - .collect(Collectors.toList()); - for (PulsarClusterInfo pulsarCluster : pulsarClusters) { + List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); + for (ClusterInfo clusterInfo : clusterInfos) { + PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - String clusterName = pulsarCluster.getName(); // create pulsar tenant and namespace String tenant = pulsarCluster.getTenant(); if (StringUtils.isEmpty(tenant)) { tenant = InlongConstants.DEFAULT_PULSAR_TENANT; } - String namespace = groupInfo.getMqResource(); - InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo; + // if the group was not successful, need create tenant and namespace if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) { pulsarOperator.createTenant(pulsarAdmin, tenant); - log.info("success to create pulsar tenant for groupId={}, tenant={}, cluster={}", - groupId, tenant, clusterName); - pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace); - log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}", - groupId, namespace, clusterName); + String namespace = groupInfo.getMqResource(); + pulsarOperator.createNamespace(pulsarAdmin, (InlongPulsarInfo) groupInfo, tenant, namespace); + + log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}", + groupId, tenant, namespace, pulsarCluster); } } catch (Exception e) { - String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId, - pulsarCluster.toString()); - log.error(msg, e); + String msg = "failed to create pulsar resource for groupId=" + groupId; + log.error(msg + ", cluster=" + pulsarCluster, e); throw new WorkflowListenerException(msg + ": " + e.getMessage()); } } - log.info("success to create pulsar resource for groupId={}", groupId); + + log.info("success to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag); } @Override public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) { Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null"); - String groupId = groupInfo.getInlongGroupId(); - log.info("begin to delete pulsar resource for groupId={}", groupId); - - List<PulsarClusterInfo> pulsarClusters = - clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream() - .map(clusterInfo -> (PulsarClusterInfo) clusterInfo) - .collect(Collectors.toList()); - for (PulsarClusterInfo clusterInfo : pulsarClusters) { - String clusterName = clusterInfo.getName(); + String clusterTag = groupInfo.getInlongClusterTag(); + log.info("begin to delete pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag); + + List<InlongStreamBriefInfo> streamInfos = streamService.getTopicList(groupId); + if (CollectionUtils.isEmpty(streamInfos)) { + log.warn("skip to delete pulsar resource as no streams for groupId={}", groupId); + return; + } + + List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); + for (ClusterInfo clusterInfo : clusterInfos) { + PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; try { - List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId); - if (streamInfoList == null || streamInfoList.isEmpty()) { - log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}", - groupId, clusterName); - return; - } - for (InlongStreamBriefInfo streamInfo : streamInfoList) { - this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource()); + for (InlongStreamBriefInfo streamInfo : streamInfos) { + this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource()); } } catch (Exception e) { - log.error("failed to delete pulsar resource for groupId={}, cluster={}", groupId, clusterName, e); - throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage()); + String msg = "failed to delete pulsar resource for groupId=" + groupId; + log.error(msg + ", cluster=" + pulsarCluster, e); + throw new WorkflowListenerException(msg + ": " + e.getMessage()); } } - log.info("success to delete pulsar resource for groupId={}", groupId); + + log.info("success to delete pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag); } @Override @@ -156,22 +151,25 @@ public class PulsarResourceOperator implements QueueResourceOperator { String groupId = streamInfo.getInlongGroupId(); String streamId = streamInfo.getInlongStreamId(); - log.info("begin to create pulsar resource for groupId={}, streamId={}", groupId, streamId); + String clusterTag = groupInfo.getInlongClusterTag(); + log.info("begin to create pulsar resource for groupId={}, streamId={}, clusterTag={}", + groupId, streamId, clusterTag); - List<PulsarClusterInfo> pulsarClusters = - clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream() - .map(clusterInfo -> (PulsarClusterInfo) clusterInfo) - .collect(Collectors.toList()); - for (PulsarClusterInfo pulsarCluster : pulsarClusters) { + // get pulsar cluster via the inlong cluster tag from the inlong group + List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); + for (ClusterInfo clusterInfo : clusterInfos) { + PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; try { // create pulsar topic and subscription - this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource()); - this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, - streamInfo.getMqResource(), streamId); + String topicName = streamInfo.getMqResource(); + this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, topicName); + this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, topicName, streamId); + + log.info("success to create pulsar resource for groupId={}, streamId={}, topic={}, cluster={}", + groupId, streamId, topicName, pulsarCluster); } catch (Exception e) { - String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s", - groupId, streamId, pulsarCluster.getName()); - log.error(msg, e); + String msg = "failed to create pulsar resource for groupId=" + groupId + ", streamId=" + streamId; + log.error(msg + ", cluster=" + pulsarCluster, e); throw new WorkflowListenerException(msg + ": " + e.getMessage()); } } @@ -186,25 +184,24 @@ public class PulsarResourceOperator implements QueueResourceOperator { String groupId = streamInfo.getInlongGroupId(); String streamId = streamInfo.getInlongStreamId(); - log.info("begin to delete pulsar resource for groupId={} streamId={}", groupId, streamId); - - List<PulsarClusterInfo> pulsarClusters = - clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream() - .map(clusterInfo -> (PulsarClusterInfo) clusterInfo) - .collect(Collectors.toList()); - for (PulsarClusterInfo clusterInfo : pulsarClusters) { - String clusterName = clusterInfo.getName(); + String clusterTag = groupInfo.getInlongClusterTag(); + log.info("begin to delete pulsar resource for groupId={}, streamId={}, clusterTag={}", + groupId, streamId, clusterTag); + + List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); + for (ClusterInfo clusterInfo : clusterInfos) { + PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; try { - this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource()); - log.info("success to delete pulsar topic for groupId={}, streamId={}, cluster={}", - groupId, streamId, clusterName); + this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource()); + log.info("success to delete pulsar topic for groupId={}, streamId={}, topic={}, cluster={}", + groupId, streamId, streamInfo.getMqResource(), pulsarCluster); } catch (Exception e) { - String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s, cluster=%s", - groupId, streamId, clusterName); - log.error(msg, e); - throw new WorkflowListenerException(msg); + String msg = "failed to delete pulsar topic for groupId=" + groupId + ", streamId=" + streamId; + log.error(msg + ", cluster=" + pulsarCluster, e); + throw new WorkflowListenerException(msg + ": " + e.getMessage()); } } + log.info("success to delete pulsar resource for groupId={}, streamId={}", groupId, streamId); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java index 6cccff706..c2df68f05 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java @@ -17,17 +17,14 @@ package org.apache.inlong.manager.service.resource.sink.es; -import java.util.ArrayList; -import java.util.List; - import lombok.Data; - import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.inlong.common.constant.ProtocolType; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; @@ -36,6 +33,9 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; + /** * Elasticsearch config information, including host, port, etc. */ @@ -43,6 +43,8 @@ import org.springframework.stereotype.Component; @Component public class ElasticsearchConfig { + private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class); + private static RestHighLevelClient highLevelClient; @Value("${es.index.search.hostname}") private String host; @Value("${es.index.search.port}") @@ -54,10 +56,6 @@ public class ElasticsearchConfig { @Value("${es.auth.password}") private String password; - private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class); - - private static RestHighLevelClient highLevelClient; - /** * highLevelClient * @@ -75,7 +73,7 @@ public class ElasticsearchConfig { for (String host : hostArrays) { if (StringUtils.isNotBlank(host)) { host = host.trim(); - hosts.add(new HttpHost(host, port, "http")); + hosts.add(new HttpHost(host, port, ProtocolType.HTTP)); } } RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0])); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java index 441779c5c..d463ca9ab 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java @@ -38,10 +38,8 @@ import java.util.Objects; public class MySQLJdbcUtils { private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql"; - private static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; - - private static final Logger LOG = LoggerFactory.getLogger(MySQLJdbcUtils.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MySQLJdbcUtils.class); /** * Get MySQL connection from the url and user. @@ -52,46 +50,46 @@ public class MySQLJdbcUtils { * @return {@link Connection} * @throws Exception on get connection error */ - public static Connection getConnection(final String url, final String user, final String password) - throws Exception { + public static Connection getConnection(String url, String user, String password) throws Exception { if (StringUtils.isBlank(url) || !url.startsWith(MYSQL_JDBC_PREFIX)) { - throw new Exception("MySQL server URL was invalid, it should start with jdbc:mysql"); + throw new Exception("MySQL JDBC URL was invalid, it should start with jdbc:mysql"); } + Connection conn; try { Class.forName(MYSQL_DRIVER_CLASS); conn = DriverManager.getConnection(url, user, password); } catch (Exception e) { - final String errorMsg = "get MySQL connection error, please check MySQL JDBC url, username or password!"; - LOG.error(errorMsg, e); + String errorMsg = "get MySQL connection error, please check MySQL JDBC url, username or password!"; + LOGGER.error(errorMsg, e); throw new Exception(errorMsg + " other error msg: " + e.getMessage()); } if (Objects.isNull(conn)) { throw new Exception("get MySQL connection failed, please contact administrator."); } - LOG.info("get MySQL connection success, url={}", url); + LOGGER.info("get MySQL connection success for url={}", url); return conn; } /** * Execute SQL command on MySQL. * - * @param conn JDBC Connection {@link Connection} - * @param sql SQL string to be executed + * @param conn JDBC {@link Connection} + * @param sql SQL to be executed * @throws Exception on execute SQL error */ public static void executeSql(final Connection conn, final String sql) throws Exception { try (Statement stmt = conn.createStatement()) { stmt.execute(sql); - LOG.info("execute sql [{}] success", sql); + LOGGER.info("execute sql [{}] success", sql); } } /** * Execute batch query SQL on MySQL. * - * @param conn JDBC Connection {@link Connection} - * @param sqls SQL string to be executed + * @param conn JDBC {@link Connection} + * @param sqls SQL to be executed * @throws Exception on get execute SQL batch error */ public static void executeSqlBatch(final Connection conn, final List<String> sqls) throws Exception { @@ -101,7 +99,7 @@ public class MySQLJdbcUtils { stmt.execute(entry); } conn.commit(); - LOG.info("execute sql [{}] success", sqls); + LOGGER.info("execute sql [{}] success", sqls); } finally { conn.setAutoCommit(true); } @@ -110,7 +108,7 @@ public class MySQLJdbcUtils { /** * Create MySQL database * - * @param conn JDBC Connection {@link Connection} + * @param conn JDBC {@link Connection} * @param dbName database name * @throws Exception on create database error */ @@ -118,58 +116,58 @@ public class MySQLJdbcUtils { if (!checkDbExist(conn, dbName)) { final String createDbSql = MySQLSqlBuilder.buildCreateDbSql(dbName); executeSql(conn, createDbSql); - LOG.info("execute sql [{}] success", createDbSql); + LOGGER.info("execute sql [{}] success", createDbSql); } else { - LOG.info("The database [{}] are exists", dbName); + LOGGER.info("The database [{}] are exists", dbName); } } /** * Check database from the MySQL information_schema. * - * @param conn JDBC Connection {@link Connection} - * @param dbName MySQL database name + * @param conn JDBC {@link Connection} + * @param dbName database name * @return true if table exist, otherwise false * @throws Exception on check database exist error */ public static boolean checkDbExist(final Connection conn, final String dbName) throws Exception { - boolean result = false; final String checkDbSql = MySQLSqlBuilder.getCheckDatabase(dbName); try (Statement stmt = conn.createStatement(); ResultSet resultSet = stmt.executeQuery(checkDbSql)) { if (Objects.nonNull(resultSet)) { if (resultSet.next()) { + LOGGER.info("check db exist for db={}, result=true", dbName); return true; } } } - LOG.info("check db exist for db={}, result={}", dbName, result); - return result; + LOGGER.info("check db exist for db={}, result=false", dbName); + return false; } /** * Create MySQL table by MySQLTableInfo * - * @param conn JDBC Connection {@link Connection} - * @param tableInfo MySQL table info {@link MySQLTableInfo} + * @param conn JDBC {@link Connection} + * @param tableInfo table info {@link MySQLTableInfo} * @throws Exception on create table error */ public static void createTable(final Connection conn, final MySQLTableInfo tableInfo) throws Exception { if (checkTablesExist(conn, tableInfo.getDbName(), tableInfo.getTableName())) { - LOG.info("The table [{}] are exists", tableInfo.getTableName()); + LOGGER.info("The table [{}] are exists", tableInfo.getTableName()); } else { final String createTableSql = MySQLSqlBuilder.buildCreateTableSql(tableInfo); executeSql(conn, createTableSql); - LOG.info("execute sql [{}] success", createTableSql); + LOGGER.info("execute sql [{}] success", createTableSql); } } /** * Check tables from the MySQL information_schema. * - * @param conn JDBC Connection {@link Connection} - * @param dbName MySQL database name - * @param tableName MySQL table name + * @param conn JDBC {@link Connection} + * @param dbName database name + * @param tableName table name * @return true if table exist, otherwise false * @throws Exception on check table exist error */ @@ -185,23 +183,22 @@ public class MySQLJdbcUtils { } } } - LOG.info("check table exist for db={} table={}, result={}", dbName, tableName, result); + LOGGER.info("check table exist for db={} table={}, result={}", dbName, tableName, result); return result; } /** - * Check whether the column exists in the table. + * Check whether the column exists in the MySQL table. * * @param conn JDBC Connection {@link Connection} - * @param dbName MySQL database name - * @param tableName MySQL table name - * @param column MySQL table column name + * @param dbName database name + * @param tableName table name + * @param column table column name * @return true if column exist in the table, otherwise false * @throws Exception on check column exist error */ public static boolean checkColumnExist(final Connection conn, final String dbName, final String tableName, - final String column) - throws Exception { + final String column) throws Exception { boolean result = false; final String checkTableSql = MySQLSqlBuilder.getCheckColumn(dbName, tableName, column); try (Statement stmt = conn.createStatement(); @@ -212,16 +209,16 @@ public class MySQLJdbcUtils { } } } - LOG.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column); + LOGGER.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column); return result; } /** - * Query all columns of the tableName. + * Query all MySQL table columns by the given tableName. * - * @param conn JDBC Connection {@link Connection} - * @param dbName MySQL database name - * @param tableName MySQL table name + * @param conn JDBC {@link Connection} + * @param dbName database name + * @param tableName table name * @return {@link List} * @throws Exception on get columns error */ @@ -247,9 +244,9 @@ public class MySQLJdbcUtils { * Add columns for MySQL table. * * @param conn JDBC Connection {@link Connection} - * @param dbName MySQL database name - * @param tableName MySQL table name - * @param columns MySQL columns to be added + * @param dbName database name + * @param tableName table name + * @param columns columns to be added * @throws Exception on add columns error */ public static void addColumns(final Connection conn, final String dbName, final String tableName, @@ -264,4 +261,5 @@ public class MySQLJdbcUtils { final List<String> addColumnSql = MySQLSqlBuilder.buildAddColumnsSql(dbName, tableName, columnInfos); executeSqlBatch(conn, addColumnSql); } + } \ No newline at end of file diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java index d60232ac0..6c859cf2a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java @@ -19,15 +19,15 @@ package org.apache.inlong.manager.service.resource.sink.mysql; import org.apache.commons.collections.CollectionUtils; import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.manager.common.enums.SinkStatus; import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.SinkStatus; import org.apache.inlong.manager.common.exceptions.WorkflowException; +import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; +import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo; import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO; import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo; -import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; -import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator; import org.apache.inlong.manager.service.sink.StreamSinkService; import org.slf4j.Logger; @@ -90,19 +90,19 @@ public class MySQLResourceOperator implements SinkResourceOperator { columnList.add(columnInfo); } - final MySQLSinkDTO mySQLSink = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams()); - final MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(mySQLSink, columnList); - - try (Connection conn = MySQLJdbcUtils.getConnection(mySQLSink.getJdbcUrl(), mySQLSink.getUsername(), - mySQLSink.getPassword())) { + MySQLSinkDTO sinkDTO = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams()); + MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(sinkDTO, columnList); + try (Connection conn = MySQLJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(), + sinkDTO.getPassword())) { // 1. create database if not exists MySQLJdbcUtils.createDb(conn, tableInfo.getDbName()); // 2. table not exists, create it MySQLJdbcUtils.createTable(conn, tableInfo); // 3. table exists, add columns - skip the exists columns MySQLJdbcUtils.addColumns(conn, tableInfo.getDbName(), tableInfo.getTableName(), columnList); + // 4. update the sink status to success - final String info = "success to create MySQL resource"; + String info = "success to create MySQL resource"; sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); LOG.info(info + " for sinkInfo={}", sinkInfo); } catch (Throwable e) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 2449b835e..08ec23226 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -78,16 +78,16 @@ public class DefaultSortConfigOperator implements SortConfigOperator { public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream) throws Exception { if (isStream) { - LOGGER.warn("stream workflow no need to build sort config for disable zk"); + LOGGER.warn("no need to build sort config for stream process when disable zk"); return; } if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) { - LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk"); + LOGGER.warn("no need to build sort config as the group is null or streams is empty when disable zk"); return; } - GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos); - String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo); + GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfos); + String dataflow = OBJECT_MAPPER.writeValueAsString(sortConfigInfo); this.addToGroupExt(groupInfo, dataflow); if (LOGGER.isDebugEnabled()) { @@ -127,9 +127,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator { if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) { if (CollectionUtils.isNotEmpty(transformResponses)) { relations = NodeRelationUtils.createNodeRelations(inlongStream); - - // in standard mode, replace upstream source node and transform input fields node to mq node - // mq node name, which is inlong stream id + // in standard mode, replace upstream source node and transform input fields node + // to MQ node (which is inlong stream id) String mqNodeName = sources.get(0).getSourceName(); Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses); adjustTransformField(transformResponses, nodeNameSet, mqNodeName); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 6247822b3..a3e68b8b5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -236,7 +236,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { } } - private void updateFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) { + protected void updateFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) { Integer sourceId = entity.getId(); if (CollectionUtils.isEmpty(fieldInfos)) { return; @@ -270,7 +270,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { streamFieldMapper.insertAll(list); } - private void saveFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) { + protected void saveFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) { LOGGER.info("begin to save source fields={}", fieldInfos); if (CollectionUtils.isEmpty(fieldInfos)) { return; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java index ddcdc0dde..e94cf2276 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java @@ -36,7 +36,6 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -44,6 +43,11 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; +import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS; +import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE; + /** * Operate the source snapshot */ @@ -51,14 +55,15 @@ import java.util.concurrent.TimeUnit; public class SourceSnapshotOperator implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(SourceSnapshotOperator.class); - private final ExecutorService executorService = new ThreadPoolExecutor( - 1, - 1, - 10L, - TimeUnit.SECONDS, - new ArrayBlockingQueue<>(100), + private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( + CORE_POOL_SIZE, + MAX_POOL_SIZE, + ALIVE_TIME_MS, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(QUEUE_SIZE), new ThreadFactoryBuilder().setNameFormat("stream-source-snapshot-%s").build(), new CallerRunsPolicy()); + @Autowired private StreamSourceEntityMapper sourceMapper; @@ -101,7 +106,7 @@ public class SourceSnapshotOperator implements AutoCloseable { snapshotQueue = new LinkedBlockingQueue<>(queueSize); } SaveSnapshotTaskRunnable taskRunnable = new SaveSnapshotTaskRunnable(); - this.executorService.execute(taskRunnable); + EXECUTOR_SERVICE.execute(taskRunnable); LOGGER.info("source snapshot operate thread started successfully"); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java index 12ebe6200..c4af55daa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java @@ -142,6 +142,7 @@ public class PulsarSourceOperator extends AbstractSourceOperator { && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) { pulsarSource.setSerializationType(sourceInfo.getSerializationType()); } + // currently, only reuse the primary key from Kafka source if (SourceType.KAFKA.equals(sourceInfo.getSourceType())) { pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey()); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java index eaa05ba41..43fff8cdd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java @@ -42,6 +42,11 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; +import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS; +import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE; + /** * Operation related to inlong stream process */ @@ -49,12 +54,12 @@ import java.util.concurrent.TimeUnit; @Service public class InlongStreamProcessService { - private final ExecutorService executorService = new ThreadPoolExecutor( - 20, - 40, - 0L, + private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( + CORE_POOL_SIZE, + MAX_POOL_SIZE, + ALIVE_TIME_MS, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), + new LinkedBlockingQueue<>(QUEUE_SIZE), new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(), new CallerRunsPolicy()); @@ -100,7 +105,7 @@ public class InlongStreamProcessService { ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus(); return processStatus == ProcessStatus.COMPLETED; } else { - executorService.execute(() -> workflowService.start(processName, operator, processForm)); + EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm)); return true; } } @@ -143,7 +148,7 @@ public class InlongStreamProcessService { ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus(); return processStatus == ProcessStatus.COMPLETED; } else { - executorService.execute(() -> workflowService.start(processName, operator, processForm)); + EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm)); return true; } } @@ -158,8 +163,7 @@ public class InlongStreamProcessService { throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND); } GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus()); - if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL - && groupStatus != GroupStatus.RESTARTED) { + if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) { throw new BusinessException( String.format("group status=%s not support restart stream for groupId=%s", groupStatus, groupId)); } @@ -185,7 +189,7 @@ public class InlongStreamProcessService { ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus(); return processStatus == ProcessStatus.COMPLETED; } else { - executorService.execute(() -> workflowService.start(processName, operator, processForm)); + EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm)); return true; } } @@ -235,7 +239,7 @@ public class InlongStreamProcessService { return false; } } else { - executorService.execute(() -> { + EXECUTOR_SERVICE.execute(() -> { WorkflowResult workflowResult = workflowService.start(processName, operator, processForm); ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus(); if (processStatus == ProcessStatus.COMPLETED) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 2afa702ca..633d46fca 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -504,7 +504,8 @@ public class InlongStreamServiceImpl implements InlongStreamService { /** * Update field information - * <p/>First physically delete the existing field information, and then add the field information of this batch + * <p/> + * First physically delete the existing field information, and then add the field information of this batch */ @Transactional(rollbackFor = Throwable.class) void updateField(String groupId, String streamId, List<StreamField> fieldList) { @@ -525,7 +526,7 @@ public class InlongStreamServiceImpl implements InlongStreamService { if (CollectionUtils.isEmpty(infoList)) { return; } - infoList.stream().forEach(streamField -> streamField.setId(null)); + infoList.forEach(streamField -> streamField.setId(null)); List<InlongStreamFieldEntity> list = CommonBeanUtils.copyListProperties(infoList, InlongStreamFieldEntity::new); for (InlongStreamFieldEntity entity : list) { diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java index 9306364bc..d07626cae 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java @@ -29,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.protocol.HttpContext; +import org.apache.inlong.common.constant.ProtocolType; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -79,8 +80,8 @@ public class RestTemplateConfig { public PoolingHttpClientConnectionManager httpClientConnectionManager() { // Support HTTP, HTTPS Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create() - .register("http", PlainConnectionSocketFactory.getSocketFactory()) - .register("https", SSLConnectionSocketFactory.getSocketFactory()) + .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory()) + .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory()) .build(); PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager( registry); diff --git a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties index b02b1d211..862eba470 100644 --- a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties +++ b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties @@ -18,7 +18,7 @@ # # Log level -logging.level.root=INFO +logging.level.root=info logging.level.org.apache.inlong.manager=debug spring.sql.init.platform=h2 @@ -31,7 +31,7 @@ spring.datasource.username=root spring.datasource.password="" # Audit configuration -# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH] +# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH|CLICKHOUSE] audit.query.source=MYSQL # Elasticsearch config @@ -47,7 +47,7 @@ es.auth.user=admin es.auth.password=inlong # ClickHouse config -# ClickHouse url +# ClickHouse jdbcUrl audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit # ClickHouse username audit.ck.username=default diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java index 9eae25d0e..3b0f6d107 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java @@ -62,7 +62,7 @@ public class OpenAPIFilter implements Filter { SecretToken token = parseBasicAuth(httpServletRequest); subject.login(token); } catch (Exception ex) { - LOGGER.error("login error, msg: {}", ex.getMessage()); + LOGGER.error("login error: {}", ex.getMessage()); ((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_FORBIDDEN, ex.getMessage()); return; } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java index 1e0b8c976..5408daefc 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java @@ -70,7 +70,7 @@ public class AuthenticationFilter implements Filter { UsernamePasswordToken token = getPasswordToken(servletRequest); subject.login(token); } catch (Exception ex) { - LOGGER.error("login error, msg: {}", ex.getMessage()); + LOGGER.error("login error: {}", ex.getMessage()); ((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_FORBIDDEN, ex.getMessage()); return; } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java index 68022eef1..79380d0d8 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java @@ -17,8 +17,6 @@ package org.apache.inlong.manager.web.config; -import java.nio.charset.StandardCharsets; -import java.util.List; import lombok.Data; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; @@ -31,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.protocol.HttpContext; +import org.apache.inlong.common.constant.ProtocolType; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -43,6 +42,9 @@ import org.springframework.util.ObjectUtils; import org.springframework.web.client.DefaultResponseErrorHandler; import org.springframework.web.client.RestTemplate; +import java.nio.charset.StandardCharsets; +import java.util.List; + @Data @Configuration @ConditionalOnMissingBean(RestTemplate.class) @@ -84,8 +86,8 @@ public class RestTemplateConfig { public PoolingHttpClientConnectionManager httpClientConnectionManager() { // Support HTTP, HTTPS Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create() - .register("http", PlainConnectionSocketFactory.getSocketFactory()) - .register("https", SSLConnectionSocketFactory.getSocketFactory()) + .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory()) + .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory()) .build(); PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager( registry); diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index b11bfdf1f..dfac2d818 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -44,6 +44,7 @@ spring.datasource.druid.testOnReturn=false spring.datasource.druid.filters=stat,wall # Open the mergeSql function through the connectProperties property, Slow SQL records spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 + # Exclude ElasticsearchRestClientAutoConfiguration spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration @@ -64,7 +65,7 @@ es.auth.user=admin es.auth.password=inlong # ClickHouse config -# ClickHouse url +# ClickHouse jdbcUrl audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit # ClickHouse username audit.ck.username=default diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index 203124fc2..53dd907c1 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -32,7 +32,7 @@ spring.datasource.druid.initialSize=20 spring.datasource.druid.minIdle=20 spring.datasource.druid.maxActive=300 # Configure the timeout period to wait for the connection to be acquired -spring.datasource.druid.maxWait=6000 +spring.datasource.druid.maxWait=600000 # Configure the minimum survival time of a connection in the pool, in milliseconds spring.datasource.druid.minEvictableIdleTimeMillis=3600000 # Detect when applying for connection. It is recommended to configure it to true, which does not affect performance and ensures safety @@ -45,6 +45,7 @@ spring.datasource.druid.testOnReturn=false spring.datasource.druid.filters=stat,wall # Open the mergeSql function through the connectProperties property, Slow SQL records spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 + # Exclude ElasticsearchRestClientAutoConfiguration spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration @@ -65,7 +66,7 @@ es.auth.user=admin es.auth.password=inlong # ClickHouse config -# ClickHouse url +# ClickHouse jdbcUrl audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit # ClickHouse username audit.ck.username=default diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java index 581024506..2bb692925 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java @@ -29,6 +29,11 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; +import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS; +import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE; +import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE; + /** * Process event listener */ @@ -43,11 +48,11 @@ public interface ProcessEventListener extends EventListener<ProcessEvent> { * Async process common thread pool */ ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( - 20, - 40, - 0L, + CORE_POOL_SIZE, + MAX_POOL_SIZE, + ALIVE_TIME_MS, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), + new LinkedBlockingQueue<>(QUEUE_SIZE), new ThreadFactoryBuilder().setNameFormat("inlong-workflow-%s").build(), new CallerRunsPolicy()); diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java index ac61cfb5b..d157e9ecb 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java @@ -105,21 +105,23 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> { WorkflowContext.ActionContext actionContext = context.getActionContext(); if (actionContext == null) { resetActionContext(context); + actionContext = context.getActionContext(); } - WorkflowTaskEntity workflowTaskEntity = actionContext.getTaskEntity(); - Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(workflowTaskEntity.getStatus())), - "task status should allow complete"); + WorkflowTaskEntity taskEntity = actionContext.getTaskEntity(); + Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(taskEntity.getStatus())), + String.format("task status %s not allowed to complete", taskEntity.getStatus())); + try { ListenerResult listenerResult = this.taskEventNotifier.notify(TaskEvent.COMPLETE, context); if (!listenerResult.isSuccess()) { - failedTask(context, workflowTaskEntity); + failedTask(context, taskEntity); } else { - completeTaskEntity(context, workflowTaskEntity, TaskStatus.COMPLETED); + completeTaskEntity(context, taskEntity, TaskStatus.COMPLETED); } return listenerResult.isSuccess(); } catch (Exception e) { - log.error("Complete service task failed", e); - failedTask(context, workflowTaskEntity); + log.error("failed to complete service task: " + taskEntity, e); + failedTask(context, taskEntity); return false; } } @@ -139,12 +141,14 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> { taskQuery.setProcessId(processId); taskQuery.setName(serviceName); List<WorkflowTaskEntity> taskEntities = taskEntityMapper.selectByQuery(taskQuery); + WorkflowTaskEntity taskEntity; if (CollectionUtils.isEmpty(taskEntities)) { taskEntity = saveTaskEntity(serviceTask, context); } else { taskEntity = taskEntities.get(0); } + ActionContext actionContext = new WorkflowContext.ActionContext() .setTask((WorkflowTask) context.getCurrentElement()) .setAction(WorkflowAction.COMPLETE) diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java index d3b26786e..2b98103a8 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java @@ -17,10 +17,10 @@ package org.apache.inlong.manager.workflow.processor; -import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.enums.ProcessStatus; -import org.apache.inlong.manager.common.exceptions.JsonException; +import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity; import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper; @@ -40,11 +40,10 @@ import java.util.Date; /** * Start event handler */ +@Slf4j @Service public class StartEventProcessor extends AbstractNextableElementProcessor<StartEvent> { - @Autowired - private ObjectMapper objectMapper; @Autowired private ProcessEventNotifier processEventNotifier; @Autowired @@ -61,13 +60,16 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE WorkflowProcess process = context.getProcess(); ProcessForm form = context.getProcessForm(); if (process.getFormClass() != null) { - Preconditions.checkNotNull(form, "form cannot be null"); + Preconditions.checkNotNull(form, "process form cannot be null"); Preconditions.checkTrue(form.getClass().isAssignableFrom(process.getFormClass()), - "form type not match, should be class " + process.getFormClass()); + String.format("form type %s should match the process form type %s", + form.getClass(), process.getFormClass())); form.validate(); } else { - Preconditions.checkNull(form, "no form required"); + log.warn("not need to provide the form info"); + context.setProcessForm(null); } + WorkflowProcessEntity processEntity = saveProcessEntity(applicant, process, form); context.setProcessEntity(processEntity); context.setActionContext(new WorkflowContext.ActionContext().setAction(WorkflowAction.START)); @@ -91,18 +93,16 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE processEntity.setDisplayName(process.getDisplayName()); processEntity.setType(process.getType()); processEntity.setTitle(form.getTitle()); + processEntity.setInlongGroupId(form.getInlongGroupId()); if (form instanceof StreamResourceProcessForm) { StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form; processEntity.setInlongStreamId(streamForm.getStreamInfo().getInlongStreamId()); } + processEntity.setApplicant(applicant); processEntity.setStatus(ProcessStatus.PROCESSING.name()); - try { - processEntity.setFormData(objectMapper.writeValueAsString(form)); - } catch (Exception e) { - throw new JsonException("write form to json error: ", e); - } + processEntity.setFormData(JsonUtils.toJsonString(form)); processEntity.setStartTime(new Date()); processEntity.setHidden(process.getHidden());
