This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d056984a7 IGNITE-23423 Provide system view for running compute jobs 
(#4699)
7d056984a7 is described below

commit 7d056984a7d8b109fd28a95bcbcc62d079cd176e
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Wed Nov 13 16:47:19 2024 +0300

    IGNITE-23423 Provide system view for running compute jobs (#4699)
---
 modules/compute/build.gradle                       |   2 +
 .../internal/compute/ComputeComponentImpl.java     |  13 +-
 .../internal/compute/ComputeViewProvider.java      |  79 ++++++
 .../ignite/internal/util/SubscriptionUtils.java    |   2 +-
 .../subscription/IterableToPublisherAdapter.java   |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   5 +-
 modules/sql-engine/build.gradle                    |   1 +
 .../sql/engine/ItComputeSystemViewTest.java        | 301 +++++++++++++++++++++
 8 files changed, 401 insertions(+), 6 deletions(-)

diff --git a/modules/compute/build.gradle b/modules/compute/build.gradle
index 6c0bd87bc4..771e28bfcd 100644
--- a/modules/compute/build.gradle
+++ b/modules/compute/build.gradle
@@ -33,6 +33,7 @@ dependencies {
     implementation project(':ignite-placement-driver-api')
     implementation project(':ignite-binary-tuple')
     implementation project(':ignite-client-common')
+    implementation project(':ignite-system-view-api')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.auto.service.annotations
@@ -60,6 +61,7 @@ dependencies {
     integrationTestImplementation project(':ignite-placement-driver-api')
     integrationTestImplementation project(':ignite-catalog')
     integrationTestImplementation project(':ignite-client')
+    integrationTestImplementation project(':ignite-system-view-api')
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 7d2b30e952..2435642a63 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -56,6 +56,8 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.systemview.api.SystemView;
+import org.apache.ignite.internal.systemview.api.SystemViewProvider;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -66,7 +68,7 @@ import org.jetbrains.annotations.TestOnly;
 /**
  * Implementation of {@link ComputeComponent}.
  */
-public class ComputeComponentImpl implements ComputeComponent {
+public class ComputeComponentImpl implements ComputeComponent, 
SystemViewProvider {
     private static final IgniteLogger LOG = 
Loggers.forClass(ComputeComponentImpl.class);
 
     /** Busy lock to stop synchronously. */
@@ -91,6 +93,8 @@ public class ComputeComponentImpl implements ComputeComponent 
{
 
     private final ExecutorService failoverExecutor;
 
+    private final ComputeViewProvider computeViewProvider = new 
ComputeViewProvider();
+
     /**
      * Creates a new instance.
      */
@@ -271,6 +275,7 @@ public class ComputeComponentImpl implements 
ComputeComponent {
         executor.start();
         messaging.start(this::executeLocally);
         executionManager.start();
+        computeViewProvider.init(executionManager);
 
         return nullCompletedFuture();
     }
@@ -288,6 +293,7 @@ public class ComputeComponentImpl implements 
ComputeComponent {
         executionManager.stop();
         messaging.stop();
         executor.stop();
+        computeViewProvider.stop();
         IgniteUtils.shutdownAndAwaitTermination(failoverExecutor, 10, 
TimeUnit.SECONDS);
 
         return nullCompletedFuture();
@@ -321,4 +327,9 @@ public class ComputeComponentImpl implements 
ComputeComponent {
     ExecutionManager executionManager() {
         return executionManager;
     }
+
+    @Override
+    public List<SystemView<?>> systemViews() {
+        return List.of(computeViewProvider.get());
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeViewProvider.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeViewProvider.java
new file mode 100644
index 0000000000..7f8e0b7b52
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeViewProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.apache.ignite.internal.type.NativeTypes.stringOf;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.systemview.api.SystemView;
+import org.apache.ignite.internal.systemview.api.SystemViews;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.jetbrains.annotations.Nullable;
+
+/** Provider that creates system view exposes information about currently 
running compute tasks hosted by a node. */
+public class ComputeViewProvider {
+    private final DelegatingPublisher publisher = new DelegatingPublisher();
+
+    /** Initializes with compute aware manager. */
+    public void init(ExecutionManager executionManager) {
+        publisher.executionManager = executionManager;
+    }
+
+    public void stop() {
+        publisher.executionManager = null;
+    }
+
+    /** Returns system view exposes information about currently running tasks. 
*/
+    public SystemView<?> get() {
+        NativeType idType = stringOf(36);
+
+        NativeType timestampType = 
NativeTypes.timestamp(NativeTypes.MAX_TIME_PRECISION);
+
+        return SystemViews.<JobState>nodeViewBuilder()
+                .name("COMPUTE_TASKS")
+                .nodeNameColumnAlias("COORDINATOR_NODE_ID")
+                .<String>addColumn("ID", idType, info -> info.id().toString())
+                .<String>addColumn("STATUS", stringOf(10), info -> 
info.status().name())
+                .<Instant>addColumn("CREATE_TIME", timestampType, 
JobState::createTime)
+                .<Instant>addColumn("START_TIME", timestampType, 
JobState::startTime)
+                .<Instant>addColumn("FINISH_TIME", timestampType, 
JobState::finishTime)
+                .dataProvider(publisher)
+                .build();
+    }
+
+    private static class DelegatingPublisher implements Publisher<JobState> {
+        @Nullable private volatile ExecutionManager executionManager;
+
+        @Override
+        public void subscribe(Subscriber<? super JobState> subscriber) {
+            ExecutionManager execManager = executionManager;
+
+            Publisher<JobState> jobStatePublisher = execManager != null
+                    ? 
SubscriptionUtils.fromIterable(execManager.localStatesAsync())
+                    : SubscriptionUtils.fromIterable(List.of());
+
+            jobStatePublisher.subscribe(subscriber);
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/SubscriptionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/SubscriptionUtils.java
index 76fab9b412..9722f5d9e2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/SubscriptionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/SubscriptionUtils.java
@@ -117,7 +117,7 @@ public class SubscriptionUtils {
      * @param <T> Type of the entries this publisher will emit.
      * @return Publisher created from the given iterable.
      */
-    public static <T> Publisher<T> fromIterable(CompletableFuture<Iterable<T>> 
iterableFuture) {
+    public static <T> Publisher<T> fromIterable(CompletableFuture<? extends 
Iterable<T>> iterableFuture) {
         return new IterableToPublisherAdapter<>(iterableFuture, Runnable::run, 
Integer.MAX_VALUE);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java
index 7318b556a4..ac03cb1cdf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java
@@ -35,7 +35,7 @@ import java.util.concurrent.Flow.Subscription;
  * @param <T> The type of the entry this publisher will emit.
  */
 public class IterableToPublisherAdapter<T> implements Publisher<T> {
-    private final CompletableFuture<Iterable<T>> iterableFuture;
+    private final CompletableFuture<? extends Iterable<T>> iterableFuture;
     private final Executor executor;
     private final int batchSize;
 
@@ -48,7 +48,7 @@ public class IterableToPublisherAdapter<T> implements 
Publisher<T> {
      *      to provide some reasonable value here in order to give am ability 
to other publishers which share the same
      *      executor to make progress.
      */
-    public IterableToPublisherAdapter(CompletableFuture<Iterable<T>> 
iterableFuture, Executor executor, int batchSize) {
+    public IterableToPublisherAdapter(CompletableFuture<? extends Iterable<T>> 
iterableFuture, Executor executor, int batchSize) {
         this.iterableFuture = iterableFuture;
         this.executor = executor;
         this.batchSize = batchSize;
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1ab83b4457..c135b32bc2 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -88,7 +88,6 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
 import org.apache.ignite.internal.compute.AntiHijackIgniteCompute;
-import org.apache.ignite.internal.compute.ComputeComponent;
 import org.apache.ignite.internal.compute.ComputeComponentImpl;
 import org.apache.ignite.internal.compute.IgniteComputeImpl;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
@@ -312,7 +311,7 @@ public class IgniteImpl implements Ignite {
     /** Cluster service (cluster network manager). */
     private final ClusterService clusterSvc;
 
-    private final ComputeComponent computeComponent;
+    private final ComputeComponentImpl computeComponent;
 
     private final CriticalWorkerWatchdog criticalWorkerRegistry;
 
@@ -1094,6 +1093,8 @@ public class IgniteImpl implements Ignite {
                 computeCfg
         );
 
+        systemViewManager.register(computeComponent);
+
         compute = new IgniteComputeImpl(
                 placementDriverMgr.placementDriver(),
                 clusterSvc.topologyService(),
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 1ca9dee20c..572c234a92 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -117,6 +117,7 @@ dependencies {
     testImplementation libs.archunit.core
     testImplementation libs.archunit.junit5
 
+    integrationTestImplementation libs.awaitility
     integrationTestImplementation project(':ignite-api')
     integrationTestImplementation project(':ignite-schema')
     integrationTestImplementation project(':ignite-catalog')
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItComputeSystemViewTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItComputeSystemViewTest.java
new file mode 100644
index 0000000000..40bddf83c4
--- /dev/null
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItComputeSystemViewTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.compute.JobStatus.CANCELED;
+import static org.apache.ignite.compute.JobStatus.EXECUTING;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.sql.engine.util.Commons.closeQuiet;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
+import static 
org.apache.ignite.internal.testframework.matchers.TaskStateMatcher.taskStateWithStatus;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasLength;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.TaskStatus;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.ColumnType;
+import org.hamcrest.Matchers;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * End-to-end tests to verify {@code COMPUTE_TASKS} system view.
+ */
+public class ItComputeSystemViewTest extends BaseSqlIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void checkMeta(boolean isClient) {
+        String query = "SELECT * FROM COMPUTE_TASKS";
+
+        Ignite entryNode = isClient ? 
IgniteClient.builder().addresses("localhost").build() : CLUSTER.node(0);
+
+        try {
+            // Verify metadata.
+            assertQuery(query)
+                    .withDefaultSchema("SYSTEM")
+                    .columnMetadata(
+                            new 
MetadataMatcher().name("COORDINATOR_NODE_ID").type(ColumnType.STRING).nullable(false),
+                            new 
MetadataMatcher().name("ID").type(ColumnType.STRING).precision(36).nullable(true),
+                            new 
MetadataMatcher().name("STATUS").type(ColumnType.STRING).nullable(true),
+                            new 
MetadataMatcher().name("CREATE_TIME").type(ColumnType.TIMESTAMP).nullable(true),
+                            new 
MetadataMatcher().name("START_TIME").type(ColumnType.TIMESTAMP).nullable(true),
+                            new 
MetadataMatcher().name("FINISH_TIME").type(ColumnType.TIMESTAMP).nullable(true)
+                    )
+                    .check();
+        } finally {
+            closeQuiet(entryNode);
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void viewRunningJobs(boolean isClient) {
+        Ignite entryNode = isClient ? 
IgniteClient.builder().addresses("localhost").build() : CLUSTER.node(0);
+        Ignite targetNode = CLUSTER.node(0);
+
+        try {
+            ClockService clockService = 
unwrapIgniteImpl(targetNode).clockService();
+
+            long tsBefore = clockService.now().getPhysical();
+
+            JobDescriptor<Void, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+            JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(targetNode)), job, null);
+
+            await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+            long tsAfter = clockService.now().getPhysical();
+
+            String query = "SELECT * FROM SYSTEM.COMPUTE_TASKS WHERE STATUS = 
?";
+
+            List<List<Object>> res = sql(0, query, EXECUTING.name());
+
+            assertThat(res, Matchers.hasSize(1));
+
+            verifyComputeJobState(res.get(0), List.of(targetNode.name()), 
EXECUTING.name(), tsBefore, tsAfter);
+
+            IgniteTestUtils.await(execution.cancelAsync());
+
+            await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+
+            // Second Job call on different node.
+            job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+
+            tsBefore = clockService.now().getPhysical();
+
+            targetNode = CLUSTER.node(1);
+
+            execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(targetNode)), job, null);
+
+            await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+            tsAfter = clockService.now().getPhysical();
+
+            query = "SELECT * FROM SYSTEM.COMPUTE_TASKS WHERE 
COORDINATOR_NODE_ID = ? AND STATUS = ?";
+
+            res = sql(0, query, targetNode.name(), EXECUTING.name());
+
+            verifyComputeJobState(res.get(0), List.of(targetNode.name()), 
EXECUTING.name(), tsBefore, tsAfter);
+
+            IgniteTestUtils.await(execution.cancelAsync());
+
+            await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+        } finally {
+            closeQuiet(entryNode);
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void viewRunningBroadcasts(boolean isClient) {
+        Ignite entryNode = isClient ? 
IgniteClient.builder().addresses("localhost").build() : CLUSTER.node(0);
+
+        try {
+            ClockService clockService = 
unwrapIgniteImpl(CLUSTER.node(0)).clockService();
+
+            long tsBefore = clockService.now().getPhysical();
+
+            JobDescriptor<Void, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+            Map<ClusterNode, JobExecution<Void>> execution = 
entryNode.compute().submitBroadcast(
+                    Set.of(clusterNode(CLUSTER.node(0)), 
clusterNode(CLUSTER.node(1))), job, null);
+
+            execution.forEach((k, exec) -> await().until(exec::stateAsync, 
willBe(jobStateWithStatus(EXECUTING))));
+
+            long tsAfter = clockService.now().getPhysical();
+
+            String query = "SELECT * FROM SYSTEM.COMPUTE_TASKS WHERE STATUS = 
?";
+
+            List<List<Object>> res = sql(0, query, EXECUTING.name());
+
+            assertThat(res.size(), is(2));
+            List<String> execNodes = List.of(CLUSTER.node(0).name(), 
CLUSTER.node(1).name());
+
+            verifyComputeJobState(res.get(0), execNodes, EXECUTING.name(), 
tsBefore, tsAfter);
+            verifyComputeJobState(res.get(1), execNodes, EXECUTING.name(), 
tsBefore, tsAfter);
+
+            execution.forEach((k, exec) -> 
IgniteTestUtils.await(exec.cancelAsync()));
+        } finally {
+            closeQuiet(entryNode);
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void viewRunningMapReduceTask(boolean isClient) {
+        Ignite entryNode = isClient ? 
IgniteClient.builder().addresses("localhost").build() : CLUSTER.node(0);
+        Ignite targetNode = CLUSTER.node(0);
+
+        try {
+            ClockService clockService = 
unwrapIgniteImpl(targetNode).clockService();
+
+            long tsBefore = clockService.now().getPhysical();
+
+            TaskExecution<Void> execution = entryNode.compute()
+                    
.submitMapReduce(TaskDescriptor.builder(MapReduceTaskCustom.class).build(), 
null);
+
+            await().until(execution::stateAsync, 
willBe(taskStateWithStatus(TaskStatus.EXECUTING)));
+
+            long tsAfter = clockService.now().getPhysical();
+
+            String query = "SELECT * FROM SYSTEM.COMPUTE_TASKS WHERE STATUS = 
?";
+
+            List<List<Object>> res = sql(0, query, EXECUTING.name());
+
+            assertThat(res.size(), is(2));
+            List<String> execNodes = List.of(CLUSTER.node(0).name(), 
CLUSTER.node(1).name());
+
+            verifyComputeJobState(res.get(0), execNodes, EXECUTING.name(), 
tsBefore, tsAfter);
+            verifyComputeJobState(res.get(1), execNodes, EXECUTING.name(), 
tsBefore, tsAfter);
+
+            IgniteTestUtils.await(execution.cancelAsync());
+        } finally {
+            closeQuiet(entryNode);
+        }
+    }
+
+    private static ClusterNode clusterNode(Ignite node) {
+        return unwrapIgniteImpl(node).node();
+    }
+
+    private static class InfiniteMapReduceJob implements ComputeJob<Void, 
Void> {
+        @Override
+        public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Void input) {
+            return new CompletableFuture<>();
+        }
+    }
+
+    private static class MapReduceTaskCustom implements MapReduceTask<Void, 
Void, Void, Void> {
+        @Override
+        public CompletableFuture<List<MapReduceJob<Void, Void>>> 
splitAsync(TaskExecutionContext taskContext, @Nullable Void input) {
+            return completedFuture(List.of(
+                    MapReduceJob.<Void, Void>builder()
+                            
.jobDescriptor(JobDescriptor.builder(InfiniteMapReduceJob.class).build())
+                            .nodes(taskContext.ignite().clusterNodes())
+                            .build()
+            ));
+        }
+
+        @Override
+        public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, Void> results) {
+            return completedFuture(null);
+        }
+    }
+
+    private static class InfiniteJob implements ComputeJob<Void, Void> {
+        @Override
+        public @Nullable CompletableFuture<Void> 
executeAsync(JobExecutionContext context, @Nullable Void arg) {
+            while (true) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    // No op, just return from loop
+                    break;
+                }
+            }
+
+            return null;
+        }
+    }
+
+    private static void verifyComputeJobState(
+            List<Object> row,
+            List<String> nodeName,
+            String phase,
+            long tsBefore,
+            long tsAfter
+    ) {
+        int idx = 0;
+
+        // INITIATOR_NODE
+        assertThat(nodeName, hasItem((String) row.get(idx++)));
+
+        // ID
+        assertThat((String) row.get(idx++), hasLength(36));
+
+        // PHASE
+        assertThat(row.get(idx++), equalTo(phase));
+
+        // CREATE_TIME
+        assertThat(((Instant) row.get(idx++)).toEpochMilli(), 
Matchers.allOf(greaterThanOrEqualTo(tsBefore), lessThanOrEqualTo(tsAfter)));
+
+        // START_TIME
+        assertThat(((Instant) row.get(idx++)).toEpochMilli(), 
Matchers.allOf(greaterThanOrEqualTo(tsBefore), lessThanOrEqualTo(tsAfter)));
+
+        // FINISH_TIME
+        // Asynchronously updated and eventually can be null
+        if (row.get(idx) != null) {
+            assertThat(((Instant) row.get(idx)).toEpochMilli(),
+                    Matchers.allOf(greaterThanOrEqualTo(tsBefore), 
greaterThanOrEqualTo(tsAfter)));
+        }
+    }
+}

Reply via email to