This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a735368fc [INLONG-4928][Manager] Modify inlong stream API in the
Manager client (#5309)
a735368fc is described below
commit a735368fc2739c43dab332443cc1e3cb17021612
Author: haifxu <[email protected]>
AuthorDate: Tue Aug 2 20:28:08 2022 +0800
[INLONG-4928][Manager] Modify inlong stream API in the Manager client
(#5309)
---
.../inlong/manager/client/api/InlongStream.java | 2 +-
.../api/inner/client/InlongStreamClient.java | 102 ++++++++++++++++++++-
.../client/api/service/InlongStreamApi.java | 19 ++++
3 files changed, 121 insertions(+), 2 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
index 62fe52db5..d0e0897de 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
@@ -62,7 +62,7 @@ public interface InlongStream {
StreamSink getSinkInfoByName(String sinkName);
/**
- * Return data transform node defined in stream(split,string replace etc)
+ * Return data transform node defined in stream(split, string replace etc.)
* key is transform name which must be unique within one stream scope.
*/
Map<String, StreamTransform> getTransforms();
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
index f54cbd86f..ad24f03bc 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -22,10 +22,11 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.service.InlongStreamApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
-import org.apache.inlong.manager.common.util.Preconditions;
import java.util.List;
@@ -49,6 +50,12 @@ public class InlongStreamClient {
return response.getData();
}
+ /**
+ * Query whether the inlong stream ID exists
+ *
+ * @param streamInfo inlong stream info
+ * @return true: exists, false: does not exist
+ */
public Boolean isStreamExists(InlongStreamInfo streamInfo) {
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
@@ -60,6 +67,12 @@ public class InlongStreamClient {
return response.getData();
}
+ /**
+ * InlongStream info that needs to be modified
+ *
+ * @param streamInfo inlong stream info that needs to be modified
+ * @return whether succeed
+ */
public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo)
{
Response<Boolean> resp =
ClientUtils.executeHttpCall(inlongStreamApi.updateStream(streamInfo));
@@ -86,6 +99,19 @@ public class InlongStreamClient {
}
}
+ /**
+ * Paging query inlong stream brief info list
+ *
+ * @param request query request
+ * @return inlong stream brief list
+ */
+ public PageInfo<InlongStreamBriefInfo>
listByCondition(InlongStreamPageRequest request) {
+ Response<PageInfo<InlongStreamBriefInfo>> response =
ClientUtils.executeHttpCall(
+ inlongStreamApi.listByCondition(request));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* Get inlong stream info.
*/
@@ -99,4 +125,78 @@ public class InlongStreamClient {
return response.getData().getList();
}
+ /**
+ * Create stream in synchronous/asynchronous way.
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean startProcess(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be
empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be
empty");
+ Response<Boolean> response =
ClientUtils.executeHttpCall(inlongStreamApi.startProcess(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Suspend stream in synchronous/asynchronous way.
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean suspendProcess(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be
empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be
empty");
+ Response<Boolean> response =
ClientUtils.executeHttpCall(inlongStreamApi.suspendProcess(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Restart stream in synchronous/asynchronous way.
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean restartProcess(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be
empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be
empty");
+ Response<Boolean> response =
ClientUtils.executeHttpCall(inlongStreamApi.restartProcess(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Delete stream in synchronous/asynchronous way.
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean deleteProcess(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be
empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be
empty");
+ Response<Boolean> response =
ClientUtils.executeHttpCall(inlongStreamApi.deleteProcess(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Delete the specified inlong stream
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return whether succeed
+ */
+ public boolean delete(String groupId, String streamId) {
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be
empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be
empty");
+ Response<Boolean> response =
ClientUtils.executeHttpCall(inlongStreamApi.delete(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 5e5f36bd2..83ab31759 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.client.api.service;
import com.github.pagehelper.PageInfo;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
import retrofit2.Call;
import retrofit2.http.Body;
+import retrofit2.http.DELETE;
import retrofit2.http.GET;
import retrofit2.http.POST;
import retrofit2.http.Path;
@@ -43,7 +45,24 @@ public interface InlongStreamApi {
Call<Response<InlongStreamInfo>> getStream(@Query("groupId") String
groupId,
@Query("streamId") String streamId);
+ @POST("/stream/list")
+ Call<Response<PageInfo<InlongStreamBriefInfo>>> listByCondition(@Body
InlongStreamPageRequest request);
+
@POST("stream/listAll")
Call<Response<PageInfo<InlongStreamInfo>>> listStream(@Body
InlongStreamPageRequest request);
+ @POST("/stream/startProcess/{groupId}/{streamId}")
+ Call<Response<Boolean>> startProcess(@Path("groupId") String groupId,
@Path("streamId") String streamId);
+
+ @POST("/stream/suspendProcess/{groupId}/{streamId}")
+ Call<Response<Boolean>> suspendProcess(@Path("groupId") String groupId,
@Path("streamId") String streamId);
+
+ @POST("/stream/restartProcess/{groupId}/{streamId}")
+ Call<Response<Boolean>> restartProcess(@Path("groupId") String groupId,
@Path("streamId") String streamId);
+
+ @POST("/stream/deleteProcess/{groupId}/{streamId}")
+ Call<Response<Boolean>> deleteProcess(@Path("groupId") String groupId,
@Path("streamId") String streamId);
+
+ @DELETE("/stream/delete")
+ Call<Response<Boolean>> delete(@Path("groupId") String groupId,
@Path("streamId") String streamId);
}