http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java index b5c6261..ad675c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java @@ -28,6 +28,7 @@ import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; @@ -113,6 +114,7 @@ import org.jetbrains.annotations.Nullable; * checkpoints, etc.). If you need to override configured defaults, you should use compute task together with * {@link ComputeTaskSpis} annotation. Refer to {@link ComputeTask} documentation for more information. */ +@SuppressWarnings("deprecation") public interface IgniteCompute extends IgniteAsyncSupport { /** * Gets cluster group to which this {@code IgniteCompute} instance belongs. @@ -137,6 +139,20 @@ public interface IgniteCompute extends IgniteAsyncSupport { public void affinityRun(@Nullable String cacheName, Object affKey, IgniteRunnable job) throws IgniteException; /** + * Executes given job asynchronously on the node where data for provided affinity key is located + * (a.k.a. affinity co-location). The data of the partition where affKey is stored + * will not be migrated from the target node while the job is executed. + * + * @param cacheName Name of the cache to use for affinity co-location. + * @param affKey Affinity key. + * @param job Job which will be co-located on the node with given affinity key. + * @return a Future representing pending completion of the affinity run. + * @throws IgniteException If job failed. + */ + public IgniteFuture<Void> affinityRunAsync(@Nullable String cacheName, Object affKey, IgniteRunnable job) + throws IgniteException; + + /** * Executes given job on the node where data for provided affinity key is located * (a.k.a. affinity co-location). * </p> @@ -154,6 +170,21 @@ public interface IgniteCompute extends IgniteAsyncSupport { throws IgniteException; /** + * Executes given job asynchronously on the node where data for provided affinity key is located + * (a.k.a. affinity co-location). The data of the partition where affKey is stored + * will not be migrated from the target node while the job is executed. The data + * of the extra caches' partitions with the same partition number also will not be migrated. + * + * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location. + * @param affKey Affinity key. + * @param job Job which will be co-located on the node with given affinity key. + * @return a Future representing pending completion of the affinity run. + * @throws IgniteException If job failed. + */ + public IgniteFuture<Void> affinityRunAsync(@NotNull Collection<String> cacheNames, Object affKey, + IgniteRunnable job) throws IgniteException; + + /** * Executes given job on the node where partition is located (the partition is primary on the node) * </p> * It's guaranteed that the data of all the partitions of all participating caches, @@ -170,6 +201,21 @@ public interface IgniteCompute extends IgniteAsyncSupport { throws IgniteException; /** + * Executes given job asynchronously on the node where partition is located (the partition is primary on the node) + * The data of the partition will not be migrated from the target node + * while the job is executed. The data of the extra caches' partitions with the same partition number + * also will not be migrated. + * + * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location. + * @param partId Partition number. + * @param job Job which will be co-located on the node with given affinity key. + * @return a Future representing pending completion of the affinity run. + * @throws IgniteException If job failed. + */ + public IgniteFuture<Void> affinityRunAsync(@NotNull Collection<String> cacheNames, int partId, + IgniteRunnable job) throws IgniteException; + + /** * Executes given job on the node where data for provided affinity key is located * (a.k.a. affinity co-location). * </p> @@ -186,6 +232,20 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <R> R affinityCall(@Nullable String cacheName, Object affKey, IgniteCallable<R> job) throws IgniteException; /** + * Executes given job asynchronously on the node where data for provided affinity key is located + * (a.k.a. affinity co-location). The data of the partition where affKey is stored + * will not be migrated from the target node while the job is executed. + * + * @param cacheName Name of the cache to use for affinity co-location. + * @param affKey Affinity key. + * @param job Job which will be co-located on the node with given affinity key. + * @return a Future representing pending completion of the affinity call. + * @throws IgniteException If job failed. + */ + public <R> IgniteFuture<R> affinityCallAsync(@Nullable String cacheName, Object affKey, IgniteCallable<R> job) + throws IgniteException; + + /** * Executes given job on the node where data for provided affinity key is located * (a.k.a. affinity co-location). * </p> @@ -203,6 +263,21 @@ public interface IgniteCompute extends IgniteAsyncSupport { throws IgniteException; /** + * Executes given job asynchronously on the node where data for provided affinity key is located + * (a.k.a. affinity co-location). The data of the partition where affKey is stored + * will not be migrated from the target node while the job is executed. The data + * of the extra caches' partitions with the same partition number also will not be migrated. + * + * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location. + * @param affKey Affinity key. + * @param job Job which will be co-located on the node with given affinity key. + * @return a Future representing pending completion of the affinity call. + * @throws IgniteException If job failed. + */ + public <R> IgniteFuture<R> affinityCallAsync(@NotNull Collection<String> cacheNames, Object affKey, + IgniteCallable<R> job) throws IgniteException; + + /** * Executes given job on the node where partition is located (the partition is primary on the node) * </p> * It's guaranteed that the data of all the partitions of all participating caches, @@ -219,6 +294,21 @@ public interface IgniteCompute extends IgniteAsyncSupport { throws IgniteException; /** + * Executes given job asynchronously on the node where partition is located (the partition is primary on the node) + * The data of the partition will not be migrated from the target node + * while the job is executed. The data of the extra caches' partitions with the same partition number + * also will not be migrated. + * + * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location. + * @param partId Partition to reserve. + * @param job Job which will be co-located on the node with given affinity key. + * @return a Future representing pending completion of the affinity call. + * @throws IgniteException If job failed. + */ + public <R> IgniteFuture<R> affinityCallAsync(@NotNull Collection<String> cacheNames, int partId, + IgniteCallable<R> job) throws IgniteException; + + /** * Executes given task on within the cluster group. For step-by-step explanation of task execution process * refer to {@link ComputeTask} documentation. * @@ -233,6 +323,20 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <T, R> R execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) throws IgniteException; /** + * Executes given task asynchronously on within the cluster group. For step-by-step explanation of task execution + * process refer to {@link ComputeTask} documentation. + * + * @param taskCls Class of the task to execute. If class has {@link ComputeTaskName} annotation, + * then task is deployed under a name specified within annotation. Otherwise, full + * class name is used as task name. + * @param arg Optional argument of task execution, can be {@code null}. + * @return a Future representing pending completion of the task. + * @throws IgniteException If task failed. + */ + public <T, R> ComputeTaskFuture<R> executeAsync(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) + throws IgniteException; + + /** * Executes given task within the cluster group. For step-by-step explanation of task execution process * refer to {@link ComputeTask} documentation. * @@ -247,6 +351,19 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <T, R> R execute(ComputeTask<T, R> task, @Nullable T arg) throws IgniteException; /** + * Executes given task asynchronously within the cluster group. For step-by-step explanation of task execution + * process refer to {@link ComputeTask} documentation. + * + * @param task Instance of task to execute. If task class has {@link ComputeTaskName} annotation, + * then task is deployed under a name specified within annotation. Otherwise, full + * class name is used as task name. + * @param arg Optional argument of task execution, can be {@code null}. + * @return a Future representing pending completion of the task. + * @throws IgniteException If task failed. + */ + public <T, R> ComputeTaskFuture<R> executeAsync(ComputeTask<T, R> task, @Nullable T arg) throws IgniteException; + + /** * Executes given task within the cluster group. For step-by-step explanation of task execution process * refer to {@link ComputeTask} documentation. * <p> @@ -263,6 +380,21 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <T, R> R execute(String taskName, @Nullable T arg) throws IgniteException; /** + * Executes given task asynchronously within the cluster group. For step-by-step explanation of task execution + * process refer to {@link ComputeTask} documentation. + * <p> + * If task for given name has not been deployed yet, then {@code taskName} will be + * used as task class name to auto-deploy the task (see {@link #localDeployTask(Class, ClassLoader)} method). + * + * @param taskName Name of the task to execute. + * @param arg Optional argument of task execution, can be {@code null}. + * @return a Future representing pending completion of the task. + * @throws IgniteException If task failed. + * @see ComputeTask for information about task execution. + */ + public <T, R> ComputeTaskFuture<R> executeAsync(String taskName, @Nullable T arg) throws IgniteException; + + /** * Broadcasts given job to all nodes in the cluster group. * * @param job Job to broadcast to all cluster group nodes. @@ -272,6 +404,15 @@ public interface IgniteCompute extends IgniteAsyncSupport { public void broadcast(IgniteRunnable job) throws IgniteException; /** + * Broadcasts given job asynchronously to all nodes in the cluster group. + * + * @param job Job to broadcast to all cluster group nodes. + * @return a Future representing pending completion of the broadcast execution of the job. + * @throws IgniteException If job failed. + */ + public IgniteFuture<Void> broadcastAsync(IgniteRunnable job) throws IgniteException; + + /** * Broadcasts given job to all nodes in cluster group. Every participating node will return a * job result. Collection of all returned job results is returned from the result future. * @@ -283,6 +424,16 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <R> Collection<R> broadcast(IgniteCallable<R> job) throws IgniteException; /** + * Broadcasts given job asynchronously to all nodes in cluster group. Every participating node will return a + * job result. Collection of all returned job results is returned from the result future. + * + * @param job Job to broadcast to all cluster group nodes. + * @return a Future representing pending completion of the broadcast execution of the job. + * @throws IgniteException If execution failed. + */ + public <R> IgniteFuture<Collection<R>> broadcastAsync(IgniteCallable<R> job) throws IgniteException; + + /** * Broadcasts given closure job with passed in argument to all nodes in the cluster group. * Every participating node will return a job result. Collection of all returned job results * is returned from the result future. @@ -296,6 +447,19 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <R, T> Collection<R> broadcast(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteException; /** + * Broadcasts given closure job asynchronously with passed in argument to all nodes in the cluster group. + * Every participating node will return a job result. Collection of all returned job results + * is returned from the result future. + * + * @param job Job to broadcast to all cluster group nodes. + * @param arg Job closure argument. + * @return a Future representing pending completion of the broadcast execution of the job. + * @throws IgniteException If execution failed. + */ + public <R, T> IgniteFuture<Collection<R>> broadcastAsync(IgniteClosure<T, R> job, @Nullable T arg) + throws IgniteException; + + /** * Executes provided job on a node within the underlying cluster group. * * @param job Job closure to execute. @@ -305,6 +469,15 @@ public interface IgniteCompute extends IgniteAsyncSupport { public void run(IgniteRunnable job) throws IgniteException; /** + * Executes provided job asynchronously on a node within the underlying cluster group. + * + * @param job Job closure to execute. + * @return a Future representing pending completion of the job. + * @throws IgniteException If execution failed. + */ + public IgniteFuture<Void> runAsync(IgniteRunnable job) throws IgniteException; + + /** * Executes collection of jobs on grid nodes within the underlying cluster group. * * @param jobs Collection of jobs to execute. @@ -314,6 +487,16 @@ public interface IgniteCompute extends IgniteAsyncSupport { public void run(Collection<? extends IgniteRunnable> jobs) throws IgniteException; /** + * Executes collection of jobs asynchronously on grid nodes within the underlying cluster group. + * Executes asynchronously. Returns control immediately. + * + * @param jobs Collection of jobs to execute. + * @return a Future representing pending completion of the job. + * @throws IgniteException If execution failed. + */ + public IgniteFuture<Void> runAsync(Collection<? extends IgniteRunnable> jobs) throws IgniteException; + + /** * Executes provided job on a node within the underlying cluster group. The result of the * job execution is returned from the result closure. * @@ -325,6 +508,16 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <R> R call(IgniteCallable<R> job) throws IgniteException; /** + * Executes provided job asynchronously on a node within the underlying cluster group. The result of the + * job execution is returned from the result closure. + * + * @param job Job to execute. + * @return a Future representing pending completion of the job. + * @throws IgniteException If execution failed. + */ + public <R> IgniteFuture<R> callAsync(IgniteCallable<R> job) throws IgniteException; + + /** * Executes collection of jobs on nodes within the underlying cluster group. * Collection of all returned job results is returned from the result future. * @@ -336,12 +529,23 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <R> Collection<R> call(Collection<? extends IgniteCallable<R>> jobs) throws IgniteException; /** + * Executes collection of jobs asynchronously on nodes within the underlying cluster group. + * Collection of all returned job results is returned from the result future. + * + * @param jobs Collection of jobs to execute. + * @return a Future representing pending completion of the job. + * @throws IgniteException If execution failed. + */ + public <R> IgniteFuture<Collection<R>> callAsync(Collection<? extends IgniteCallable<R>> jobs) + throws IgniteException; + + /** * Executes collection of jobs on nodes within the underlying cluster group. The returned * job results will be reduced into an individual result by provided reducer. * * @param jobs Collection of jobs to execute. * @param rdc Reducer to reduce all job results into one individual return value. - * @return Future with reduced job result for this execution. + * @return Reduced job result for this execution. * @throws IgniteException If execution failed. */ @IgniteAsyncSupported @@ -349,6 +553,18 @@ public interface IgniteCompute extends IgniteAsyncSupport { throws IgniteException; /** + * Executes collection of jobs asynchronously on nodes within the underlying cluster group. The returned + * job results will be reduced into an individual result by provided reducer. + * + * @param jobs Collection of jobs to execute. + * @param rdc Reducer to reduce all job results into one individual return value. + * @return a Future with reduced job result for this execution. + * @throws IgniteException If execution failed. + */ + public <R1, R2> IgniteFuture<R2> callAsync(Collection<? extends IgniteCallable<R1>> jobs, + IgniteReducer<R1, R2> rdc) throws IgniteException; + + /** * Executes provided closure job on a node within the underlying cluster group. This method is different * from {@code run(...)} and {@code call(...)} methods in a way that it receives job argument * which is then passed into the closure at execution time. @@ -362,6 +578,18 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <R, T> R apply(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteException; /** + * Executes provided closure job asynchronously on a node within the underlying cluster group. + * This method is different from {@code run(...)} and {@code call(...)} methods in a way that + * it receives job argument which is then passed into the closure at execution time. + * + * @param job Job to run. + * @param arg Job argument. + * @return a Future representing pending completion of the job. + * @throws IgniteException If execution failed. + */ + public <R, T> IgniteFuture<R> applyAsync(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteException; + + /** * Executes provided closure job on nodes within the underlying cluster group. A new job is executed for * every argument in the passed in collection. The number of actual job executions will be * equal to size of the job arguments collection. @@ -375,6 +603,19 @@ public interface IgniteCompute extends IgniteAsyncSupport { public <T, R> Collection<R> apply(IgniteClosure<T, R> job, Collection<? extends T> args) throws IgniteException; /** + * Executes provided closure job asynchronously on nodes within the underlying cluster group. A new job is executed + * for every argument in the passed in collection. The number of actual job executions will be + * equal to size of the job arguments collection. + * + * @param job Job to run. + * @param args Job arguments. + * @return a Future representing pending completion of the job. + * @throws IgniteException If execution failed. + */ + public <T, R> IgniteFuture<Collection<R>> applyAsync(IgniteClosure<T, R> job, Collection<? extends T> args) + throws IgniteException; + + /** * Executes provided closure job on nodes within the underlying cluster group. A new job is executed for * every argument in the passed in collection. The number of actual job executions will be * equal to size of the job arguments collection. The returned job results will be reduced @@ -383,7 +624,7 @@ public interface IgniteCompute extends IgniteAsyncSupport { * @param job Job to run. * @param args Job arguments. * @param rdc Reducer to reduce all job results into one individual return value. - * @return Future with reduced job result for this execution. + * @return Reduced job result for this execution. * @throws IgniteException If execution failed. */ @IgniteAsyncSupported @@ -391,6 +632,21 @@ public interface IgniteCompute extends IgniteAsyncSupport { IgniteReducer<R1, R2> rdc) throws IgniteException; /** + * Executes provided closure job asynchronously on nodes within the underlying cluster group. A new job is executed + * for every argument in the passed in collection. The number of actual job executions will be + * equal to size of the job arguments collection. The returned job results will be reduced + * into an individual result by provided reducer. + * + * @param job Job to run. + * @param args Job arguments. + * @param rdc Reducer to reduce all job results into one individual return value. + * @return a Future with reduced job result for this execution. + * @throws IgniteException If execution failed. + */ + public <R1, R2, T> IgniteFuture<R2> applyAsync(IgniteClosure<T, R1> job, Collection<? extends T> args, + IgniteReducer<R1, R2> rdc) throws IgniteException; + + /** * Gets tasks future for active tasks started on local node. * * @return Map of active tasks keyed by their task task session ID. @@ -489,8 +745,10 @@ public interface IgniteCompute extends IgniteAsyncSupport { public void undeployTask(String taskName) throws IgniteException; /** {@inheritDoc} */ + @Deprecated @Override public <R> ComputeTaskFuture<R> future(); /** {@inheritDoc} */ + @Deprecated @Override public IgniteCompute withAsync(); } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java b/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java index c0e4d3b..c081f2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java @@ -25,6 +25,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -90,13 +91,27 @@ public interface IgniteEvents extends IgniteAsyncSupport { throws IgniteException; /** + * Asynchronously queries nodes in this cluster group for events using passed in predicate filter for event + * selection. + * + * @param p Predicate filter used to query events on remote nodes. + * @param timeout Maximum time to wait for result, {@code 0} to wait forever. + * @param types Event types to be queried. + * @return a Future representing pending completion of the query. The completed future contains + * collection of grid events returned from specified nodes. + * @throws IgniteException If query failed. + */ + public <T extends Event> IgniteFuture<List<T>> remoteQueryAsync(IgnitePredicate<T> p, long timeout, + @Nullable int... types) throws IgniteException; + + /** * Adds event listener for specified events to all nodes in the cluster group (possibly including * local node if it belongs to the cluster group as well). This means that all events occurring on * any node within this cluster group that pass remote filter will be sent to local node for * local listener notifications. * <p> * The listener can be unsubscribed automatically if local node stops, if {@code locLsnr} callback - * returns {@code false} or if {@link #stopRemoteListen(UUID)} is called. + * returns {@code false} or if {@link #stopRemoteListen(UUID)} or {@link #stopRemoteListenAsync(UUID)} are called. * * @param locLsnr Listener callback that is called on local node. If {@code null}, this events will be handled * on remote nodes by passed in {@code rmtFilter}. @@ -108,7 +123,8 @@ public interface IgniteEvents extends IgniteAsyncSupport { * @param types Types of events to listen for. If not provided, all events that pass the * provided remote filter will be sent to local node. * @param <T> Type of the event. - * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} method to stop listening. + * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} or + * {@link #stopRemoteListenAsync(UUID)} methods to stop listening. * @throws IgniteException If failed to add listener. */ @IgniteAsyncSupported @@ -118,6 +134,35 @@ public interface IgniteEvents extends IgniteAsyncSupport { throws IgniteException; /** + * Asynchronously adds event listener for specified events to all nodes in the cluster group (possibly including + * local node if it belongs to the cluster group as well). This means that all events occurring on + * any node within this cluster group that pass remote filter will be sent to local node for + * local listener notifications. + * <p> + * The listener can be unsubscribed automatically if local node stops, if {@code locLsnr} callback + * returns {@code false} or if {@link #stopRemoteListen(UUID)} or {@link #stopRemoteListenAsync(UUID)} are called. + * + * @param <T> Type of the event. + * @param locLsnr Listener callback that is called on local node. If {@code null}, this events will be handled + * on remote nodes by passed in {@code rmtFilter}. + * @param rmtFilter Filter callback that is called on remote node. Only events that pass the remote filter + * will be sent to local node. If {@code null}, all events of specified types will + * be sent to local node. This remote filter can be used to pre-handle events remotely, + * before they are passed in to local callback. It will be auto-unsubsribed on the node + * where event occurred in case if it returns {@code false}. + * @param types Types of events to listen for. If not provided, all events that pass the + * provided remote filter will be sent to local node. + * @return a Future representing pending completion of the operation. The completed future contains + * {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} or + * {@link #stopRemoteListenAsync(UUID)} methods to stop listening. + * @throws IgniteException If failed to add listener. + */ + public <T extends Event> IgniteFuture<UUID> remoteListenAsync(@Nullable IgniteBiPredicate<UUID, T> locLsnr, + @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... types) + throws IgniteException; + + /** * Adds event listener for specified events to all nodes in the cluster group (possibly including * local node if it belongs to the cluster group as well). This means that all events occurring on * any node within this cluster group that pass remote filter will be sent to local node for @@ -148,9 +193,11 @@ public interface IgniteEvents extends IgniteAsyncSupport { * @param types Types of events to listen for. If not provided, all events that pass the * provided remote filter will be sent to local node. * @param <T> Type of the event. - * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} method to stop listening. - * @see #stopRemoteListen(UUID) + * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} or + * {@link #stopRemoteListen(UUID)} methods to stop listening. * @throws IgniteException If failed to add listener. + * @see #stopRemoteListen(UUID) + * @see #stopRemoteListenAsync(UUID) */ @IgniteAsyncSupported public <T extends Event> UUID remoteListen(int bufSize, @@ -162,6 +209,50 @@ public interface IgniteEvents extends IgniteAsyncSupport { throws IgniteException; /** + * Asynchronously adds event listener for specified events to all nodes in the cluster group (possibly including + * local node if it belongs to the cluster group as well). This means that all events occurring on + * any node within this cluster group that pass remote filter will be sent to local node for + * local listener notification. + * + * @param <T> Type of the event. + * @param bufSize Remote events buffer size. Events from remote nodes won't be sent until buffer + * is full or time interval is exceeded. + * @param interval Maximum time interval after which events from remote node will be sent. Events + * from remote nodes won't be sent until buffer is full or time interval is exceeded. + * @param autoUnsubscribe Flag indicating that event listeners on remote nodes should be + * automatically unregistered if master node (node that initiated event listening) leaves + * topology. If this flag is {@code false}, listeners will be unregistered only when + * {@link #stopRemoteListen(UUID)} method is called, or the {@code 'callback (locLsnr)'} + * passed in returns {@code false}. + * @param locLsnr Callback that is called on local node. If this predicate returns {@code true}, + * the implementation will continue listening to events. Otherwise, events + * listening will be stopped and listeners will be unregistered on all nodes + * in the cluster group. If {@code null}, this events will be handled on remote nodes by + * passed in {@code rmtFilter} until local node stops (if {@code 'autoUnsubscribe'} is {@code true}) + * or until {@link #stopRemoteListen(UUID)} is called. + * @param rmtFilter Filter callback that is called on remote node. Only events that pass the remote filter + * will be sent to local node. If {@code null}, all events of specified types will + * be sent to local node. This remote filter can be used to pre-handle events remotely, + * before they are passed in to local callback. It will be auto-unsubsribed on the node + * where event occurred in case if it returns {@code false}. + * @param types Types of events to listen for. If not provided, all events that pass the + * provided remote filter will be sent to local node. + * @return a Future representing pending completion of the operation. The completed future contains + * {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} + * or {@link #stopRemoteListen(UUID)} methods to stop listening. + * @throws IgniteException If failed to add listener. + * @see #stopRemoteListen(UUID) + * @see #stopRemoteListenAsync(UUID) + */ + public <T extends Event> IgniteFuture<UUID> remoteListenAsync(int bufSize, + long interval, + boolean autoUnsubscribe, + @Nullable IgniteBiPredicate<UUID, T> locLsnr, + @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... types) + throws IgniteException; + + /** * Stops listening to remote events. This will unregister all listeners identified with provided * operation ID on all nodes defined by {@link #clusterGroup()}. * <p> @@ -169,13 +260,27 @@ public interface IgniteEvents extends IgniteAsyncSupport { * * @param opId Operation ID that was returned from * {@link #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)} method. - * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...) * @throws IgniteException If failed to stop listeners. + * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...) + * @see #remoteListenAsync(int, long, boolean, IgniteBiPredicate, IgnitePredicate, int...) */ @IgniteAsyncSupported public void stopRemoteListen(UUID opId) throws IgniteException; /** + * Asynchronously stops listening to remote events. This will unregister all listeners identified with provided + * operation ID on all nodes defined by {@link #clusterGroup()}. + * + * @param opId Operation ID that was returned from + * {@link #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)} method. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to stop listeners. + * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...) + * @see #remoteListenAsync(int, long, boolean, IgniteBiPredicate, IgnitePredicate, int...) + */ + public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException; + + /** * Waits for the specified events. * <p> * Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -191,6 +296,18 @@ public interface IgniteEvents extends IgniteAsyncSupport { throws IgniteException; /** + * Create future to wait for the specified events. + * + * @param filter Optional filtering predicate. Only if predicates evaluates to {@code true} will the event + * end the wait. + * @param types Types of the events to wait for. If not provided, all events will be passed to the filter. + * @return a Future representing pending completion of the operation. The completed future contains grid event. + * @throws IgniteException If wait was interrupted. + */ + public <T extends Event> IgniteFuture<T> waitForLocalAsync(@Nullable IgnitePredicate<T> filter, + @Nullable int... types) throws IgniteException; + + /** * Queries local node for events using passed-in predicate filter for event selection. * * @param p Predicate to filter events. All predicates must be satisfied for the @@ -269,5 +386,6 @@ public interface IgniteEvents extends IgniteAsyncSupport { public boolean isEnabled(int type); /** {@inheritDoc} */ + @Deprecated @Override public IgniteEvents withAsync(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java b/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java index 8fb4fcd..78c86dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java @@ -32,6 +32,7 @@ import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver; import org.apache.ignite.igfs.mapreduce.IgfsTask; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.apache.ignite.igfs.IgfsPathNotFoundException; @@ -275,6 +276,15 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { public void format() throws IgniteException; /** + * Asynchronously formats the file system removing all existing entries from it. + * <p> + * + * @return a Future representing pending completion of the format operation. + * @throws IgniteException In case format has failed. + */ + public IgniteFuture<Void> formatAsync() throws IgniteException; + + /** * Executes IGFS task. * <p> * Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -291,6 +301,20 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException; /** + * Executes IGFS task asynchronously. + * + * @param task Task to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param arg Optional task argument. + * @return a Future representing pending completion of the task. + * @throws IgniteException If execution failed. + */ + public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException; + + + /** * Executes IGFS task with overridden maximum range length (see * {@link org.apache.ignite.configuration.FileSystemConfiguration#getMaximumTaskRangeLength()} for more information). * <p> @@ -313,6 +337,25 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { throws IgniteException; /** + * Executes IGFS task asynchronously with overridden maximum range length (see + * {@link org.apache.ignite.configuration.FileSystemConfiguration#getMaximumTaskRangeLength()} for more information). + * + * @param task Task to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param skipNonExistentFiles Whether to skip non existent files. If set to {@code true} non-existent files will + * be ignored. Otherwise an exception will be thrown. + * @param maxRangeLen Optional maximum range length. If {@code 0}, then by default all consecutive + * IGFS blocks will be included. + * @param arg Optional task argument. + * @return a Future representing pending completion of the task. + * @throws IgniteException If execution failed. + */ + public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) + throws IgniteException; + + /** * Executes IGFS task. * <p> * Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -329,6 +372,20 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException; /** + * Executes IGFS task asynchronously. + * + * @param taskCls Task class to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param arg Optional task argument. + * @return a Future representing pending completion of the task. + * @throws IgniteException If execution failed. + */ + public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException; + + + /** * Executes IGFS task with overridden maximum range length (see * {@link org.apache.ignite.configuration.FileSystemConfiguration#getMaximumTaskRangeLength()} for more information). * <p> @@ -350,6 +407,24 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { long maxRangeLen, @Nullable T arg) throws IgniteException; /** + * Executes IGFS task asynchronously with overridden maximum range length (see + * {@link org.apache.ignite.configuration.FileSystemConfiguration#getMaximumTaskRangeLength()} for more information). + * + * @param taskCls Task class to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param skipNonExistentFiles Whether to skip non existent files. If set to {@code true} non-existent files will + * be ignored. Otherwise an exception will be thrown. + * @param maxRangeLen Maximum range length. + * @param arg Optional task argument. + * @return a Future representing pending completion of the task. + * @throws IgniteException If execution failed. + */ + public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, + long maxRangeLen, @Nullable T arg) throws IgniteException; + + /** * Checks if the specified path exists in the file system. * * @param path Path to check for existence in the file system. @@ -473,5 +548,6 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { public long usedSpaceSize() throws IgniteException; /** {@inheritDoc} */ + @Deprecated @Override public IgniteFileSystem withAsync(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java index e64ded5..f8257d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java @@ -25,6 +25,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; /** @@ -159,6 +160,22 @@ public interface IgniteMessaging extends IgniteAsyncSupport { public UUID remoteListen(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) throws IgniteException; /** + * Asynchronously adds a message listener for a given topic to all nodes in the cluster group (possibly including + * this node if it belongs to the cluster group as well). This means that any node within this cluster + * group can send a message for a given topic and all nodes within the cluster group will receive + * listener notifications. + * + * @param topic Topic to subscribe to, {@code null} means default topic. + * @param p Predicate that is called on each node for each received message. If predicate returns {@code false}, + * then it will be unsubscribed from any further notifications. + * @return a Future representing pending completion of the operation. The completed future contains + * {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} method to stop listening. + * @throws IgniteException If failed to add listener. + */ + public IgniteFuture<UUID> remoteListenAsync(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) + throws IgniteException; + + /** * Unregisters all listeners identified with provided operation ID on all nodes in the cluster group. * <p> * Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -169,6 +186,16 @@ public interface IgniteMessaging extends IgniteAsyncSupport { @IgniteAsyncSupported public void stopRemoteListen(UUID opId) throws IgniteException; + /** + * Asynchronously unregisters all listeners identified with provided operation ID on all nodes in the cluster group. + * + * @param opId Listen ID that was returned from {@link #remoteListen(Object, IgniteBiPredicate)} method. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to unregister listeners. + */ + public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException; + /** {@inheritDoc} */ + @Deprecated @Override IgniteMessaging withAsync(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/IgniteServices.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java index 8365ec7..1c01598 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; @@ -150,7 +151,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * when a singleton service instance will be active on more than one node (e.g. crash detection delay). * <p> * This method is analogous to calling - * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 1, 1)} method. + * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 1, 1)} + * method. * * @param name Service name. * @param svc Service instance. @@ -160,13 +162,35 @@ public interface IgniteServices extends IgniteAsyncSupport { public void deployClusterSingleton(String name, Service svc) throws IgniteException; /** + * Asynchronously deploys a cluster-wide singleton service. Ignite will guarantee that there is always + * one instance of the service in the cluster. In case if grid node on which the service + * was deployed crashes or stops, Ignite will automatically redeploy it on another node. + * However, if the node on which the service is deployed remains in topology, then the + * service will always be deployed on that node only, regardless of topology changes. + * <p> + * Note that in case of topology changes, due to network delays, there may be a temporary situation + * when a singleton service instance will be active on more than one node (e.g. crash detection delay). + * <p> + * This method is analogous to calling + * {@link #deployMultipleAsync(String, org.apache.ignite.services.Service, int, int) + * deployMultipleAsync(name, svc, 1, 1)} method. + * + * @param name Service name. + * @param svc Service instance. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException; + + /** * Deploys a per-node singleton service. Ignite will guarantee that there is always * one instance of the service running on each node. Whenever new nodes are started * within the underlying cluster group, Ignite will automatically deploy one instance of * the service on every new node. * <p> * This method is analogous to calling - * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 0, 1)} method. + * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 0, 1)} + * method. * * @param name Service name. * @param svc Service instance. @@ -176,6 +200,23 @@ public interface IgniteServices extends IgniteAsyncSupport { public void deployNodeSingleton(String name, Service svc) throws IgniteException; /** + * Asynchronously deploys a per-node singleton service. Ignite will guarantee that there is always + * one instance of the service running on each node. Whenever new nodes are started + * within the underlying cluster group, Ignite will automatically deploy one instance of + * the service on every new node. + * <p> + * This method is analogous to calling + * {@link #deployMultipleAsync(String, org.apache.ignite.services.Service, int, int) + * deployMultipleAsync(name, svc, 0, 1)} method. + * + * @param name Service name. + * @param svc Service instance. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException; + + /** * Deploys one instance of this service on the primary node for a given affinity key. * Whenever topology changes and primary node assignment changes, Ignite will always * make sure that the service is undeployed on the previous primary node and deployed @@ -184,8 +225,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * Note that in case of topology changes, due to network delays, there may be a temporary situation * when a service instance will be active on more than one node (e.g. crash detection delay). * <p> - * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} method - * as follows: + * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} + * method as follows: * <pre name="code" class="java"> * ServiceConfiguration cfg = new ServiceConfiguration(); * @@ -211,6 +252,41 @@ public interface IgniteServices extends IgniteAsyncSupport { throws IgniteException; /** + * Asynchronously deploys one instance of this service on the primary node for a given affinity key. + * Whenever topology changes and primary node assignment changes, Ignite will always + * make sure that the service is undeployed on the previous primary node and deployed + * on the new primary node. + * <p> + * Note that in case of topology changes, due to network delays, there may be a temporary situation + * when a service instance will be active on more than one node (e.g. crash detection delay). + * <p> + * This method is analogous to the invocation of + * {@link #deployAsync(org.apache.ignite.services.ServiceConfiguration)} method as follows: + * <pre name="code" class="java"> + * ServiceConfiguration cfg = new ServiceConfiguration(); + * + * cfg.setName(name); + * cfg.setService(svc); + * cfg.setCacheName(cacheName); + * cfg.setAffinityKey(affKey); + * cfg.setTotalCount(1); + * cfg.setMaxPerNodeCount(1); + * + * ignite.services().deployAsync(cfg); + * </pre> + * + * @param name Service name. + * @param svc Service instance. + * @param cacheName Name of the cache on which affinity for key should be calculated, {@code null} for + * default cache. + * @param affKey Affinity cache key. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc, @Nullable String cacheName, + Object affKey) throws IgniteException; + + /** * Deploys multiple instances of the service on the grid. Ignite will deploy a * maximum amount of services equal to {@code 'totalCnt'} parameter making sure that * there are no more than {@code 'maxPerNodeCnt'} service instances running @@ -221,8 +297,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} parameters must have * value greater than {@code 0}. * <p> - * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} method - * as follows: + * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} + * method as follows: * <pre name="code" class="java"> * ServiceConfiguration cfg = new ServiceConfiguration(); * @@ -244,20 +320,57 @@ public interface IgniteServices extends IgniteAsyncSupport { public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt) throws IgniteException; /** + * Asynchronously deploys multiple instances of the service on the grid. Ignite will deploy a + * maximum amount of services equal to {@code 'totalCnt'} parameter making sure that + * there are no more than {@code 'maxPerNodeCnt'} service instances running + * on each node. Whenever topology changes, Ignite will automatically rebalance + * the deployed services within cluster to make sure that each node will end up with + * about equal number of deployed instances whenever possible. + * <p> + * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} parameters must have + * value greater than {@code 0}. + * <p> + * This method is analogous to the invocation of + * {@link #deployAsync(org.apache.ignite.services.ServiceConfiguration)} method as follows: + * <pre name="code" class="java"> + * ServiceConfiguration cfg = new ServiceConfiguration(); + * + * cfg.setName(name); + * cfg.setService(svc); + * cfg.setTotalCount(totalCnt); + * cfg.setMaxPerNodeCount(maxPerNodeCnt); + * + * ignite.services().deployAsync(cfg); + * </pre> + * + * @param name Service name. + * @param svc Service instance. + * @param totalCnt Maximum number of deployed services in the grid, {@code 0} for unlimited. + * @param maxPerNodeCnt Maximum number of deployed services on each node, {@code 0} for unlimited. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, int maxPerNodeCnt) + throws IgniteException; + + /** * Deploys multiple instances of the service on the grid according to provided * configuration. Ignite will deploy a maximum amount of services equal to * {@link org.apache.ignite.services.ServiceConfiguration#getTotalCount() cfg.getTotalCount()} parameter - * making sure that there are no more than {@link org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() cfg.getMaxPerNodeCount()} + * making sure that there are no more than + * {@link org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() cfg.getMaxPerNodeCount()} * service instances running on each node. Whenever topology changes, Ignite will automatically rebalance * the deployed services within cluster to make sure that each node will end up with * about equal number of deployed instances whenever possible. * <p> - * If {@link org.apache.ignite.services.ServiceConfiguration#getAffinityKey() cfg.getAffinityKey()} is not {@code null}, then Ignite - * will deploy the service on the primary node for given affinity key. The affinity will be calculated - * on the cache with {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() cfg.getCacheName()} name. + * If {@link org.apache.ignite.services.ServiceConfiguration#getAffinityKey() cfg.getAffinityKey()} + * is not {@code null}, then Ignite will deploy the service on the primary node for given affinity key. + * The affinity will be calculated on the cache with + * {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() cfg.getCacheName()} name. * <p> - * If {@link org.apache.ignite.services.ServiceConfiguration#getNodeFilter() cfg.getNodeFilter()} is not {@code null}, then - * Ignite will deploy service on all grid nodes for which the provided filter evaluates to {@code true}. + * If {@link org.apache.ignite.services.ServiceConfiguration#getNodeFilter() cfg.getNodeFilter()} + * is not {@code null}, then Ignite will deploy service on all grid nodes for which + * the provided filter evaluates to {@code true}. * The node filter will be checked in addition to the underlying cluster group filter, or the * whole grid, if the underlying cluster group includes all the cluster nodes. * <p> @@ -283,12 +396,56 @@ public interface IgniteServices extends IgniteAsyncSupport { public void deploy(ServiceConfiguration cfg) throws IgniteException; /** + * Asynchronously deploys multiple instances of the service on the grid according to provided + * configuration. Ignite will deploy a maximum amount of services equal to + * {@link org.apache.ignite.services.ServiceConfiguration#getTotalCount() cfg.getTotalCount()} parameter + * making sure that there are no more than + * {@link org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() cfg.getMaxPerNodeCount()} + * service instances running on each node. Whenever topology changes, Ignite will automatically rebalance + * the deployed services within cluster to make sure that each node will end up with + * about equal number of deployed instances whenever possible. + * <p> + * If {@link org.apache.ignite.services.ServiceConfiguration#getAffinityKey() cfg.getAffinityKey()} + * is not {@code null}, then Ignite + * will deploy the service on the primary node for given affinity key. The affinity will be calculated + * on the cache with {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() cfg.getCacheName()} name. + * <p> + * If {@link org.apache.ignite.services.ServiceConfiguration#getNodeFilter() cfg.getNodeFilter()} + * is not {@code null}, then Ignite will deploy service on all grid nodes + * for which the provided filter evaluates to {@code true}. + * The node filter will be checked in addition to the underlying cluster group filter, or the + * whole grid, if the underlying cluster group includes all the cluster nodes. + * <p> + * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} parameters must have + * value greater than {@code 0}. + * <p> + * Here is an example of creating service deployment configuration: + * <pre name="code" class="java"> + * ServiceConfiguration cfg = new ServiceConfiguration(); + * + * cfg.setName(name); + * cfg.setService(svc); + * cfg.setTotalCount(0); // Unlimited. + * cfg.setMaxPerNodeCount(2); // Deploy 2 instances of service on each node. + * + * ignite.services().deployAsync(cfg); + * </pre> + * + * @param cfg Service configuration. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException; + + /** * Cancels service deployment. If a service with specified name was deployed on the grid, - * then {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} method will be called on it. + * then {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} + * method will be called on it. * <p> - * Note that Ignite cannot guarantee that the service exits from {@link org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)} - * method whenever {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} is called. It is up to the user to - * make sure that the service code properly reacts to cancellations. + * Note that Ignite cannot guarantee that the service exits from + * {@link org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)} + * method whenever {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} + * is called. It is up to the user to make sure that the service code properly reacts to cancellations. * <p> * Supports asynchronous execution (see {@link IgniteAsyncSupport}). * @@ -299,6 +456,23 @@ public interface IgniteServices extends IgniteAsyncSupport { public void cancel(String name) throws IgniteException; /** + * Asynchronously cancels service deployment. If a service with specified name was deployed on the grid, + * then {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} + * method will be called on it. + * <p> + * Note that Ignite cannot guarantee that the service exits from + * {@link org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)} + * method whenever {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} + * is called. It is up to the user to + * make sure that the service code properly reacts to cancellations. + * + * @param name Name of service to cancel. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to cancel service. + */ + public IgniteFuture<Void> cancelAsync(String name) throws IgniteException; + + /** * Cancels all deployed services. * <p> * Note that depending on user logic, it may still take extra time for a service to @@ -312,6 +486,17 @@ public interface IgniteServices extends IgniteAsyncSupport { public void cancelAll() throws IgniteException; /** + * Asynchronously cancels all deployed services. + * <p> + * Note that depending on user logic, it may still take extra time for a service to + * finish execution, even after it was cancelled. + * + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to cancel services. + */ + public IgniteFuture<Void> cancelAllAsync() throws IgniteException; + + /** * Gets metadata about all deployed services in the grid. * * @return Metadata about all deployed services in the grid. @@ -364,8 +549,10 @@ public interface IgniteServices extends IgniteAsyncSupport { * @return Either proxy over remote service or local service if it is deployed locally. * @throws IgniteException If failed to create service proxy. */ - public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky, long timeout) throws IgniteException; + public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky, long timeout) + throws IgniteException; /** {@inheritDoc} */ + @Deprecated @Override public IgniteServices withAsync(); }
