CalvinKirs commented on a change in pull request #6229:
URL: https://github.com/apache/dolphinscheduler/pull/6229#discussion_r714925416



##########
File path: 
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java
##########
@@ -158,47 +164,108 @@ private void addFormUrlencoded(HttpPost post) {
     @Override
     public void cancelApplication(boolean status) throws Exception {
         super.cancelApplication(status);
+        logger.info("start to cancelApplication");
+        Objects.requireNonNull(triggerResult, "triggerResult can not be null");
+        logger.info("start to cancelApplication taskId:{}", 
triggerResult.getTaskId());
+        final String triggerUrl = getTriggerUrl();
+        StringEntity entity = new 
StringEntity("action=core_action&event_submit_do_cancel_task=y&taskid=" + 
triggerResult.getTaskId(), StandardCharsets.UTF_8);
+
+        CancelResult cancelResult = null;
+        HttpPost post = new HttpPost(triggerUrl);
+        addFormUrlencoded(post);
+        post.setEntity(entity);
+        try (CloseableHttpClient client = HttpClients.createDefault();
+             // trigger to start TIS dataX task
+             CloseableHttpResponse response = client.execute(post)) {
+            cancelResult = processResponse(triggerUrl, response, 
CancelResult.class);
+            if (!cancelResult.isSuccess()) {
+                List<String> errormsg = triggerResult.getErrormsg();
+                StringBuffer errs = new StringBuffer();
+                if 
(org.apache.dolphinscheduler.spi.utils.CollectionUtils.isNotEmpty(errormsg)) {
+                    
errs.append(",errs:").append(errormsg.stream().collect(Collectors.joining(",")));
+                }
+                throw new Exception("cancel TIS job faild taskId:" + 
triggerResult.getTaskId() + errs.toString());
+            }
+        }
     }
 
-    private WebSocket receiveRealtimeLog(final String tisHost, String 
dataXName, int taskId) throws InterruptedException, 
java.util.concurrent.ExecutionException {
+    private String getTriggerUrl() {
+        final String tisHost = getTisHost();
+        return String.format("http://%s/tjs/coredefine/coredefine.ajax", 
tisHost);
+    }
 
-        WebSocketUpgradeHandler.Builder upgradeHandlerBuilder
-                = new WebSocketUpgradeHandler.Builder();
-        WebSocketUpgradeHandler wsHandler = upgradeHandlerBuilder
-                .addWebSocketListener(new WebSocketListener() {
-                    @Override
-                    public void onOpen(WebSocket websocket) {
-                        // WebSocket connection opened
-                    }
+    private String getTisHost() {
+        final String tisHost = 
taskExecutionContext.getDefinedParams().get(KEY_POOL_VAR_TIS_HOST);
+        if (StringUtils.isEmpty(tisHost)) {
+            throw new IllegalStateException("global var '" + 
KEY_POOL_VAR_TIS_HOST + "' can not be empty");
+        }
+        return tisHost;
+    }
 
-                    @Override
-                    public void onClose(WebSocket websocket, int code, String 
reason) {
-                        // WebSocket connection closed
-                    }
+    private WebSocketClient receiveRealtimeLog(final String tisHost, String 
dataXName, int taskId) throws Exception {
+        final String applyURI = String.format("ws://%s" + WS_REQUEST_PATH + 
"?logtype=full&collection=%s&taskid=%s", tisHost, dataXName, taskId);
+        logger.info("apply ws connection,uri:{}", applyURI);
+        WebSocketClient webSocketClient = new WebSocketClient(new 
URI(applyURI)) {
+            @Override
+            public void onOpen(ServerHandshake handshakedata) {
+                logger.info("start to receive remote execute log");
+            }
 

Review comment:
       This method may become a bottleneck when the workload is large.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to