This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6eacd2fab [INLONG-1858][Manager] Fix adding columns for Hive not take
effect error (#4554)
6eacd2fab is described below
commit 6eacd2fab85157b514d3b79e57b750ec31a08508
Author: woofyzhao <[email protected]>
AuthorDate: Tue Jun 7 19:32:42 2022 +0800
[INLONG-1858][Manager] Fix adding columns for Hive not take effect error
(#4554)
---
.../inlong/manager/common/enums/FieldType.java | 2 +-
.../core/operation/InlongStreamProcessOperation.java | 5 +++--
.../manager/service/sink/StreamSinkServiceImpl.java | 20 +++++++++++++++-----
3 files changed, 19 insertions(+), 8 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index 901dec247..354f6852a 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -42,7 +42,7 @@ public enum FieldType {
public static FieldType forName(String name) {
Preconditions.checkNotNull(name, "FieldType should not be null");
for (FieldType value : values()) {
- if (value.toString().equals(name)) {
+ if (value.toString().equalsIgnoreCase(name)) {
return value;
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
index 3ad697091..47f1e9523 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
@@ -86,11 +86,12 @@ public class InlongStreamProcessOperation {
throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
}
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
- if (status == StreamStatus.CONFIG_ING || status ==
StreamStatus.CONFIG_SUCCESSFUL) {
+ if (status == StreamStatus.CONFIG_ING) {
log.warn("GroupId={}, StreamId={} is already in {}", groupId,
streamId, status);
return true;
}
- if (status != StreamStatus.NEW || status !=
StreamStatus.CONFIG_FAILED) {
+ if (status != StreamStatus.NEW && status != StreamStatus.CONFIG_FAILED
+ && status != StreamStatus.CONFIG_SUCCESSFUL) {
throw new BusinessException(
String.format("GroupId=%s, StreamId=%s, status=%s not
correct for stream start", groupId, streamId,
status));
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 455b87845..29097b0be 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -41,10 +42,12 @@ import
org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import
org.apache.inlong.manager.service.core.operation.InlongStreamProcessOperation;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -62,7 +65,6 @@ import java.util.Objects;
public class StreamSinkServiceImpl implements StreamSinkService {
private static final Logger LOGGER =
LoggerFactory.getLogger(StreamSinkServiceImpl.class);
-
@Autowired
private SinkOperationFactory operationFactory;
@Autowired
@@ -71,6 +73,9 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
private StreamSinkEntityMapper sinkMapper;
@Autowired
private StreamSinkFieldEntityMapper sinkFieldMapper;
+ @Autowired
+ private AutowireCapableBeanFactory autowireCapableBeanFactory;
+ private InlongStreamProcessOperation streamProcessOperation;
@Override
@Transactional(rollbackFor = Throwable.class)
@@ -192,7 +197,7 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
String streamId = request.getInlongStreamId();
String sinkName = request.getSinkName();
String sinkType = request.getSinkType();
- InlongGroupEntity groupEntity =
groupCheckService.checkGroupStatus(groupId, operator);
+ final InlongGroupEntity groupEntity =
groupCheckService.checkGroupStatus(groupId, operator);
// Check whether the sink name exists with the same groupId and
streamId
List<StreamSinkEntity> sinkList =
sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
@@ -214,9 +219,14 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
// The inlong group status is [Configuration successful], then
asynchronously initiate
// the [Single inlong stream resource creation] workflow
-// if
(EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus()))
{
-// executorService.execute(new WorkflowStartRunnable(operator,
groupEntity, streamId));
-// }
+ if
(GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
+ // To work around the circular reference check we manually
instantiate and wire
+ if (streamProcessOperation == null) {
+ streamProcessOperation = new InlongStreamProcessOperation();
+
autowireCapableBeanFactory.autowireBean(streamProcessOperation);
+ }
+ streamProcessOperation.startProcess(groupId, streamId, operator,
true);
+ }
LOGGER.info("success to update sink info: {}", request);
return true;
}