This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f006c1e19 [INLONG-4100][Manager] Add id in update request for source, 
sink, and transform (#4101)
f006c1e19 is described below

commit f006c1e1985bc85b8bdd44b9160991e37bdd44f9
Author: kipshi <[email protected]>
AuthorDate: Sat May 7 12:38:30 2022 +0800

    [INLONG-4100][Manager] Add id in update request for source, sink, and 
transform (#4101)
---
 .../client/api/impl/DefaultInlongStreamBuilder.java    | 18 +++++++++++-------
 .../manager/client/api/impl/InlongStreamImpl.java      | 18 +++++++++++-------
 .../common/pojo/stream/InlongStreamResponse.java       |  6 ++++++
 3 files changed, 28 insertions(+), 14 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index c94d39ee6..c9b0a8fd0 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -170,7 +170,8 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         List<String> updateTransformNames = Lists.newArrayList();
         for (TransformResponse transformResponse : transformResponses) {
             StreamTransform transform = 
InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
-            String transformName = transform.getTransformName();
+            final String transformName = transform.getTransformName();
+            final int id = transformResponse.getId();
             if (transformRequests.get(transformName) == null) {
                 TransformRequest transformRequest = 
InlongStreamTransformTransfer.createTransformRequest(transform,
                         streamInfo);
@@ -180,6 +181,7 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
                 }
             } else {
                 TransformRequest transformRequest = 
transformRequests.get(transformName);
+                transformRequest.setId(id);
                 Pair<Boolean, String> updateState = 
managerClient.updateTransform(transformRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update 
transform=%s failed with err=%s", transformRequest,
@@ -208,9 +210,9 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         List<SourceListResponse> sourceListResponses = 
managerClient.listSources(groupId, streamId);
         List<String> updateSourceNames = Lists.newArrayList();
         for (SourceListResponse sourceListResponse : sourceListResponses) {
-            String sourceName = sourceListResponse.getSourceName();
-            int id = sourceListResponse.getId();
-            String type = sourceListResponse.getSourceType();
+            final String sourceName = sourceListResponse.getSourceName();
+            final int id = sourceListResponse.getId();
+            final String type = sourceListResponse.getSourceType();
             if (sourceRequests.get(sourceName) == null) {
                 boolean isDelete = managerClient.deleteSource(id, type);
                 if (!isDelete) {
@@ -218,6 +220,7 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
                 }
             } else {
                 SourceRequest sourceRequest = sourceRequests.get(sourceName);
+                sourceRequest.setId(id);
                 Pair<Boolean, String> updateState = 
managerClient.updateSource(sourceRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update source=%s 
failed with err=%s", sourceRequest,
@@ -246,9 +249,9 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         List<SinkListResponse> sinkListResponses = 
managerClient.listSinks(groupId, streamId);
         List<String> updateSinkNames = Lists.newArrayList();
         for (SinkListResponse sinkListResponse : sinkListResponses) {
-            String sinkName = sinkListResponse.getSinkName();
-            int id = sinkListResponse.getId();
-            String type = sinkListResponse.getSinkType();
+            final String sinkName = sinkListResponse.getSinkName();
+            final int id = sinkListResponse.getId();
+            final String type = sinkListResponse.getSinkType();
             if (sinkRequests.get(sinkName) == null) {
                 boolean isDelete = managerClient.deleteSink(id, type);
                 if (!isDelete) {
@@ -256,6 +259,7 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
                 }
             } else {
                 SinkRequest sinkRequest = sinkRequests.get(sinkName);
+                sinkRequest.setId(id);
                 Pair<Boolean, String> updateState = 
managerClient.updateSink(sinkRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update sink=%s 
failed with err=%s", sinkRequest,
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 33b270249..11da4a2ef 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -321,7 +321,8 @@ public class InlongStreamImpl extends InlongStream {
         List<String> updateTransformNames = Lists.newArrayList();
         for (TransformResponse transformResponse : transformResponses) {
             StreamTransform transform = 
InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
-            String transformName = transform.getTransformName();
+            final String transformName = transform.getTransformName();
+            final int id = transformResponse.getId();
             if (this.streamTransforms.get(transformName) == null) {
                 TransformRequest transformRequest = 
InlongStreamTransformTransfer.createTransformRequest(transform,
                         streamInfo);
@@ -333,6 +334,7 @@ public class InlongStreamImpl extends InlongStream {
                 StreamTransform newTransform = 
this.streamTransforms.get(transformName);
                 TransformRequest transformRequest = 
InlongStreamTransformTransfer.createTransformRequest(newTransform,
                         streamInfo);
+                transformRequest.setId(id);
                 Pair<Boolean, String> updateState = 
managerClient.updateTransform(transformRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update 
transform=%s failed with err=%s", transformRequest,
@@ -359,9 +361,9 @@ public class InlongStreamImpl extends InlongStream {
         List<SourceListResponse> sourceListResponses = 
managerClient.listSources(groupId, streamId);
         List<String> updateSourceNames = Lists.newArrayList();
         for (SourceListResponse sourceListResponse : sourceListResponses) {
-            String sourceName = sourceListResponse.getSourceName();
-            int id = sourceListResponse.getId();
-            String type = sourceListResponse.getSourceType();
+            final String sourceName = sourceListResponse.getSourceName();
+            final int id = sourceListResponse.getId();
+            final String type = sourceListResponse.getSourceType();
             if (this.streamSources.get(sourceName) == null) {
                 boolean isDelete = managerClient.deleteSource(id, type);
                 if (!isDelete) {
@@ -370,6 +372,7 @@ public class InlongStreamImpl extends InlongStream {
             } else {
                 StreamSource source = this.streamSources.get(sourceName);
                 SourceRequest sourceRequest = 
InlongStreamSourceTransfer.createSourceRequest(source, streamInfo);
+                sourceRequest.setId(id);
                 Pair<Boolean, String> updateState = 
managerClient.updateSource(sourceRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update source=%s 
failed with err=%s", sourceRequest,
@@ -395,9 +398,9 @@ public class InlongStreamImpl extends InlongStream {
         List<SinkListResponse> sinkListResponses = 
managerClient.listSinks(groupId, streamId);
         List<String> updateSinkNames = Lists.newArrayList();
         for (SinkListResponse sinkListResponse : sinkListResponses) {
-            String sinkName = sinkListResponse.getSinkName();
-            int id = sinkListResponse.getId();
-            String type = sinkListResponse.getSinkType();
+            final String sinkName = sinkListResponse.getSinkName();
+            final int id = sinkListResponse.getId();
+            final String type = sinkListResponse.getSinkType();
             if (this.streamSinks.get(sinkName) == null) {
                 boolean isDelete = managerClient.deleteSink(id, type);
                 if (!isDelete) {
@@ -406,6 +409,7 @@ public class InlongStreamImpl extends InlongStream {
             } else {
                 StreamSink sink = this.streamSinks.get(sinkName);
                 SinkRequest sinkRequest = 
InlongStreamSinkTransfer.createSinkRequest(sink, streamInfo);
+                sinkRequest.setId(id);
                 Pair<Boolean, String> updateState = 
managerClient.updateSink(sinkRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update sink=%s 
failed with err=%s", sinkRequest,
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
index 0c8b8758d..3b11902ac 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
@@ -41,6 +41,12 @@ public class InlongStreamResponse {
     @ApiModelProperty(value = "Inlong stream name", required = true)
     private String name;
 
+    @ApiModelProperty(value = "Inlong group id")
+    private String inlongGroupId;
+
+    @ApiModelProperty(value = "Inlong stream id")
+    private String inlongStreamId;
+
     @ApiModelProperty(value = "Inlong stream description")
     private String description;
 

Reply via email to