This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git
The following commit(s) were added to refs/heads/main by this push:
new 079d7da1 [Improvement] Support selecting a session cluster when
submitting CDC jobs (#292)
079d7da1 is described below
commit 079d7da159558cea3457ec3004b90aaf24245afb
Author: yangyang zhong <[email protected]>
AuthorDate: Thu Jun 6 17:05:19 2024 +0800
[Improvement] Support selecting a session cluster when submitting CDC jobs
(#292)
---
.../web/server/data/dto/CdcJobSubmitDTO.java | 2 +-
.../service/impl/CdcJobDefinitionServiceImpl.java | 10 +++++-
.../controller/CdcJobDefinitionControllerTest.java | 38 +++++++++++++++++++++-
3 files changed, 47 insertions(+), 3 deletions(-)
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
index 6800b73c..1c4492c9 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
@@ -24,5 +24,5 @@ import lombok.Data;
@Data
public class CdcJobSubmitDTO {
- private String flinkSessionUrl;
+ private String clusterId;
}
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
index 0304299d..9d061f9a 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
@@ -31,6 +31,7 @@ import
org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
import org.apache.paimon.web.server.data.model.CatalogInfo;
import org.apache.paimon.web.server.data.model.CdcJobDefinition;
+import org.apache.paimon.web.server.data.model.ClusterInfo;
import org.apache.paimon.web.server.data.model.cdc.CdcGraph;
import org.apache.paimon.web.server.data.model.cdc.CdcNode;
import org.apache.paimon.web.server.data.result.PageR;
@@ -39,6 +40,7 @@ import org.apache.paimon.web.server.data.result.enums.Status;
import org.apache.paimon.web.server.mapper.CdcJobDefinitionMapper;
import org.apache.paimon.web.server.service.CatalogService;
import org.apache.paimon.web.server.service.CdcJobDefinitionService;
+import org.apache.paimon.web.server.service.ClusterService;
import org.apache.paimon.web.server.util.StringUtils;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -61,6 +63,8 @@ public class CdcJobDefinitionServiceImpl
@Autowired private CatalogService catalogService;
+ @Autowired private ClusterService clusterService;
+
@Override
public R<Void> create(CdcJobDefinitionDTO cdcJobDefinitionDTO) {
String name = cdcJobDefinitionDTO.getName();
@@ -142,7 +146,11 @@ public class CdcJobDefinitionServiceImpl
cdcGraph.getTarget().getType(),
flinkCdcSyncType);
ObjectNode actionConfigs = JSONUtils.createObjectNode();
- actionConfigs.put(FlinkCdcOptions.SESSION_URL,
cdcJobSubmitDTO.getFlinkSessionUrl());
+ String clusterId = cdcJobSubmitDTO.getClusterId();
+ ClusterInfo clusterInfo = clusterService.getById(clusterId);
+ actionConfigs.put(
+ FlinkCdcOptions.SESSION_URL,
+ String.format("http://%s:%s", clusterInfo.getHost(),
clusterInfo.getPort()));
handleCdcGraphNodeData(actionConfigs, cdcGraph.getSource());
handleCdcGraphNodeData(actionConfigs, cdcGraph.getTarget());
ActionContext actionContext = factory.getActionContext(actionConfigs);
diff --git
a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
index 2fd2c0e4..01762beb 100644
---
a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
+++
b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
@@ -19,7 +19,9 @@
package org.apache.paimon.web.server.controller;
import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
+import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
import org.apache.paimon.web.server.data.model.CdcJobDefinition;
+import org.apache.paimon.web.server.data.model.ClusterInfo;
import org.apache.paimon.web.server.data.result.PageR;
import org.apache.paimon.web.server.data.result.R;
import org.apache.paimon.web.server.util.ObjectMapperUtils;
@@ -123,7 +125,6 @@ public class CdcJobDefinitionControllerTest extends
ControllerTestBase {
@Order(3)
@Test
public void testSearchCdcJobDefinition() throws Exception {
- testCreateCdcJob(cdcJobDefinitionDto());
testCreateCdcJob(cdcJobDefinitionDtoForSearch());
MockHttpServletResponse listAllResponse =
mockMvc.perform(
@@ -181,4 +182,39 @@ public class CdcJobDefinitionControllerTest extends
ControllerTestBase {
getPageR(searchResponse, new
TypeReference<PageR<CdcJobDefinition>>() {});
return searchResult.getTotal();
}
+
+ @Order(4)
+ @Test
+ public void submitCdcJob() throws Exception {
+ System.setProperty("FLINK_HOME", "/opt/flink");
+ System.setProperty("ACTION_JAR_PATH", "/opt/flink/jar");
+ ClusterInfo cluster = new ClusterInfo();
+ cluster.setId(1);
+ cluster.setClusterName("clusterName");
+ cluster.setHost("127.0.0.1");
+ cluster.setPort(8083);
+ cluster.setType("Flink");
+ cluster.setEnabled(true);
+ mockMvc.perform(
+ MockMvcRequestBuilders.post("/api/cluster")
+ .cookie(cookie)
+ .content(ObjectMapperUtils.toJSON(cluster))
+ .contentType(MediaType.APPLICATION_JSON_VALUE)
+ .accept(MediaType.APPLICATION_JSON_VALUE))
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andDo(MockMvcResultHandlers.print());
+ CdcJobDefinitionDTO cdcJobDefinitionDTO = cdcJobDefinitionDto();
+ CdcJobSubmitDTO cdcJobSubmitDTO = new CdcJobSubmitDTO();
+ mockMvc.perform(
+ MockMvcRequestBuilders.get(
+ cdcJobDefinitionPath + "/" +
cdcJobDefinitionDTO.getId())
+ .cookie(cookie)
+ .accept(MediaType.APPLICATION_JSON_VALUE)
+
.content(ObjectMapperUtils.toJSON(cdcJobSubmitDTO))
+ .contentType(MediaType.APPLICATION_JSON_VALUE))
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andDo(MockMvcResultHandlers.print())
+ .andReturn()
+ .getResponse();
+ }
}