This is an automated email from the ASF dual-hosted git repository.

loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new c36d3623 Fix callback issue when task done (#587)
c36d3623 is described below

commit c36d3623a5d44a31c4fa9253d64dc92a5a03570a
Author: chzhoo <[email protected]>
AuthorDate: Thu Aug 14 16:44:41 2025 +0800

    Fix callback issue when task done (#587)
---
 .../client/callback/JobOperatorCallback.java       | 25 +++++++++++++++-------
 .../client/callback/RestJobOperatorCallback.java   | 12 ++++-------
 .../callback/RestJobOperatorCallbackTest.java      | 13 ++++++-----
 3 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/JobOperatorCallback.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/JobOperatorCallback.java
index 9e97ef4a..aed6ac49 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/JobOperatorCallback.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/JobOperatorCallback.java
@@ -19,7 +19,7 @@
 
 package org.apache.geaflow.cluster.client.callback;
 
-import java.util.Map;
+import java.io.Serializable;
 
 public interface JobOperatorCallback {
 
@@ -28,20 +28,29 @@ public interface JobOperatorCallback {
      */
     void onFinish();
 
-    class JobOperatorMeta {
-        private final Map<String, String> params;
+    class JobOperatorMeta implements Serializable {
+        private boolean success;
+        private String action;
 
-        public JobOperatorMeta(Map<String, String> params) {
-            this.params = params;
+        public boolean isSuccess() {
+            return success;
         }
 
-        public Map<String, String> getParams() {
-            return params;
+        public void setSuccess(boolean success) {
+            this.success = success;
+        }
+
+        public String getAction() {
+            return action;
+        }
+
+        public void setAction(String action) {
+            this.action = action;
         }
 
         @Override
         public String toString() {
-            return "JobOperatorMeta{" + "params='" + params + '}';
+            return "JobOperatorMeta{" + "action='" + action + ", success=" + success + '}';
         }
     }
 }
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallback.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallback.java
index 4f9a3cb3..b366c2e7 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallback.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallback.java
@@ -37,7 +37,6 @@ public class RestJobOperatorCallback implements JobOperatorCallback {
     private static final String GEAFLOW_TOKEN_KEY = "geaflow-token";
     private static final String FINISH_JOB_PATH = "/api/tasks/%s/operations";
     private static final String FINISH_ACTION_KEY = "finish";
-    private static final String JOB_ACTION_KEY = "action";
 
     private final String callbackUrl;
     private final Map<String, String> headers;
@@ -52,16 +51,13 @@ public class RestJobOperatorCallback implements JobOperatorCallback {
 
     @Override
     public void onFinish() {
-        Map<String, String> params = new HashMap<>();
-        params.put(JOB_ACTION_KEY, FINISH_ACTION_KEY);
-        JobOperatorCallback.JobOperatorMeta jobOperatorMeta = new JobOperatorCallback.JobOperatorMeta(params);
+        JobOperatorCallback.JobOperatorMeta jobOperatorMeta = new JobOperatorCallback.JobOperatorMeta();
+        jobOperatorMeta.setSuccess(true);
+        jobOperatorMeta.setAction(FINISH_ACTION_KEY);
 
         String fullUrl = getFullUrl(jobOperatorMeta);
         if (fullUrl != null) {
-            HttpRequest request = new HttpRequest();
-            request.setSuccess(true);
-            request.setData(jobOperatorMeta);
-            HttpUtil.post(fullUrl, new Gson().toJson(request), headers);
+            HttpUtil.post(fullUrl, new Gson().toJson(jobOperatorMeta), headers);
         }
     }
 
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallbackTest.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallbackTest.java
index 60c03ce9..9f29174e 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallbackTest.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallbackTest.java
@@ -65,16 +65,15 @@ public class RestJobOperatorCallbackTest {
         Configuration configuration = new Configuration();
         configuration.put(JOB_UNIQUE_ID, String.valueOf(0L));
         RestJobOperatorCallback callback = new RestJobOperatorCallback(configuration, baseUrl);
-        Map<String, ConnectAddress> addressList = new HashMap<>();
-        addressList.put("1", new ConnectAddress());
         callback.onFinish();
 
         // confirm that your app made the HTTP requests you were expecting.
-        RecordedRequest request1 = server.takeRequest();
-        Assert.assertEquals("/api/tasks/0/operations", request1.getPath());
-        HttpRequest result1 = new Gson()
-            .fromJson(request1.getBody().readString(StandardCharsets.UTF_8), HttpRequest.class);
-        Assert.assertTrue(result1.isSuccess());
+        RecordedRequest request = server.takeRequest();
+        Assert.assertEquals("/api/tasks/0/operations", request.getPath());
+        JobOperatorCallback.JobOperatorMeta reqBody = new Gson()
+            .fromJson(request.getBody().readString(StandardCharsets.UTF_8),  JobOperatorCallback.JobOperatorMeta.class);
+        Assert.assertTrue(reqBody.isSuccess());
+        Assert.assertEquals(reqBody.getAction(), "finish");
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to