This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 12232584b43 IGNITE-28345 Skip client compute notification on
executionFut fail (#7906)
12232584b43 is described below
commit 12232584b4373109bd1c92c9f20e6d2f4518735b
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Mar 31 12:39:51 2026 +0200
IGNITE-28345 Skip client compute notification on executionFut fail (#7906)
* Client request handler: prevent unexpected notifications when job/task
execution fails early (the failure is now reported only through the returned
future, and no notification is sent for it)
* CI config: add build failure detection for "Unexpected notification ID" and
"Unexpected response ID" log messages in .NET tests to catch those problems
early
---
.teamcity/test/platform_tests/PlatformDotnetTestsLinux.kt | 12 ++++++++++++
.../compute/ClientComputeExecuteMapReduceRequest.java | 13 ++++++++-----
.../requests/compute/ClientComputeExecuteRequest.java | 13 +++----------
3 files changed, 23 insertions(+), 15 deletions(-)
diff --git a/.teamcity/test/platform_tests/PlatformDotnetTestsLinux.kt
b/.teamcity/test/platform_tests/PlatformDotnetTestsLinux.kt
index d070770fde3..e7ce4f8df4c 100644
--- a/.teamcity/test/platform_tests/PlatformDotnetTestsLinux.kt
+++ b/.teamcity/test/platform_tests/PlatformDotnetTestsLinux.kt
@@ -105,6 +105,18 @@ object PlatformDotnetTestsLinux : BuildType({
failureMessage = "NullReferenceException in log"
reverse = false
}
+ failOnText {
+ conditionType = BuildFailureOnText.ConditionType.CONTAINS
+ pattern = "Unexpected notification ID"
+ failureMessage = "Unexpected notification ID in log"
+ reverse = false
+ }
+ failOnText {
+ conditionType = BuildFailureOnText.ConditionType.CONTAINS
+ pattern = "Unexpected response ID"
+ failureMessage = "Unexpected response ID in log"
+ reverse = false
+ }
}
requirements {
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index 08a4bac84d9..65cc1c39cf1 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -82,7 +82,6 @@ public class ClientComputeExecuteMapReduceRequest {
.clientAddress(clientContext.remoteAddress().toString());
TaskExecution<Object> execution =
compute.submitMapReduceInternal(taskDescriptor, metadataBuilder, arg, null);
- sendTaskResult(execution, notificationSender);
var idsAsync = execution.idsAsync()
.handle((ids, ex) -> {
@@ -90,10 +89,14 @@ public class ClientComputeExecuteMapReduceRequest {
return ex == null ? ids : Collections.<UUID>emptyList();
});
- return execution.idAsync().thenCompose(id -> idsAsync.thenApply(ids ->
out -> {
- //noinspection DataFlowIssue
- out.packUuid(id);
- packJobIds(out, ids);
+ return execution.idAsync().thenCompose(id -> idsAsync.thenApply(ids ->
{
+ sendTaskResult(execution, notificationSender);
+
+ return out -> {
+ //noinspection DataFlowIssue
+ out.packUuid(id);
+ packJobIds(out, ids);
+ };
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index ed925e27a24..c5704dc05fb 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
import org.apache.ignite.client.handler.ClientContext;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.client.handler.ResponseWriter;
@@ -131,12 +130,8 @@ public class ClientComputeExecuteRequest {
CompletableFuture<JobExecution<ComputeJobDataHolder>> executionFut,
NotificationSender notificationSender
) {
- return executionFut.handle((execution, throwable) -> {
- if (throwable != null) {
- notificationSender.sendNotification(null, throwable,
NULL_HYBRID_TIMESTAMP);
- return
CompletableFuture.<ComputeJobDataHolder>failedFuture(throwable);
- } else {
- return execution.resultAsync().whenComplete((val, err) ->
+ return executionFut.thenCompose(execution ->
+ execution.resultAsync().whenComplete((val, err) ->
execution.stateAsync().whenComplete((state, errState)
-> {
try {
notificationSender.sendNotification(
@@ -150,9 +145,7 @@ public class ClientComputeExecuteRequest {
} catch (Throwable e) {
LOG.error("Failed to send job result
notification: " + e.getMessage(), e);
}
- }));
- }
- }).thenCompose(Function.identity());
+ })));
}
static void packSubmitResult(ClientMessagePacker out, UUID jobId,
ClusterNode node) {