This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7d056984a7 IGNITE-23423 Provide system view for running compute jobs
(#4699)
7d056984a7 is described below
commit 7d056984a7d8b109fd28a95bcbcc62d079cd176e
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Wed Nov 13 16:47:19 2024 +0300
IGNITE-23423 Provide system view for running compute jobs (#4699)
---
modules/compute/build.gradle | 2 +
.../internal/compute/ComputeComponentImpl.java | 13 +-
.../internal/compute/ComputeViewProvider.java | 79 ++++++
.../ignite/internal/util/SubscriptionUtils.java | 2 +-
.../subscription/IterableToPublisherAdapter.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 5 +-
modules/sql-engine/build.gradle | 1 +
.../sql/engine/ItComputeSystemViewTest.java | 301 +++++++++++++++++++++
8 files changed, 401 insertions(+), 6 deletions(-)
diff --git a/modules/compute/build.gradle b/modules/compute/build.gradle
index 6c0bd87bc4..771e28bfcd 100644
--- a/modules/compute/build.gradle
+++ b/modules/compute/build.gradle
@@ -33,6 +33,7 @@ dependencies {
implementation project(':ignite-placement-driver-api')
implementation project(':ignite-binary-tuple')
implementation project(':ignite-client-common')
+ implementation project(':ignite-system-view-api')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.auto.service.annotations
@@ -60,6 +61,7 @@ dependencies {
integrationTestImplementation project(':ignite-placement-driver-api')
integrationTestImplementation project(':ignite-catalog')
integrationTestImplementation project(':ignite-client')
+ integrationTestImplementation project(':ignite-system-view-api')
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 7d2b30e952..2435642a63 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -56,6 +56,8 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.systemview.api.SystemView;
+import org.apache.ignite.internal.systemview.api.SystemViewProvider;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -66,7 +68,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Implementation of {@link ComputeComponent}.
*/
-public class ComputeComponentImpl implements ComputeComponent {
+public class ComputeComponentImpl implements ComputeComponent,
SystemViewProvider {
private static final IgniteLogger LOG =
Loggers.forClass(ComputeComponentImpl.class);
/** Busy lock to stop synchronously. */
@@ -91,6 +93,8 @@ public class ComputeComponentImpl implements ComputeComponent
{
private final ExecutorService failoverExecutor;
+ private final ComputeViewProvider computeViewProvider = new
ComputeViewProvider();
+
/**
* Creates a new instance.
*/
@@ -271,6 +275,7 @@ public class ComputeComponentImpl implements
ComputeComponent {
executor.start();
messaging.start(this::executeLocally);
executionManager.start();
+ computeViewProvider.init(executionManager);
return nullCompletedFuture();
}
@@ -288,6 +293,7 @@ public class ComputeComponentImpl implements
ComputeComponent {
executionManager.stop();
messaging.stop();
executor.stop();
+ computeViewProvider.stop();
IgniteUtils.shutdownAndAwaitTermination(failoverExecutor, 10,
TimeUnit.SECONDS);
return nullCompletedFuture();
@@ -321,4 +327,9 @@ public class ComputeComponentImpl implements
ComputeComponent {
ExecutionManager executionManager() {
return executionManager;
}
+
+ @Override
+ public List<SystemView<?>> systemViews() {
+ return List.of(computeViewProvider.get());
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeViewProvider.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeViewProvider.java
new file mode 100644
index 0000000000..7f8e0b7b52
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeViewProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.apache.ignite.internal.type.NativeTypes.stringOf;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.systemview.api.SystemView;
+import org.apache.ignite.internal.systemview.api.SystemViews;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.jetbrains.annotations.Nullable;
+
+/** Provider that creates system view exposes information about currently
running compute tasks hosted by a node. */
+public class ComputeViewProvider {
+ private final DelegatingPublisher publisher = new DelegatingPublisher();
+
+ /** Initializes with compute aware manager. */
+ public void init(ExecutionManager executionManager) {
+ publisher.executionManager = executionManager;
+ }
+
+ public void stop() {
+ publisher.executionManager = null;
+ }
+
+ /** Returns system view exposes information about currently running tasks.
*/
+ public SystemView<?> get() {
+ NativeType idType = stringOf(36);
+
+ NativeType timestampType =
NativeTypes.timestamp(NativeTypes.MAX_TIME_PRECISION);
+
+ return SystemViews.<JobState>nodeViewBuilder()
+ .name("COMPUTE_TASKS")
+ .nodeNameColumnAlias("COORDINATOR_NODE_ID")
+ .<String>addColumn("ID", idType, info -> info.id().toString())
+ .<String>addColumn("STATUS", stringOf(10), info ->
info.status().name())
+ .<Instant>addColumn("CREATE_TIME", timestampType,
JobState::createTime)
+ .<Instant>addColumn("START_TIME", timestampType,
JobState::startTime)
+ .<Instant>addColumn("FINISH_TIME", timestampType,
JobState::finishTime)
+ .dataProvider(publisher)
+ .build();
+ }
+
+ private static class DelegatingPublisher implements Publisher<JobState> {
+ @Nullable private volatile ExecutionManager executionManager;
+
+ @Override
+ public void subscribe(Subscriber<? super JobState> subscriber) {
+ ExecutionManager execManager = executionManager;
+
+ Publisher<JobState> jobStatePublisher = execManager != null
+ ?
SubscriptionUtils.fromIterable(execManager.localStatesAsync())
+ : SubscriptionUtils.fromIterable(List.of());
+
+ jobStatePublisher.subscribe(subscriber);
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/SubscriptionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/SubscriptionUtils.java
index 76fab9b412..9722f5d9e2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/SubscriptionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/SubscriptionUtils.java
@@ -117,7 +117,7 @@ public class SubscriptionUtils {
* @param <T> Type of the entries this publisher will emit.
* @return Publisher created from the given iterable.
*/
- public static <T> Publisher<T> fromIterable(CompletableFuture<Iterable<T>>
iterableFuture) {
+ public static <T> Publisher<T> fromIterable(CompletableFuture<? extends
Iterable<T>> iterableFuture) {
return new IterableToPublisherAdapter<>(iterableFuture, Runnable::run,
Integer.MAX_VALUE);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java
index 7318b556a4..ac03cb1cdf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java
@@ -35,7 +35,7 @@ import java.util.concurrent.Flow.Subscription;
* @param <T> The type of the entry this publisher will emit.
*/
public class IterableToPublisherAdapter<T> implements Publisher<T> {
- private final CompletableFuture<Iterable<T>> iterableFuture;
+ private final CompletableFuture<? extends Iterable<T>> iterableFuture;
private final Executor executor;
private final int batchSize;
@@ -48,7 +48,7 @@ public class IterableToPublisherAdapter<T> implements
Publisher<T> {
* to provide some reasonable value here in order to give am ability
to other publishers which share the same
* executor to make progress.
*/
- public IterableToPublisherAdapter(CompletableFuture<Iterable<T>>
iterableFuture, Executor executor, int batchSize) {
+ public IterableToPublisherAdapter(CompletableFuture<? extends Iterable<T>>
iterableFuture, Executor executor, int batchSize) {
this.iterableFuture = iterableFuture;
this.executor = executor;
this.batchSize = batchSize;
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1ab83b4457..c135b32bc2 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -88,7 +88,6 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.compute.AntiHijackIgniteCompute;
-import org.apache.ignite.internal.compute.ComputeComponent;
import org.apache.ignite.internal.compute.ComputeComponentImpl;
import org.apache.ignite.internal.compute.IgniteComputeImpl;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
@@ -312,7 +311,7 @@ public class IgniteImpl implements Ignite {
/** Cluster service (cluster network manager). */
private final ClusterService clusterSvc;
- private final ComputeComponent computeComponent;
+ private final ComputeComponentImpl computeComponent;
private final CriticalWorkerWatchdog criticalWorkerRegistry;
@@ -1094,6 +1093,8 @@ public class IgniteImpl implements Ignite {
computeCfg
);
+ systemViewManager.register(computeComponent);
+
compute = new IgniteComputeImpl(
placementDriverMgr.placementDriver(),
clusterSvc.topologyService(),
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 1ca9dee20c..572c234a92 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -117,6 +117,7 @@ dependencies {
testImplementation libs.archunit.core
testImplementation libs.archunit.junit5
+ integrationTestImplementation libs.awaitility
integrationTestImplementation project(':ignite-api')
integrationTestImplementation project(':ignite-schema')
integrationTestImplementation project(':ignite-catalog')
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItComputeSystemViewTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItComputeSystemViewTest.java
new file mode 100644
index 0000000000..40bddf83c4
--- /dev/null
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItComputeSystemViewTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.compute.JobStatus.CANCELED;
+import static org.apache.ignite.compute.JobStatus.EXECUTING;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.sql.engine.util.Commons.closeQuiet;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
+import static
org.apache.ignite.internal.testframework.matchers.TaskStateMatcher.taskStateWithStatus;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasLength;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.TaskStatus;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.ColumnType;
+import org.hamcrest.Matchers;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * End-to-end tests to verify {@code COMPUTE_TASKS} system view.
+ */
+public class ItComputeSystemViewTest extends BaseSqlIntegrationTest {
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void checkMeta(boolean isClient) {
+ String query = "SELECT * FROM COMPUTE_TASKS";
+
+ Ignite entryNode = isClient ?
IgniteClient.builder().addresses("localhost").build() : CLUSTER.node(0);
+
+ try {
+ // Verify metadata.
+ assertQuery(query)
+ .withDefaultSchema("SYSTEM")
+ .columnMetadata(
+ new
MetadataMatcher().name("COORDINATOR_NODE_ID").type(ColumnType.STRING).nullable(false),
+ new
MetadataMatcher().name("ID").type(ColumnType.STRING).precision(36).nullable(true),
+ new
MetadataMatcher().name("STATUS").type(ColumnType.STRING).nullable(true),
+ new
MetadataMatcher().name("CREATE_TIME").type(ColumnType.TIMESTAMP).nullable(true),
+ new
MetadataMatcher().name("START_TIME").type(ColumnType.TIMESTAMP).nullable(true),
+ new
MetadataMatcher().name("FINISH_TIME").type(ColumnType.TIMESTAMP).nullable(true)
+ )
+ .check();
+ } finally {
+ closeQuiet(entryNode);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void viewRunningJobs(boolean isClient) {
+ Ignite entryNode = isClient ?
IgniteClient.builder().addresses("localhost").build() : CLUSTER.node(0);
+ Ignite targetNode = CLUSTER.node(0);
+
+ try {
+ ClockService clockService =
unwrapIgniteImpl(targetNode).clockService();
+
+ long tsBefore = clockService.now().getPhysical();
+
+ JobDescriptor<Void, Void> job =
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+ JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(targetNode)), job, null);
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ long tsAfter = clockService.now().getPhysical();
+
+ String query = "SELECT * FROM SYSTEM.COMPUTE_TASKS WHERE STATUS =
?";
+
+ List<List<Object>> res = sql(0, query, EXECUTING.name());
+
+ assertThat(res, Matchers.hasSize(1));
+
+ verifyComputeJobState(res.get(0), List.of(targetNode.name()),
EXECUTING.name(), tsBefore, tsAfter);
+
+ IgniteTestUtils.await(execution.cancelAsync());
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+
+ // Second Job call on different node.
+ job =
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+
+ tsBefore = clockService.now().getPhysical();
+
+ targetNode = CLUSTER.node(1);
+
+ execution =
entryNode.compute().submit(JobTarget.node(clusterNode(targetNode)), job, null);
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ tsAfter = clockService.now().getPhysical();
+
+ query = "SELECT * FROM SYSTEM.COMPUTE_TASKS WHERE
COORDINATOR_NODE_ID = ? AND STATUS = ?";
+
+ res = sql(0, query, targetNode.name(), EXECUTING.name());
+
+ verifyComputeJobState(res.get(0), List.of(targetNode.name()),
EXECUTING.name(), tsBefore, tsAfter);
+
+ IgniteTestUtils.await(execution.cancelAsync());
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ } finally {
+ closeQuiet(entryNode);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void viewRunningBroadcasts(boolean isClient) {
+ Ignite entryNode = isClient ?
IgniteClient.builder().addresses("localhost").build() : CLUSTER.node(0);
+
+ try {
+ ClockService clockService =
unwrapIgniteImpl(CLUSTER.node(0)).clockService();
+
+ long tsBefore = clockService.now().getPhysical();
+
+ JobDescriptor<Void, Void> job =
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+ Map<ClusterNode, JobExecution<Void>> execution =
entryNode.compute().submitBroadcast(
+ Set.of(clusterNode(CLUSTER.node(0)),
clusterNode(CLUSTER.node(1))), job, null);
+
+ execution.forEach((k, exec) -> await().until(exec::stateAsync,
willBe(jobStateWithStatus(EXECUTING))));
+
+ long tsAfter = clockService.now().getPhysical();
+
+ String query = "SELECT * FROM SYSTEM.COMPUTE_TASKS WHERE STATUS =
?";
+
+ List<List<Object>> res = sql(0, query, EXECUTING.name());
+
+ assertThat(res.size(), is(2));
+ List<String> execNodes = List.of(CLUSTER.node(0).name(),
CLUSTER.node(1).name());
+
+ verifyComputeJobState(res.get(0), execNodes, EXECUTING.name(),
tsBefore, tsAfter);
+ verifyComputeJobState(res.get(1), execNodes, EXECUTING.name(),
tsBefore, tsAfter);
+
+ execution.forEach((k, exec) ->
IgniteTestUtils.await(exec.cancelAsync()));
+ } finally {
+ closeQuiet(entryNode);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void viewRunningMapReduceTask(boolean isClient) {
+ Ignite entryNode = isClient ?
IgniteClient.builder().addresses("localhost").build() : CLUSTER.node(0);
+ Ignite targetNode = CLUSTER.node(0);
+
+ try {
+ ClockService clockService =
unwrapIgniteImpl(targetNode).clockService();
+
+ long tsBefore = clockService.now().getPhysical();
+
+ TaskExecution<Void> execution = entryNode.compute()
+
.submitMapReduce(TaskDescriptor.builder(MapReduceTaskCustom.class).build(),
null);
+
+ await().until(execution::stateAsync,
willBe(taskStateWithStatus(TaskStatus.EXECUTING)));
+
+ long tsAfter = clockService.now().getPhysical();
+
+ String query = "SELECT * FROM SYSTEM.COMPUTE_TASKS WHERE STATUS =
?";
+
+ List<List<Object>> res = sql(0, query, EXECUTING.name());
+
+ assertThat(res.size(), is(2));
+ List<String> execNodes = List.of(CLUSTER.node(0).name(),
CLUSTER.node(1).name());
+
+ verifyComputeJobState(res.get(0), execNodes, EXECUTING.name(),
tsBefore, tsAfter);
+ verifyComputeJobState(res.get(1), execNodes, EXECUTING.name(),
tsBefore, tsAfter);
+
+ IgniteTestUtils.await(execution.cancelAsync());
+ } finally {
+ closeQuiet(entryNode);
+ }
+ }
+
+ private static ClusterNode clusterNode(Ignite node) {
+ return unwrapIgniteImpl(node).node();
+ }
+
+ private static class InfiniteMapReduceJob implements ComputeJob<Void,
Void> {
+ @Override
+ public CompletableFuture<Void> executeAsync(JobExecutionContext
context, Void input) {
+ return new CompletableFuture<>();
+ }
+ }
+
+ private static class MapReduceTaskCustom implements MapReduceTask<Void,
Void, Void, Void> {
+ @Override
+ public CompletableFuture<List<MapReduceJob<Void, Void>>>
splitAsync(TaskExecutionContext taskContext, @Nullable Void input) {
+ return completedFuture(List.of(
+ MapReduceJob.<Void, Void>builder()
+
.jobDescriptor(JobDescriptor.builder(InfiniteMapReduceJob.class).build())
+ .nodes(taskContext.ignite().clusterNodes())
+ .build()
+ ));
+ }
+
+ @Override
+ public CompletableFuture<Void> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, Void> results) {
+ return completedFuture(null);
+ }
+ }
+
+ private static class InfiniteJob implements ComputeJob<Void, Void> {
+ @Override
+ public @Nullable CompletableFuture<Void>
executeAsync(JobExecutionContext context, @Nullable Void arg) {
+ while (true) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // No op, just return from loop
+ break;
+ }
+ }
+
+ return null;
+ }
+ }
+
+ private static void verifyComputeJobState(
+ List<Object> row,
+ List<String> nodeName,
+ String phase,
+ long tsBefore,
+ long tsAfter
+ ) {
+ int idx = 0;
+
+ // INITIATOR_NODE
+ assertThat(nodeName, hasItem((String) row.get(idx++)));
+
+ // ID
+ assertThat((String) row.get(idx++), hasLength(36));
+
+ // PHASE
+ assertThat(row.get(idx++), equalTo(phase));
+
+ // CREATE_TIME
+ assertThat(((Instant) row.get(idx++)).toEpochMilli(),
Matchers.allOf(greaterThanOrEqualTo(tsBefore), lessThanOrEqualTo(tsAfter)));
+
+ // START_TIME
+ assertThat(((Instant) row.get(idx++)).toEpochMilli(),
Matchers.allOf(greaterThanOrEqualTo(tsBefore), lessThanOrEqualTo(tsAfter)));
+
+ // FINISH_TIME
+ // Asynchronously updated and eventually can be null
+ if (row.get(idx) != null) {
+ assertThat(((Instant) row.get(idx)).toEpochMilli(),
+ Matchers.allOf(greaterThanOrEqualTo(tsBefore),
greaterThanOrEqualTo(tsAfter)));
+ }
+ }
+}