This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 011d12bb0 [INLONG-4070][Manager] Add update and delete APIs for inlong
stream in manager client (#4074)
011d12bb0 is described below
commit 011d12bb07d0f652ecf3f5e103f27c1129324caa
Author: kipshi <[email protected]>
AuthorDate: Thu May 5 22:04:05 2022 +0800
[INLONG-4070][Manager] Add update and delete APIs for inlong stream in
manager client (#4074)
---
.../inlong/manager/client/api/InlongStream.java | 6 +-
.../api/impl/DefaultInlongStreamBuilder.java | 162 ++++++++++----------
.../manager/client/api/impl/InlongGroupImpl.java | 4 +-
.../manager/client/api/impl/InlongStreamImpl.java | 165 +++++++++++++++++++--
.../client/api/inner/InnerInlongManagerClient.java | 56 +++++++
.../client/api/impl/InlongStreamImplTest.java | 4 +-
.../manager/service/sort/util/FieldInfoUtils.java | 18 ++-
.../manager/service/sort/util/LoadNodeUtils.java | 10 +-
8 files changed, 311 insertions(+), 114 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
index 169b7befa..26a3a2c6f 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
@@ -46,9 +46,5 @@ public abstract class InlongStream {
public abstract StreamPipeline createPipeline();
- @Deprecated
- public abstract void updateSource(StreamSource source);
-
- @Deprecated
- public abstract void updateSink(StreamSink sink);
+ public abstract InlongStream update();
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 60f20df6f..c94d39ee6 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -25,9 +25,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.common.pojo.stream.StreamSink;
-import org.apache.inlong.manager.common.pojo.stream.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
@@ -36,8 +33,6 @@ import
org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransformTransfer;
-import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
@@ -46,6 +41,9 @@ import
org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.stream.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
@@ -72,7 +70,8 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
InnerStreamContext streamContext = new InnerStreamContext(stream);
groupContext.setStreamContext(streamContext);
this.streamContext = streamContext;
- this.inlongStream = new InlongStreamImpl(stream.getName());
+ this.inlongStream = new
InlongStreamImpl(groupContext.getGroupInfo().getName(), stream.getName(),
+ managerClient);
if (CollectionUtils.isNotEmpty(streamConf.getStreamFields())) {
this.inlongStream.setStreamFields(streamConf.getStreamFields());
}
@@ -150,13 +149,12 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
Pair<Boolean, InlongStreamInfo> existMsg =
managerClient.isStreamExists(dataStreamInfo);
if (existMsg.getKey()) {
Pair<Boolean, String> updateMsg =
managerClient.updateStreamInfo(dataStreamInfo);
- if (updateMsg.getKey()) {
- initOrUpdateTransform();
- initOrUpdateSource();
- initOrUpdateSink();
- } else {
+ if (!updateMsg.getKey()) {
throw new RuntimeException(String.format("Update data stream
failed:%s", updateMsg.getValue()));
}
+ initOrUpdateTransform();
+ initOrUpdateSource();
+ initOrUpdateSink();
return inlongStream;
} else {
return init();
@@ -169,6 +167,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
List<TransformResponse> transformResponses =
managerClient.listTransform(groupId, streamId);
+ List<String> updateTransformNames = Lists.newArrayList();
for (TransformResponse transformResponse : transformResponses) {
StreamTransform transform =
InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
String transformName = transform.getTransformName();
@@ -186,93 +185,94 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
throw new RuntimeException(String.format("Update
transform=%s failed with err=%s", transformRequest,
updateState.getValue()));
}
+ transformRequest.setId(transformResponse.getId());
+ updateTransformNames.add(transformName);
}
}
- }
-
- private void initOrUpdateSource() {
- List<SourceRequest> sourceRequests =
Lists.newArrayList(streamContext.getSourceRequests().values());
- for (SourceRequest sourceRequest : sourceRequests) {
- sourceRequest.setId(initOrUpdateSource(sourceRequest));
+ for (Map.Entry<String, TransformRequest> requestEntry :
transformRequests.entrySet()) {
+ String transformName = requestEntry.getKey();
+ if (updateTransformNames.contains(transformName)) {
+ continue;
+ }
+ TransformRequest transformRequest = requestEntry.getValue();
+ String index = managerClient.createTransform(transformRequest);
+ transformRequest.setId(Double.valueOf(index).intValue());
}
}
- private int initOrUpdateSource(SourceRequest sourceRequest) {
- String sourceType = sourceRequest.getSourceType();
- if (SourceType.KAFKA.name().equals(sourceType) ||
SourceType.BINLOG.name().equals(sourceType)) {
- List<SourceListResponse> responses =
managerClient.listSources(sourceRequest.getInlongGroupId(),
- sourceRequest.getInlongStreamId(),
sourceRequest.getSourceType());
- if (CollectionUtils.isEmpty(responses)) {
- String sourceIndex = managerClient.createSource(sourceRequest);
- return Double.valueOf(sourceIndex).intValue();
- } else {
- SourceListResponse sourceListResponse = null;
- for (SourceListResponse response : responses) {
- if
(response.getSourceName().equals(sourceRequest.getSourceName())) {
- sourceListResponse = response;
- break;
- }
+ private void initOrUpdateSource() {
+ Map<String, SourceRequest> sourceRequests =
streamContext.getSourceRequests();
+ InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ List<SourceListResponse> sourceListResponses =
managerClient.listSources(groupId, streamId);
+ List<String> updateSourceNames = Lists.newArrayList();
+ for (SourceListResponse sourceListResponse : sourceListResponses) {
+ String sourceName = sourceListResponse.getSourceName();
+ int id = sourceListResponse.getId();
+ String type = sourceListResponse.getSourceType();
+ if (sourceRequests.get(sourceName) == null) {
+ boolean isDelete = managerClient.deleteSource(id, type);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete source=%s
failed", sourceListResponse));
}
- if (sourceListResponse == null) {
- String sourceIndex =
managerClient.createSource(sourceRequest);
- return Double.valueOf(sourceIndex).intValue();
+ } else {
+ SourceRequest sourceRequest = sourceRequests.get(sourceName);
+ Pair<Boolean, String> updateState =
managerClient.updateSource(sourceRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update source=%s
failed with err=%s", sourceRequest,
+ updateState.getValue()));
}
+ updateSourceNames.add(sourceName);
sourceRequest.setId(sourceListResponse.getId());
- Pair<Boolean, String> updateMsg =
managerClient.updateSource(sourceRequest);
- if (updateMsg.getKey()) {
- return sourceListResponse.getId();
- } else {
- throw new RuntimeException(
- String.format("Update source:%s failed with
ex:%s", GsonUtil.toJson(sourceRequest),
- updateMsg.getValue()));
- }
}
- } else {
- throw new IllegalArgumentException(String.format("Unsupported
source type:%s", sourceType));
}
- }
-
- private void initOrUpdateSink() {
- List<SinkRequest> sinkRequests =
Lists.newArrayList(streamContext.getSinkRequests().values());
- for (SinkRequest sinkRequest : sinkRequests) {
- sinkRequest.setId(initOrUpdateSink(sinkRequest));
+ for (Map.Entry<String, SourceRequest> requestEntry :
sourceRequests.entrySet()) {
+ String sourceName = requestEntry.getKey();
+ if (updateSourceNames.contains(sourceName)) {
+ continue;
+ }
+ SourceRequest sourceRequest = requestEntry.getValue();
+ String index = managerClient.createSource(sourceRequest);
+ sourceRequest.setId(Double.valueOf(index).intValue());
}
}
- private int initOrUpdateSink(SinkRequest sinkRequest) {
- String sinkType = sinkRequest.getSinkType();
- boolean flag = SinkType.HIVE.name().equals(sinkType) ||
SinkType.KAFKA.name().equals(sinkType)
- || SinkType.CLICKHOUSE.name().equals(sinkType);
- if (flag) {
- List<SinkListResponse> responses =
managerClient.listSinks(sinkRequest.getInlongGroupId(),
- sinkRequest.getInlongStreamId(),
sinkRequest.getSinkType());
- if (CollectionUtils.isEmpty(responses)) {
- String sinkIndex = managerClient.createSink(sinkRequest);
- return Double.valueOf(sinkIndex).intValue();
- } else {
- SinkListResponse sinkListResponse = null;
- for (SinkListResponse response : responses) {
- if
(response.getSinkName().equals(sinkRequest.getSinkName())) {
- sinkListResponse = response;
- break;
- }
+ private void initOrUpdateSink() {
+ Map<String, SinkRequest> sinkRequests =
streamContext.getSinkRequests();
+ InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ List<SinkListResponse> sinkListResponses =
managerClient.listSinks(groupId, streamId);
+ List<String> updateSinkNames = Lists.newArrayList();
+ for (SinkListResponse sinkListResponse : sinkListResponses) {
+ String sinkName = sinkListResponse.getSinkName();
+ int id = sinkListResponse.getId();
+ String type = sinkListResponse.getSinkType();
+ if (sinkRequests.get(sinkName) == null) {
+ boolean isDelete = managerClient.deleteSink(id, type);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete sink=%s
failed", sinkListResponse));
}
- if (sinkListResponse == null) {
- String sinkIndex = managerClient.createSink(sinkRequest);
- return Double.valueOf(sinkIndex).intValue();
+ } else {
+ SinkRequest sinkRequest = sinkRequests.get(sinkName);
+ Pair<Boolean, String> updateState =
managerClient.updateSink(sinkRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update sink=%s
failed with err=%s", sinkRequest,
+ updateState.getValue()));
}
+ updateSinkNames.add(sinkName);
sinkRequest.setId(sinkListResponse.getId());
- Pair<Boolean, String> updateMsg =
managerClient.updateSink(sinkRequest);
- if (updateMsg.getKey()) {
- return sinkListResponse.getId();
- } else {
- throw new RuntimeException(
- String.format("Update sink:%s failed with ex:%s",
GsonUtil.toJson(sinkRequest),
- updateMsg.getValue()));
- }
}
- } else {
- throw new IllegalArgumentException(String.format("Unsupported sink
type:%s", sinkType));
+ }
+ for (Map.Entry<String, SinkRequest> requestEntry :
sinkRequests.entrySet()) {
+ String sinkName = requestEntry.getKey();
+ if (updateSinkNames.contains(sinkName)) {
+ continue;
+ }
+ SinkRequest sinkRequest = requestEntry.getValue();
+ String index = managerClient.createSink(sinkRequest);
+ sinkRequest.setId(Double.valueOf(index).intValue());
}
}
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 8f7e1b360..825ba0689 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -271,6 +271,8 @@ public class InlongGroupImpl implements InlongGroup {
if (CollectionUtils.isEmpty(streamResponses)) {
return null;
}
- return
streamResponses.stream().map(InlongStreamImpl::new).collect(Collectors.toList());
+ return streamResponses.stream()
+ .map(fullStreamResponse -> new
InlongStreamImpl(fullStreamResponse, managerClient))
+ .collect(Collectors.toList());
}
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 1915eaa96..60515cf1f 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -22,19 +22,23 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.common.pojo.stream.StreamSink;
-import org.apache.inlong.manager.common.pojo.stream.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
+import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.util.AssertUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtil;
import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamTransformTransfer;
import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
@@ -42,6 +46,11 @@ import
org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelationship;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.stream.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import java.util.List;
import java.util.Map;
@@ -50,9 +59,12 @@ import java.util.stream.Collectors;
@Data
@EqualsAndHashCode(callSuper = true)
-@NoArgsConstructor
public class InlongStreamImpl extends InlongStream {
+ private InnerInlongManagerClient managerClient;
+
+ private String groupName;
+
private String name;
private Map<String, StreamSource> streamSources = Maps.newHashMap();
@@ -63,9 +75,11 @@ public class InlongStreamImpl extends InlongStream {
private List<StreamField> streamFields = Lists.newArrayList();
- public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
+ public InlongStreamImpl(FullStreamResponse fullStreamResponse,
InnerInlongManagerClient managerClient) {
InlongStreamInfo streamInfo = fullStreamResponse.getStreamInfo();
+ this.managerClient = managerClient;
this.name = streamInfo.getName();
+ this.groupName = streamInfo.getInlongGroupId().substring(2);
List<InlongStreamFieldInfo> streamFieldInfos =
streamInfo.getFieldList();
if (CollectionUtils.isNotEmpty(streamFieldInfos)) {
this.streamFields = streamFieldInfos.stream()
@@ -106,7 +120,9 @@ public class InlongStreamImpl extends InlongStream {
}
- public InlongStreamImpl(String name) {
+ public InlongStreamImpl(String group, String name,
InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ this.groupName = group;
this.name = name;
}
@@ -234,15 +250,136 @@ public class InlongStreamImpl extends InlongStream {
}
@Override
- public void updateSource(StreamSource source) {
- AssertUtil.notNull(source.getSourceName(), "Source name should not be
empty");
- streamSources.put(source.getSourceName(), source);
+ public InlongStream update() {
+ InlongStreamInfo streamInfo = new InlongStreamInfo();
+ streamInfo.setInlongStreamId("b_" + name);
+ streamInfo.setInlongGroupId("b_" + groupName);
+ streamInfo = managerClient.getStreamInfo(streamInfo);
+ if (streamInfo == null) {
+ throw new IllegalArgumentException(
+ String.format("Stream is not exists for group=%s and
stream=%s", groupName, name));
+ }
+
streamInfo.setFieldList(InlongStreamTransfer.createStreamFields(this.streamFields,
streamInfo));
+ StreamPipeline streamPipeline = createPipeline();
+ streamInfo.setTempView(GsonUtil.toJson(streamPipeline));
+ Pair<Boolean, String> updateMsg =
managerClient.updateStreamInfo(streamInfo);
+ if (!updateMsg.getKey()) {
+ throw new RuntimeException(String.format("Update data stream
failed:%s", updateMsg.getValue()));
+ }
+ initOrUpdateTransform(streamInfo);
+ initOrUpdateSource(streamInfo);
+ initOrUpdateSink(streamInfo);
+ return this;
}
- @Override
- public void updateSink(StreamSink sink) {
- AssertUtil.notNull(sink.getSinkName(), "Sink name should not be
empty");
- streamSinks.put(sink.getSinkName(), sink);
+ private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
+ final String groupId = "b_" + groupName;
+ final String streamId = "b_" + name;
+ List<TransformResponse> transformResponses =
managerClient.listTransform(groupId, streamId);
+ List<String> updateTransformNames = Lists.newArrayList();
+ for (TransformResponse transformResponse : transformResponses) {
+ StreamTransform transform =
InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
+ String transformName = transform.getTransformName();
+ if (this.streamTransforms.get(transformName) == null) {
+ TransformRequest transformRequest =
InlongStreamTransformTransfer.createTransformRequest(transform,
+ streamInfo);
+ boolean isDelete =
managerClient.deleteTransform(transformRequest);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete
transform=%s failed", transformRequest));
+ }
+ } else {
+ StreamTransform newTransform =
this.streamTransforms.get(transformName);
+ TransformRequest transformRequest =
InlongStreamTransformTransfer.createTransformRequest(newTransform,
+ streamInfo);
+ Pair<Boolean, String> updateState =
managerClient.updateTransform(transformRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update
transform=%s failed with err=%s", transformRequest,
+ updateState.getValue()));
+ }
+ updateTransformNames.add(transformName);
+ }
+ }
+ for (Map.Entry<String, StreamTransform> transformEntry :
this.streamTransforms.entrySet()) {
+ String transformName = transformEntry.getKey();
+ if (updateTransformNames.contains(transformName)) {
+ continue;
+ }
+ StreamTransform transform = transformEntry.getValue();
+ TransformRequest transformRequest =
InlongStreamTransformTransfer.createTransformRequest(transform,
+ streamInfo);
+ managerClient.createTransform(transformRequest);
+ }
+ }
+
+ private void initOrUpdateSource(InlongStreamInfo streamInfo) {
+ final String groupId = "b_" + groupName;
+ final String streamId = "b_" + name;
+ List<SourceListResponse> sourceListResponses =
managerClient.listSources(groupId, streamId);
+ List<String> updateSourceNames = Lists.newArrayList();
+ for (SourceListResponse sourceListResponse : sourceListResponses) {
+ String sourceName = sourceListResponse.getSourceName();
+ int id = sourceListResponse.getId();
+ String type = sourceListResponse.getSourceType();
+ if (this.streamSources.get(sourceName) == null) {
+ boolean isDelete = managerClient.deleteSource(id, type);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete source=%s
failed", sourceListResponse));
+ }
+ } else {
+ StreamSource source = this.streamSources.get(sourceName);
+ SourceRequest sourceRequest =
InlongStreamSourceTransfer.createSourceRequest(source, streamInfo);
+ Pair<Boolean, String> updateState =
managerClient.updateSource(sourceRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update source=%s
failed with err=%s", sourceRequest,
+ updateState.getValue()));
+ }
+ updateSourceNames.add(sourceName);
+ }
+ }
+ for (Map.Entry<String, StreamSource> requestEntry :
streamSources.entrySet()) {
+ String sourceName = requestEntry.getKey();
+ if (updateSourceNames.contains(sourceName)) {
+ continue;
+ }
+ StreamSource streamSource = requestEntry.getValue();
+ SourceRequest sourceRequest =
InlongStreamSourceTransfer.createSourceRequest(streamSource, streamInfo);
+ managerClient.createSource(sourceRequest);
+ }
}
+ private void initOrUpdateSink(InlongStreamInfo streamInfo) {
+ final String groupId = "b_" + groupName;
+ final String streamId = "b_" + name;
+ List<SinkListResponse> sinkListResponses =
managerClient.listSinks(groupId, streamId);
+ List<String> updateSinkNames = Lists.newArrayList();
+ for (SinkListResponse sinkListResponse : sinkListResponses) {
+ String sinkName = sinkListResponse.getSinkName();
+ int id = sinkListResponse.getId();
+ String type = sinkListResponse.getSinkType();
+ if (this.streamSinks.get(sinkName) == null) {
+ boolean isDelete = managerClient.deleteSink(id, type);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete sink=%s
failed", sinkListResponse));
+ }
+ } else {
+ StreamSink sink = this.streamSinks.get(sinkName);
+ SinkRequest sinkRequest =
InlongStreamSinkTransfer.createSinkRequest(sink, streamInfo);
+ Pair<Boolean, String> updateState =
managerClient.updateSink(sinkRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update sink=%s
failed with err=%s", sinkRequest,
+ updateState.getValue()));
+ }
+ updateSinkNames.add(sinkName);
+ }
+ }
+ for (Map.Entry<String, StreamSink> requestEntry :
streamSinks.entrySet()) {
+ String sinkName = requestEntry.getKey();
+ if (updateSinkNames.contains(sinkName)) {
+ continue;
+ }
+ StreamSink streamSink = requestEntry.getValue();
+ SinkRequest sinkRequest =
InlongStreamSinkTransfer.createSinkRequest(streamSink, streamInfo);
+ managerClient.createSink(sinkRequest);
+ }
+ }
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index e8a2a6dd1..1e67fa5ad 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -481,6 +481,34 @@ public class InnerInlongManagerClient {
}
}
+ public boolean deleteSource(int id, String type) {
+ AssertUtil.isTrue(id > 0, "sourceId is illegal");
+ AssertUtil.notEmpty(type, "sourceType should not be null");
+ final String path = HTTP_PATH + "/source/delete/" + id;
+ String url = formatUrl(path);
+ url = String.format("%s&sourceType=%s", url, type);
+ RequestBody requestBody =
RequestBody.create(MediaType.parse("application/json"), "");
+ Request request = new Request.Builder()
+ .url(url)
+ .method("DELETE", requestBody)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ okhttp3.Response response = call.execute();
+ assert response.body() != null;
+ String body = response.body().string();
+ assertHttpSuccess(response, body, path);
+ Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed: %s",
responseBody.getErrMsg()));
+ return Boolean.parseBoolean(responseBody.getData().toString());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Inlong source delete failed: %s",
e.getMessage()), e);
+ }
+ }
+
public String createTransform(TransformRequest transformRequest) {
String path = HTTP_PATH + "/transform/save";
final String sink = GsonUtil.toJson(transformRequest);
@@ -611,6 +639,34 @@ public class InnerInlongManagerClient {
}
}
+ public boolean deleteSink(int id, String type) {
+ AssertUtil.isTrue(id > 0, "sinkId is illegal");
+ AssertUtil.notEmpty(type, "sinkType should not be null");
+ final String path = HTTP_PATH + "/sink/delete/" + id;
+ String url = formatUrl(path);
+ url = String.format("%s&sinkType=%s", url, type);
+ RequestBody requestBody =
RequestBody.create(MediaType.parse("application/json"), "");
+ Request request = new Request.Builder()
+ .url(url)
+ .method("DELETE", requestBody)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ okhttp3.Response response = call.execute();
+ assert response.body() != null;
+ String body = response.body().string();
+ assertHttpSuccess(response, body, path);
+ Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed: %s",
responseBody.getErrMsg()));
+ return Boolean.parseBoolean(responseBody.getData().toString());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Inlong sink delete failed: %s",
e.getMessage()), e);
+ }
+ }
+
public List<SinkListResponse> listSinks(String groupId, String streamId) {
return listSinks(groupId, streamId, null);
}
diff --git
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
index 19d179588..14e660c23 100644
---
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
+++
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.client.api.impl;
import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.client.api.sink.ClickHouseSink;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.sink.KafkaSink;
@@ -28,6 +27,7 @@ import
org.apache.inlong.manager.client.api.transform.MultiDependencyTransform;
import
org.apache.inlong.manager.client.api.transform.SingleDependencyTransform;
import org.apache.inlong.manager.client.api.util.GsonUtil;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
import
org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.FilterStrategy;
import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
@@ -41,7 +41,7 @@ public class InlongStreamImplTest {
@Test
public void testCreatePipeline() {
- InlongStream inlongStream = new InlongStreamImpl();
+ InlongStream inlongStream = new InlongStreamImpl("group", "stream",
null);
// add stream source
KafkaSource kafkaSource = new KafkaSource();
kafkaSource.setSourceName("A");
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
index 6538dd5c6..f0a19492d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
@@ -45,7 +45,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import
org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +58,7 @@ public class FieldInfoUtils {
/**
* Built in field map, key is field name, value is built in field name
*/
- public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new
HashMap<>();
+ public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new
LinkedHashMap<>();
static {
BUILT_IN_FIELD_MAP.put(MetaFieldType.DATA_TIME.getName(),
BuiltInField.DATA_TIME);
@@ -68,20 +68,28 @@ public class FieldInfoUtils {
BUILT_IN_FIELD_MAP.put(MetaFieldType.IS_DDL.getName(),
BuiltInField.MYSQL_METADATA_IS_DDL);
BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TYPE.getName(),
BuiltInField.MYSQL_METADATA_EVENT_TYPE);
BUILT_IN_FIELD_MAP.put(MetaFieldType.PROCESSING_TIME.getName(),
BuiltInField.PROCESS_TIME);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_DATA.getName(),
BuiltInField.MYSQL_METADATA_DATA);
BUILT_IN_FIELD_MAP.put(MetaFieldType.UPDATE_BEFORE.getName(),
BuiltInField.METADATA_UPDATE_BEFORE);
BUILT_IN_FIELD_MAP.put(MetaFieldType.BATCH_ID.getName(),
BuiltInField.METADATA_BATCH_ID);
BUILT_IN_FIELD_MAP.put(MetaFieldType.SQL_TYPE.getName(),
BuiltInField.METADATA_SQL_TYPE);
BUILT_IN_FIELD_MAP.put(MetaFieldType.TS.getName(),
BuiltInField.METADATA_TS);
BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_TYPE.getName(),
BuiltInField.METADATA_MYSQL_TYPE);
BUILT_IN_FIELD_MAP.put(MetaFieldType.PK_NAMES.getName(),
BuiltInField.METADATA_PK_NAMES);
+ BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_DATA.getName(),
BuiltInField.MYSQL_METADATA_DATA);
+ }
+
+ public static FieldInfo parseSinkFieldInfo(SinkFieldResponse
sinkFieldResponse, String nodeId) {
+ boolean isBuiltIn = sinkFieldResponse.getIsMetaField() == 1;
+ FieldInfo fieldInfo = getFieldInfo(sinkFieldResponse.getFieldName(),
sinkFieldResponse.getFieldType(),
+ isBuiltIn, sinkFieldResponse.getFieldFormat());
+ fieldInfo.setNodeId(nodeId);
+ return fieldInfo;
}
- public static FieldInfo parseStreamFieldInfo(InlongStreamFieldInfo
streamField, String name) {
+ public static FieldInfo parseStreamFieldInfo(InlongStreamFieldInfo
streamField, String nodeId) {
boolean isBuiltIn = streamField.getIsMetaField() == 1;
FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(),
streamField.getFieldType(), isBuiltIn,
streamField.getFieldFormat());
- fieldInfo.setNodeId(name);
+ fieldInfo.setNodeId(nodeId);
return fieldInfo;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 1ebd697ad..c4cae249c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -73,9 +73,8 @@ public class LoadNodeUtils {
String bootstrapServers = kafkaSinkResponse.getBootstrapServers();
List<SinkFieldResponse> sinkFieldResponses =
kafkaSinkResponse.getFieldList();
List<FieldInfo> fieldInfos = sinkFieldResponses.stream()
- .map(sinkFieldResponse -> new
FieldInfo(sinkFieldResponse.getFieldName(), name,
-
FieldInfoUtils.convertFieldFormat(sinkFieldResponse.getFieldType(),
-
sinkFieldResponse.getFieldFormat()))).collect(Collectors.toList());
+ .map(sinkFieldResponse ->
FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
+ .collect(Collectors.toList());
List<FieldRelationShip> fieldRelationShips =
parseSinkFields(sinkFieldResponses, name);
Map<String, String> properties =
kafkaSinkResponse.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
@@ -127,9 +126,8 @@ public class LoadNodeUtils {
String hiveVersion = hiveSinkResponse.getHiveVersion();
List<SinkFieldResponse> sinkFieldResponses =
hiveSinkResponse.getFieldList();
List<FieldInfo> fields = sinkFieldResponses.stream()
- .map(sinkFieldResponse -> new
FieldInfo(sinkFieldResponse.getFieldName(), name,
-
FieldInfoUtils.convertFieldFormat(sinkFieldResponse.getFieldType(),
-
sinkFieldResponse.getFieldFormat()))).collect(Collectors.toList());
+ .map(sinkFieldResponse ->
FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
+ .collect(Collectors.toList());
List<FieldRelationShip> fieldRelationShips =
parseSinkFields(sinkFieldResponses, name);
Map<String, String> properties =
hiveSinkResponse.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));