[
https://issues.apache.org/jira/browse/IGNITE-22349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850271#comment-17850271
]
Alexey Kukushkin edited comment on IGNITE-22349 at 5/29/24 6:38 AM:
--------------------------------------------------------------------
h3. Patch
* Overview: [^IGNITE-22349-README.md]
* Diff: [^ignite-22349.patch]
was (Author: kukushal):
h3. Patch
* Overview: attached IGNITE-22349-README.md
* Diff: attached ignite-22349.patch
> Ignite.NET support for priority ordering of Compute jobs
> --------------------------------------------------------
>
> Key: IGNITE-22349
> URL: https://issues.apache.org/jira/browse/IGNITE-22349
> Project: Ignite
> Issue Type: Improvement
> Components: .NET, compute, platforms
> Affects Versions: 2.16
> Reporter: Alexey Kukushkin
> Assignee: Alexey Kukushkin
> Priority: Major
> Labels: .net, compute, platforms
> Attachments: IGNITE-22349-README.md, ignite-22349.patch
>
>
> I want Apache Ignite to support [priority
> ordering|https://ignite.apache.org/docs/latest/distributed-computing/job-scheduling#priority-ordering]
> of Ignite.NET compute jobs on the same node.
> h4. Analysis
> {{PriorityQueueCollisionSpi}} does priority ordering. The problem is the
> {{PriorityQueueCollisionSpi}} expects the user to provide job priorities
> via the {{ComputeTaskSession}}'s "{{grid.task.priority}}" attribute and the
> {{ComputeTaskSession}} is not available in Ignite.NET.
> It looks like the requirement is to add an injectable {{ComputeTaskSession}}
> in Ignite.NET exposing the {{SetAttributes}} operation similar to how it
> works in Java.
> h4. Reproducer
> I expect more or less ordered output from the below reproducer. The output
> may not be completely ordered since completely ordered output requires all
> the jobs to land on the server node in single batch and this reproducer
> cannot guarantee that:
> {noformat}
> >>> Completed job with priority 0
> >>> Completed job with priority 9
> >>> Completed job with priority 8
> >>> Completed job with priority 7
> >>> Completed job with priority 6
> >>> Completed job with priority 5
> >>> Completed job with priority 4
> >>> Completed job with priority 3
> >>> Completed job with priority 2
> >>> Completed job with priority 1
> {noformat}
> {{PriorityQueueCollisionSpiTest.cs}}:
> {code:java}
> public class PriorityQueueCollisionSpiTest
> {
> private static ITestOutputHelper? _output;
> public PriorityQueueCollisionSpiTest(ITestOutputHelper output)
> {
> _output = output;
> }
> /// <summary>
> /// Schedules jobs according to <see
> cref="IComputeTask{TArg,TJobRes,TRes}"/>'s priority.
> /// </summary>
> [Fact]
> public void SchedulesJobsAccordingToTaskPriority()
> {
> // Given an Ignite cluster consisting of server and client nodes
> using var ignored = Ignition.Start(GetIgniteConfiguration("server1"));
> var igniteConfiguration = GetIgniteConfiguration("app1");
> igniteConfiguration.ClientMode = true;
> using var ignite = Ignition.Start(igniteConfiguration);
> var igniteCompute = ignite.GetCompute();
>
> // And the user asynchronously executes multiple tasks, each task
> starting a job having increasing priority
> const int jobCount = 10;
> ICollection<Task> futureResultCollection = new List<Task>(jobCount);
> for (var priority = 0; priority < jobCount; priority++)
> {
> var task = new PriorityTask(priority);
> var futureResult = igniteCompute.ExecuteAsync(task, jobCount);
> futureResultCollection.Add(futureResult);
> }
> // When all the jobs complete
> Task.WaitAll(futureResultCollection.ToArray());
>
> // Then the ">>> Completed job with priority" console output
> demonstrates that the jobs completed in the
> // decreasing priority order, more or less.
> }
> private static IgniteConfiguration GetIgniteConfiguration(string
> igniteName) =>
> new()
> {
> ConsistentId = igniteName,
> IgniteInstanceName = igniteName,
> SpringConfigUrl = "ignite-sandbox.xml",
> DiscoverySpi = new TcpDiscoverySpi
> {
> IpFinder = new TcpDiscoveryStaticIpFinder {Endpoints = new
> List<string> {"127.0.0.1:48700"}},
> LocalPort = 48700
> },
> FailureDetectionTimeout = TimeSpan.FromMinutes(10),
> ClientFailureDetectionTimeout = TimeSpan.FromMinutes(10),
> JvmOptions = new List<string>
> {"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"}
> };
> /// <summary>
> /// <see cref="IComputeTask{TArg,TJobRes,TRes}"/> implementation that
> single <see cref="IComputeJob{TRes}"/>s with
> /// the specified priority.
> /// </summary>
> [ComputeTaskSessionFullSupport]
> private sealed class PriorityTask : ComputeTaskSplitAdapter<int, bool,
> bool>
> {
> private readonly int _priority;
> [TaskSessionResource] private IComputeTaskSession _taskSession;
> public PriorityTask(int priority)
> {
> _priority = priority;
> }
> /// <inheritdoc />
> public override bool Reduce(IList<IComputeJobResult<bool>> results)
> => results.All(r => r.Data);
> /// <inheritdoc />
> protected override ICollection<IComputeJob<bool>> Split(int gridSize,
> int jobCount)
> {
> IComputeJob<bool> job = new PriorityJob(_priority);
> _taskSession.SetAttribute("grid.task.priority", _priority);
> var actual = _taskSession.GetAttribute<int>("grid.task.priority");
> Assert.Equal(_priority, actual);
> return new List<IComputeJob<bool>> {job};
> }
> }
> /// <summary>
> /// <see cref="IComputeJob{TRes}"/> implementation with a priority
> indicator.
> /// </summary>
> private class PriorityJob : ComputeJobAdapter<bool>
> {
> private readonly int _priority;
> public PriorityJob(int priority)
> {
> _priority = priority;
> }
> /// <inheritdoc />
> public override bool Execute()
> {
> _output?.WriteLine($">>> Completed job with priority
> {_priority}");
> return true;
> }
> }
> }
> {code}
> {{ignite-sandbox.xml}}:
> {code:xml}
> <?xml version="1.0" encoding="utf-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xmlns:util="http://www.springframework.org/schema/util"
> xmlns:context="http://www.springframework.org/schema/context"
> xsi:schemaLocation="
> http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
> http://www.springframework.org/schema/util
> http://www.springframework.org/schema/util/spring-util-2.5.xsd"
> >
> <bean class="org.apache.ignite.configuration.IgniteConfiguration">
> <property name="collisionSpi">
> <bean
> class="org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi">
> <property name="parallelJobsNumber" value="1"/>
> <property name="starvationPreventionEnabled" value="false"/>
> </bean>
> </property>
> </bean>
> </beans>
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)