This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git


The following commit(s) were added to refs/heads/main by this push:
     new 079d7da1 [Improvement] Support selecting a session cluster when submitting CDC jobs (#292)
079d7da1 is described below

commit 079d7da159558cea3457ec3004b90aaf24245afb
Author: yangyang zhong <[email protected]>
AuthorDate: Thu Jun 6 17:05:19 2024 +0800

    [Improvement] Support selecting a session cluster when submitting CDC jobs (#292)
---
 .../web/server/data/dto/CdcJobSubmitDTO.java       |  2 +-
 .../service/impl/CdcJobDefinitionServiceImpl.java  | 10 +++++-
 .../controller/CdcJobDefinitionControllerTest.java | 38 +++++++++++++++++++++-
 3 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
index 6800b73c..1c4492c9 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
@@ -24,5 +24,5 @@ import lombok.Data;
 @Data
 public class CdcJobSubmitDTO {
 
-    private String flinkSessionUrl;
+    private String clusterId;
 }
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
index 0304299d..9d061f9a 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
 import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
 import org.apache.paimon.web.server.data.model.CatalogInfo;
 import org.apache.paimon.web.server.data.model.CdcJobDefinition;
+import org.apache.paimon.web.server.data.model.ClusterInfo;
 import org.apache.paimon.web.server.data.model.cdc.CdcGraph;
 import org.apache.paimon.web.server.data.model.cdc.CdcNode;
 import org.apache.paimon.web.server.data.result.PageR;
@@ -39,6 +40,7 @@ import org.apache.paimon.web.server.data.result.enums.Status;
 import org.apache.paimon.web.server.mapper.CdcJobDefinitionMapper;
 import org.apache.paimon.web.server.service.CatalogService;
 import org.apache.paimon.web.server.service.CdcJobDefinitionService;
+import org.apache.paimon.web.server.service.ClusterService;
 import org.apache.paimon.web.server.util.StringUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -61,6 +63,8 @@ public class CdcJobDefinitionServiceImpl
 
     @Autowired private CatalogService catalogService;
 
+    @Autowired private ClusterService clusterService;
+
     @Override
     public R<Void> create(CdcJobDefinitionDTO cdcJobDefinitionDTO) {
         String name = cdcJobDefinitionDTO.getName();
@@ -142,7 +146,11 @@ public class CdcJobDefinitionServiceImpl
                         cdcGraph.getTarget().getType(),
                         flinkCdcSyncType);
         ObjectNode actionConfigs = JSONUtils.createObjectNode();
-        actionConfigs.put(FlinkCdcOptions.SESSION_URL, cdcJobSubmitDTO.getFlinkSessionUrl());
+        String clusterId = cdcJobSubmitDTO.getClusterId();
+        ClusterInfo clusterInfo = clusterService.getById(clusterId);
+        actionConfigs.put(
+                FlinkCdcOptions.SESSION_URL,
+                String.format("http://%s:%s", clusterInfo.getHost(), clusterInfo.getPort()));
         handleCdcGraphNodeData(actionConfigs, cdcGraph.getSource());
         handleCdcGraphNodeData(actionConfigs, cdcGraph.getTarget());
         ActionContext actionContext = factory.getActionContext(actionConfigs);
diff --git a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
index 2fd2c0e4..01762beb 100644
--- a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
+++ b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
@@ -19,7 +19,9 @@
 package org.apache.paimon.web.server.controller;
 
 import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
+import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
 import org.apache.paimon.web.server.data.model.CdcJobDefinition;
+import org.apache.paimon.web.server.data.model.ClusterInfo;
 import org.apache.paimon.web.server.data.result.PageR;
 import org.apache.paimon.web.server.data.result.R;
 import org.apache.paimon.web.server.util.ObjectMapperUtils;
@@ -123,7 +125,6 @@ public class CdcJobDefinitionControllerTest extends ControllerTestBase {
     @Order(3)
     @Test
     public void testSearchCdcJobDefinition() throws Exception {
-        testCreateCdcJob(cdcJobDefinitionDto());
         testCreateCdcJob(cdcJobDefinitionDtoForSearch());
         MockHttpServletResponse listAllResponse =
                 mockMvc.perform(
@@ -181,4 +182,39 @@ public class CdcJobDefinitionControllerTest extends ControllerTestBase {
                 getPageR(searchResponse, new TypeReference<PageR<CdcJobDefinition>>() {});
         return searchResult.getTotal();
     }
+
+    @Order(4)
+    @Test
+    public void submitCdcJob() throws Exception {
+        System.setProperty("FLINK_HOME", "/opt/flink");
+        System.setProperty("ACTION_JAR_PATH", "/opt/flink/jar");
+        ClusterInfo cluster = new ClusterInfo();
+        cluster.setId(1);
+        cluster.setClusterName("clusterName");
+        cluster.setHost("127.0.0.1");
+        cluster.setPort(8083);
+        cluster.setType("Flink");
+        cluster.setEnabled(true);
+        mockMvc.perform(
+                        MockMvcRequestBuilders.post("/api/cluster")
+                                .cookie(cookie)
+                                .content(ObjectMapperUtils.toJSON(cluster))
+                                .contentType(MediaType.APPLICATION_JSON_VALUE)
+                                .accept(MediaType.APPLICATION_JSON_VALUE))
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andDo(MockMvcResultHandlers.print());
+        CdcJobDefinitionDTO cdcJobDefinitionDTO = cdcJobDefinitionDto();
+        CdcJobSubmitDTO cdcJobSubmitDTO = new CdcJobSubmitDTO();
+        mockMvc.perform(
+                        MockMvcRequestBuilders.get(
+                                        cdcJobDefinitionPath + "/" + cdcJobDefinitionDTO.getId())
+                                .cookie(cookie)
+                                .accept(MediaType.APPLICATION_JSON_VALUE)
+                                .content(ObjectMapperUtils.toJSON(cdcJobSubmitDTO))
+                                .contentType(MediaType.APPLICATION_JSON_VALUE))
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andDo(MockMvcResultHandlers.print())
+                .andReturn()
+                .getResponse();
+    }
 }

Reply via email to