This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eacd2fab [INLONG-1858][Manager] Fix adding columns for Hive not take effect error  (#4554)
6eacd2fab is described below

commit 6eacd2fab85157b514d3b79e57b750ec31a08508
Author: woofyzhao <[email protected]>
AuthorDate: Tue Jun 7 19:32:42 2022 +0800

    [INLONG-1858][Manager] Fix adding columns for Hive not take effect error  (#4554)
---
 .../inlong/manager/common/enums/FieldType.java       |  2 +-
 .../core/operation/InlongStreamProcessOperation.java |  5 +++--
 .../manager/service/sink/StreamSinkServiceImpl.java  | 20 +++++++++++++++-----
 3 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index 901dec247..354f6852a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -42,7 +42,7 @@ public enum FieldType {
     public static FieldType forName(String name) {
         Preconditions.checkNotNull(name, "FieldType should not be null");
         for (FieldType value : values()) {
-            if (value.toString().equals(name)) {
+            if (value.toString().equalsIgnoreCase(name)) {
                 return value;
             }
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
index 3ad697091..47f1e9523 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
@@ -86,11 +86,12 @@ public class InlongStreamProcessOperation {
             throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
         }
         StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
-        if (status == StreamStatus.CONFIG_ING || status == 
StreamStatus.CONFIG_SUCCESSFUL) {
+        if (status == StreamStatus.CONFIG_ING) {
             log.warn("GroupId={}, StreamId={} is already in {}", groupId, 
streamId, status);
             return true;
         }
-        if (status != StreamStatus.NEW || status != 
StreamStatus.CONFIG_FAILED) {
+        if (status != StreamStatus.NEW && status != StreamStatus.CONFIG_FAILED
+                && status != StreamStatus.CONFIG_SUCCESSFUL) {
             throw new BusinessException(
                     String.format("GroupId=%s, StreamId=%s, status=%s not 
correct for stream start", groupId, streamId,
                             status));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 455b87845..29097b0be 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -41,10 +42,12 @@ import 
org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import 
org.apache.inlong.manager.service.core.operation.InlongStreamProcessOperation;
 import org.apache.inlong.manager.service.group.GroupCheckService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -62,7 +65,6 @@ import java.util.Objects;
 public class StreamSinkServiceImpl implements StreamSinkService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamSinkServiceImpl.class);
-
     @Autowired
     private SinkOperationFactory operationFactory;
     @Autowired
@@ -71,6 +73,9 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     private StreamSinkEntityMapper sinkMapper;
     @Autowired
     private StreamSinkFieldEntityMapper sinkFieldMapper;
+    @Autowired
+    private AutowireCapableBeanFactory autowireCapableBeanFactory;
+    private InlongStreamProcessOperation streamProcessOperation;
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
@@ -192,7 +197,7 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         String streamId = request.getInlongStreamId();
         String sinkName = request.getSinkName();
         String sinkType = request.getSinkType();
-        InlongGroupEntity groupEntity = 
groupCheckService.checkGroupStatus(groupId, operator);
+        final InlongGroupEntity groupEntity = 
groupCheckService.checkGroupStatus(groupId, operator);
 
         // Check whether the sink name exists with the same groupId and 
streamId
         List<StreamSinkEntity> sinkList = 
sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
@@ -214,9 +219,14 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
 
         // The inlong group status is [Configuration successful], then 
asynchronously initiate
         // the [Single inlong stream resource creation] workflow
-//        if 
(EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus()))
 {
-//            executorService.execute(new WorkflowStartRunnable(operator, 
groupEntity, streamId));
-//        }
+        if 
(GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
+            // To work around the circular reference check we manually 
instantiate and wire
+            if (streamProcessOperation == null) {
+                streamProcessOperation = new InlongStreamProcessOperation();
+                
autowireCapableBeanFactory.autowireBean(streamProcessOperation);
+            }
+            streamProcessOperation.startProcess(groupId, streamId, operator, 
true);
+        }
         LOGGER.info("success to update sink info: {}", request);
         return true;
     }

Reply via email to