This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5b061ac34 [INLONG-4473][Manager] Fix DuplicateKeyException when save
StreamSinkField (#4475)
5b061ac34 is described below
commit 5b061ac3433ea0f495707bd2b27d33646308ee14
Author: kipshi <[email protected]>
AuthorDate: Wed Jun 1 17:01:45 2022 +0800
[INLONG-4473][Manager] Fix DuplicateKeyException when save StreamSinkField
(#4475)
---
.../manager/service/core/impl/InlongStreamServiceImpl.java | 1 +
.../inlong/manager/service/sink/StreamSinkServiceImpl.java | 11 +++++++++++
.../inlong/manager/service/sort/util/FieldInfoUtils.java | 2 +-
.../manager/service/source/StreamSourceServiceImpl.java | 11 +++++++++++
4 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 03da93075..5a5c93abe 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -605,6 +605,7 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
if (CollectionUtils.isEmpty(infoList)) {
return;
}
+ infoList.stream().forEach(streamField -> streamField.setId(null));
List<InlongStreamFieldEntity> list =
CommonBeanUtils.copyListProperties(infoList,
InlongStreamFieldEntity::new);
for (InlongStreamFieldEntity entity : list) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index c99c94b3b..026461985 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -95,6 +96,11 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
// According to the sink type, save sink information
StreamSinkOperation operation =
operationFactory.getInstance(SinkType.forType(sinkType));
+ List<SinkField> fields = request.getFieldList();
+ // Remove id in sinkField when save
+ if (CollectionUtils.isNotEmpty(fields)) {
+ fields.stream().forEach(sinkField -> sinkField.setId(null));
+ }
int id = operation.saveOpt(request, operator);
LOGGER.info("success to save sink info: {}", request);
@@ -197,6 +203,11 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
throw new BusinessException(String.format(err, sinkName,
groupId, streamId));
}
}
+ List<SinkField> fields = request.getFieldList();
+ // Remove id in sinkField when save
+ if (CollectionUtils.isNotEmpty(fields)) {
+ fields.stream().forEach(sinkField -> sinkField.setId(null));
+ }
StreamSinkOperation operation =
operationFactory.getInstance(SinkType.forType(sinkType));
operation.updateOpt(request, operator);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
index e9859592a..f29557a64 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
@@ -119,7 +119,7 @@ public class FieldInfoUtils {
// TODO The meta field needs to be selectable and cannot be filled
in by the user
return new MetaFieldInfo(fieldName,
MetaField.forName(metaFieldName));
} else {
- return new FieldInfo(fieldName,
convertFieldFormat(fieldType.toLowerCase(), format));
+ return new FieldInfo(fieldName, convertFieldFormat(fieldType,
format));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 8cc655ed4..b3993d071 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -33,6 +33,7 @@ import
org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -84,6 +85,11 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
// According to the source type, save source information
String sourceType = request.getSourceType();
StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(sourceType));
+ // Remove id in sourceField when save
+ List<StreamField> streamFields = request.getFieldList();
+ if (CollectionUtils.isNotEmpty(streamFields)) {
+ streamFields.stream().forEach(streamField ->
streamField.setId(null));
+ }
int id = operation.saveOpt(request, groupEntity.getStatus(), operator);
LOGGER.info("success to save source info: {}", request);
@@ -167,6 +173,11 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
String sourceType = request.getSourceType();
StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(sourceType));
+ // Remove id in sourceField when save
+ List<StreamField> streamFields = request.getFieldList();
+ if (CollectionUtils.isNotEmpty(streamFields)) {
+ streamFields.stream().forEach(streamField ->
streamField.setId(null));
+ }
operation.updateOpt(request, groupEntity.getStatus(), operator);
LOGGER.info("success to update source info: {}", request);