This is an automated email from the ASF dual-hosted git repository.
loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new c36d3623 Fix callback issue when task done (#587)
c36d3623 is described below
commit c36d3623a5d44a31c4fa9253d64dc92a5a03570a
Author: chzhoo <[email protected]>
AuthorDate: Thu Aug 14 16:44:41 2025 +0800
Fix callback issue when task done (#587)
---
.../client/callback/JobOperatorCallback.java | 25 +++++++++++++++-------
.../client/callback/RestJobOperatorCallback.java | 12 ++++-------
.../callback/RestJobOperatorCallbackTest.java | 13 ++++++-----
3 files changed, 27 insertions(+), 23 deletions(-)
diff --git
a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/JobOperatorCallback.java
b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/JobOperatorCallback.java
index 9e97ef4a..aed6ac49 100644
---
a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/JobOperatorCallback.java
+++
b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/JobOperatorCallback.java
@@ -19,7 +19,7 @@
package org.apache.geaflow.cluster.client.callback;
-import java.util.Map;
+import java.io.Serializable;
public interface JobOperatorCallback {
@@ -28,20 +28,29 @@ public interface JobOperatorCallback {
*/
void onFinish();
- class JobOperatorMeta {
- private final Map<String, String> params;
+ class JobOperatorMeta implements Serializable {
+ private boolean success;
+ private String action;
- public JobOperatorMeta(Map<String, String> params) {
- this.params = params;
+ public boolean isSuccess() {
+ return success;
}
- public Map<String, String> getParams() {
- return params;
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public String getAction() {
+ return action;
+ }
+
+ public void setAction(String action) {
+ this.action = action;
}
@Override
public String toString() {
- return "JobOperatorMeta{" + "params='" + params + '}';
+ return "JobOperatorMeta{" + "action='" + action + ", success=" +
success + '}';
}
}
}
diff --git
a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallback.java
b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallback.java
index 4f9a3cb3..b366c2e7 100644
---
a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallback.java
+++
b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallback.java
@@ -37,7 +37,6 @@ public class RestJobOperatorCallback implements
JobOperatorCallback {
private static final String GEAFLOW_TOKEN_KEY = "geaflow-token";
private static final String FINISH_JOB_PATH = "/api/tasks/%s/operations";
private static final String FINISH_ACTION_KEY = "finish";
- private static final String JOB_ACTION_KEY = "action";
private final String callbackUrl;
private final Map<String, String> headers;
@@ -52,16 +51,13 @@ public class RestJobOperatorCallback implements
JobOperatorCallback {
@Override
public void onFinish() {
- Map<String, String> params = new HashMap<>();
- params.put(JOB_ACTION_KEY, FINISH_ACTION_KEY);
- JobOperatorCallback.JobOperatorMeta jobOperatorMeta = new
JobOperatorCallback.JobOperatorMeta(params);
+ JobOperatorCallback.JobOperatorMeta jobOperatorMeta = new
JobOperatorCallback.JobOperatorMeta();
+ jobOperatorMeta.setSuccess(true);
+ jobOperatorMeta.setAction(FINISH_ACTION_KEY);
String fullUrl = getFullUrl(jobOperatorMeta);
if (fullUrl != null) {
- HttpRequest request = new HttpRequest();
- request.setSuccess(true);
- request.setData(jobOperatorMeta);
- HttpUtil.post(fullUrl, new Gson().toJson(request), headers);
+ HttpUtil.post(fullUrl, new Gson().toJson(jobOperatorMeta),
headers);
}
}
diff --git
a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallbackTest.java
b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallbackTest.java
index 60c03ce9..9f29174e 100644
---
a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallbackTest.java
+++
b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/org/apache/geaflow/cluster/client/callback/RestJobOperatorCallbackTest.java
@@ -65,16 +65,15 @@ public class RestJobOperatorCallbackTest {
Configuration configuration = new Configuration();
configuration.put(JOB_UNIQUE_ID, String.valueOf(0L));
RestJobOperatorCallback callback = new
RestJobOperatorCallback(configuration, baseUrl);
- Map<String, ConnectAddress> addressList = new HashMap<>();
- addressList.put("1", new ConnectAddress());
callback.onFinish();
// confirm that your app made the HTTP requests you were expecting.
- RecordedRequest request1 = server.takeRequest();
- Assert.assertEquals("/api/tasks/0/operations", request1.getPath());
- HttpRequest result1 = new Gson()
- .fromJson(request1.getBody().readString(StandardCharsets.UTF_8),
HttpRequest.class);
- Assert.assertTrue(result1.isSuccess());
+ RecordedRequest request = server.takeRequest();
+ Assert.assertEquals("/api/tasks/0/operations", request.getPath());
+ JobOperatorCallback.JobOperatorMeta reqBody = new Gson()
+ .fromJson(request.getBody().readString(StandardCharsets.UTF_8),
JobOperatorCallback.JobOperatorMeta.class);
+ Assert.assertTrue(reqBody.isSuccess());
+ Assert.assertEquals(reqBody.getAction(), "finish");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]