This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f006c1e19 [INLONG-4100][Manager] Add id in update request for source, sink, and transform (#4101)
f006c1e19 is described below
commit f006c1e1985bc85b8bdd44b9160991e37bdd44f9
Author: kipshi <[email protected]>
AuthorDate: Sat May 7 12:38:30 2022 +0800
[INLONG-4100][Manager] Add id in update request for source, sink, and transform (#4101)
---
.../client/api/impl/DefaultInlongStreamBuilder.java | 18 +++++++++++-------
.../manager/client/api/impl/InlongStreamImpl.java | 18 +++++++++++-------
.../common/pojo/stream/InlongStreamResponse.java | 6 ++++++
3 files changed, 28 insertions(+), 14 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index c94d39ee6..c9b0a8fd0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -170,7 +170,8 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
List<String> updateTransformNames = Lists.newArrayList();
for (TransformResponse transformResponse : transformResponses) {
StreamTransform transform =
InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
- String transformName = transform.getTransformName();
+ final String transformName = transform.getTransformName();
+ final int id = transformResponse.getId();
if (transformRequests.get(transformName) == null) {
TransformRequest transformRequest =
InlongStreamTransformTransfer.createTransformRequest(transform,
streamInfo);
@@ -180,6 +181,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
}
} else {
TransformRequest transformRequest =
transformRequests.get(transformName);
+ transformRequest.setId(id);
Pair<Boolean, String> updateState =
managerClient.updateTransform(transformRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update
transform=%s failed with err=%s", transformRequest,
@@ -208,9 +210,9 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
List<SourceListResponse> sourceListResponses =
managerClient.listSources(groupId, streamId);
List<String> updateSourceNames = Lists.newArrayList();
for (SourceListResponse sourceListResponse : sourceListResponses) {
- String sourceName = sourceListResponse.getSourceName();
- int id = sourceListResponse.getId();
- String type = sourceListResponse.getSourceType();
+ final String sourceName = sourceListResponse.getSourceName();
+ final int id = sourceListResponse.getId();
+ final String type = sourceListResponse.getSourceType();
if (sourceRequests.get(sourceName) == null) {
boolean isDelete = managerClient.deleteSource(id, type);
if (!isDelete) {
@@ -218,6 +220,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
}
} else {
SourceRequest sourceRequest = sourceRequests.get(sourceName);
+ sourceRequest.setId(id);
Pair<Boolean, String> updateState =
managerClient.updateSource(sourceRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update source=%s
failed with err=%s", sourceRequest,
@@ -246,9 +249,9 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
List<SinkListResponse> sinkListResponses =
managerClient.listSinks(groupId, streamId);
List<String> updateSinkNames = Lists.newArrayList();
for (SinkListResponse sinkListResponse : sinkListResponses) {
- String sinkName = sinkListResponse.getSinkName();
- int id = sinkListResponse.getId();
- String type = sinkListResponse.getSinkType();
+ final String sinkName = sinkListResponse.getSinkName();
+ final int id = sinkListResponse.getId();
+ final String type = sinkListResponse.getSinkType();
if (sinkRequests.get(sinkName) == null) {
boolean isDelete = managerClient.deleteSink(id, type);
if (!isDelete) {
@@ -256,6 +259,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
}
} else {
SinkRequest sinkRequest = sinkRequests.get(sinkName);
+ sinkRequest.setId(id);
Pair<Boolean, String> updateState =
managerClient.updateSink(sinkRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update sink=%s
failed with err=%s", sinkRequest,
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 33b270249..11da4a2ef 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -321,7 +321,8 @@ public class InlongStreamImpl extends InlongStream {
List<String> updateTransformNames = Lists.newArrayList();
for (TransformResponse transformResponse : transformResponses) {
StreamTransform transform =
InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
- String transformName = transform.getTransformName();
+ final String transformName = transform.getTransformName();
+ final int id = transformResponse.getId();
if (this.streamTransforms.get(transformName) == null) {
TransformRequest transformRequest =
InlongStreamTransformTransfer.createTransformRequest(transform,
streamInfo);
@@ -333,6 +334,7 @@ public class InlongStreamImpl extends InlongStream {
StreamTransform newTransform =
this.streamTransforms.get(transformName);
TransformRequest transformRequest =
InlongStreamTransformTransfer.createTransformRequest(newTransform,
streamInfo);
+ transformRequest.setId(id);
Pair<Boolean, String> updateState =
managerClient.updateTransform(transformRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update
transform=%s failed with err=%s", transformRequest,
@@ -359,9 +361,9 @@ public class InlongStreamImpl extends InlongStream {
List<SourceListResponse> sourceListResponses =
managerClient.listSources(groupId, streamId);
List<String> updateSourceNames = Lists.newArrayList();
for (SourceListResponse sourceListResponse : sourceListResponses) {
- String sourceName = sourceListResponse.getSourceName();
- int id = sourceListResponse.getId();
- String type = sourceListResponse.getSourceType();
+ final String sourceName = sourceListResponse.getSourceName();
+ final int id = sourceListResponse.getId();
+ final String type = sourceListResponse.getSourceType();
if (this.streamSources.get(sourceName) == null) {
boolean isDelete = managerClient.deleteSource(id, type);
if (!isDelete) {
@@ -370,6 +372,7 @@ public class InlongStreamImpl extends InlongStream {
} else {
StreamSource source = this.streamSources.get(sourceName);
SourceRequest sourceRequest =
InlongStreamSourceTransfer.createSourceRequest(source, streamInfo);
+ sourceRequest.setId(id);
Pair<Boolean, String> updateState =
managerClient.updateSource(sourceRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update source=%s
failed with err=%s", sourceRequest,
@@ -395,9 +398,9 @@ public class InlongStreamImpl extends InlongStream {
List<SinkListResponse> sinkListResponses =
managerClient.listSinks(groupId, streamId);
List<String> updateSinkNames = Lists.newArrayList();
for (SinkListResponse sinkListResponse : sinkListResponses) {
- String sinkName = sinkListResponse.getSinkName();
- int id = sinkListResponse.getId();
- String type = sinkListResponse.getSinkType();
+ final String sinkName = sinkListResponse.getSinkName();
+ final int id = sinkListResponse.getId();
+ final String type = sinkListResponse.getSinkType();
if (this.streamSinks.get(sinkName) == null) {
boolean isDelete = managerClient.deleteSink(id, type);
if (!isDelete) {
@@ -406,6 +409,7 @@ public class InlongStreamImpl extends InlongStream {
} else {
StreamSink sink = this.streamSinks.get(sinkName);
SinkRequest sinkRequest =
InlongStreamSinkTransfer.createSinkRequest(sink, streamInfo);
+ sinkRequest.setId(id);
Pair<Boolean, String> updateState =
managerClient.updateSink(sinkRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update sink=%s
failed with err=%s", sinkRequest,
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
index 0c8b8758d..3b11902ac 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
@@ -41,6 +41,12 @@ public class InlongStreamResponse {
@ApiModelProperty(value = "Inlong stream name", required = true)
private String name;
+ @ApiModelProperty(value = "Inlong group id")
+ private String inlongGroupId;
+
+ @ApiModelProperty(value = "Inlong stream id")
+ private String inlongStreamId;
+
@ApiModelProperty(value = "Inlong stream description")
private String description;