http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java index 58ce001..7499a5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteDeploymentException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTaskFuture; @@ -83,9 +84,18 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> * @param ctx Kernal context. * @param prj Projection. * @param subjId Subject ID. + */ + public IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId) { + this(ctx, prj, subjId, false); + } + + /** + * @param ctx Kernal context. + * @param prj Projection. + * @param subjId Subject ID. * @param async Async support flag. */ - public IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId, boolean async) { + private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId, boolean async) { super(async); this.ctx = ctx; @@ -105,6 +115,29 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public void affinityRun(@Nullable String cacheName, Object affKey, IgniteRunnable job) { + try { + saveOrGet(affinityRunAsync0(cacheName, affKey, job)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> affinityRunAsync(@Nullable String cacheName, Object affKey, + IgniteRunnable job) throws IgniteException { + return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheName, affKey, job)); + } + + /** + * Affinity run implementation. + * + * @param cacheName Cache name. + * @param affKey Affinity key. + * @param job Job. + * @return Internal future. + */ + private IgniteInternalFuture<?> affinityRunAsync0(@Nullable String cacheName, Object affKey, IgniteRunnable job) { A.notNull(affKey, "affKey"); A.notNull(job, "job"); @@ -119,7 +152,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + affKey + ']'); - saveOrGet(ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, prj.nodes())); + return ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, prj.nodes()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -131,6 +164,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public void affinityRun(@NotNull Collection<String> cacheNames, Object affKey, IgniteRunnable job) { + try { + saveOrGet(affinityRunAsync0(cacheNames, affKey, job)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> affinityRunAsync(@NotNull Collection<String> cacheNames, Object affKey, + IgniteRunnable job) throws IgniteException { + return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheNames, affKey, job)); + } + + /** + * Affinity run implementation. + * + * @param cacheNames Cache names collection. + * @param affKey Affinity key. + * @param job Job. + * @return Internal future. + */ + private IgniteInternalFuture<?> affinityRunAsync0(@NotNull Collection<String> cacheNames, Object affKey, + IgniteRunnable job) { A.notNull(affKey, "affKey"); A.notNull(job, "job"); A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty"); @@ -148,7 +205,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + affKey + ']'); - saveOrGet(ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes())); + return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -160,6 +217,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public void affinityRun(@NotNull Collection<String> cacheNames, int partId, IgniteRunnable job) { + try { + saveOrGet(affinityRunAsync0(cacheNames, partId, job)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> affinityRunAsync(@NotNull Collection<String> cacheNames, int partId, + IgniteRunnable job) throws IgniteException { + return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheNames, partId, job)); + } + + /** + * Affinity run implementation. + * + * @param cacheNames Cache names collection. + * @param partId partition ID. + * @param job Job. + * @return Internal future. + */ + private IgniteInternalFuture<?> affinityRunAsync0(@NotNull Collection<String> cacheNames, int partId, + IgniteRunnable job) { A.ensure(partId >= 0, "partId = " + partId); A.notNull(job, "job"); A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty"); @@ -167,7 +248,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - saveOrGet(ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes())); + return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -179,6 +260,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <R> R affinityCall(@Nullable String cacheName, Object affKey, IgniteCallable<R> job) { + try { + return saveOrGet(affinityCallAsync0(cacheName, affKey, job)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> affinityCallAsync(@Nullable String cacheName, Object affKey, + IgniteCallable<R> job) throws IgniteException { + return createFuture(affinityCallAsync0(cacheName, affKey, job)); + } + + /** + * Affinity call implementation. + + * @param cacheName Cache name. + * @param affKey Affinity key. + * @param job Job. + * @return Internal future. + */ + private <R> IgniteInternalFuture<R> affinityCallAsync0(@Nullable String cacheName, Object affKey, + IgniteCallable<R> job) { A.notNull(affKey, "affKey"); A.notNull(job, "job"); @@ -193,8 +298,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + affKey + ']'); - return saveOrGet(ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, - prj.nodes())); + return ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, prj.nodes()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -206,8 +310,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, Object affKey, IgniteCallable<R> job) { + try { + return saveOrGet(affinityCallAsync0(cacheNames, affKey, job)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> affinityCallAsync(@NotNull Collection<String> cacheNames, Object affKey, + IgniteCallable<R> job) throws IgniteException { + return createFuture(affinityCallAsync0(cacheNames, affKey, job)); + } + /** + * Affinity call implementation. + + * @param cacheNames Cache names collection. + * @param affKey Affinity key. + * @param job Job. + * @return Internal future. + */ + private <R> IgniteInternalFuture<R> affinityCallAsync0(@NotNull Collection<String> cacheNames, Object affKey, + IgniteCallable<R> job) { A.notNull(affKey, "affKey"); A.notNull(job, "job"); A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty"); @@ -225,7 +351,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + affKey + ']'); - return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes())); + return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -237,6 +363,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, int partId, IgniteCallable<R> job) { + try { + return saveOrGet(affinityCallAsync0(cacheNames, partId, job)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> affinityCallAsync(@NotNull Collection<String> cacheNames, int partId, + IgniteCallable<R> job) throws IgniteException { + return createFuture(affinityCallAsync0(cacheNames, partId, job)); + } + + /** + * Affinity call implementation. + + * @param cacheNames Cache names collection. + * @param partId Partition ID. + * @param job Job. + * @return Internal future. + */ + private <R> IgniteInternalFuture<R> affinityCallAsync0(@NotNull Collection<String> cacheNames, int partId, + IgniteCallable<R> job) { A.ensure(partId >= 0, "partId = " + partId); A.notNull(job, "job"); A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty"); @@ -244,7 +394,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes())); + return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -257,6 +407,28 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T, R> R execute(String taskName, @Nullable T arg) { + try { + return (R)saveOrGet(executeAsync0(taskName, arg)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> ComputeTaskFuture<R> executeAsync(String taskName, @Nullable T arg) throws IgniteException { + return (ComputeTaskFuture<R>)createFuture(executeAsync0(taskName, arg)); + } + + /** + * Execute implementation. + * + * @param taskName Task name. + * @param arg Argument. + * @return Internal future. + */ + @SuppressWarnings("unchecked") + private <T, R> IgniteInternalFuture<R> executeAsync0(String taskName, @Nullable T arg) { A.notNull(taskName, "taskName"); guard(); @@ -265,10 +437,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return (R)saveOrGet(ctx.task().execute(taskName, arg)); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.task().execute(taskName, arg); } finally { unguard(); @@ -277,6 +446,29 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <T, R> R execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) { + try { + return (R)saveOrGet(executeAsync0(taskCls, arg)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> ComputeTaskFuture<R> executeAsync(Class<? extends ComputeTask<T, R>> taskCls, + @Nullable T arg) throws IgniteException { + return (ComputeTaskFuture<R>)createFuture(executeAsync0(taskCls, arg)); + } + + /** + * Execute implementation. + * + * @param taskCls Task class. + * @param arg Argument. + * @return Internal future. + */ + @SuppressWarnings("unchecked") + private <T, R> IgniteInternalFuture<R> executeAsync0(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) { A.notNull(taskCls, "taskCls"); guard(); @@ -285,10 +477,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return saveOrGet(ctx.task().execute(taskCls, arg)); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.task().execute(taskCls, arg); } finally { unguard(); @@ -297,30 +486,28 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <T, R> R execute(ComputeTask<T, R> task, @Nullable T arg) { - A.notNull(task, "task"); - - guard(); - try { - ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); - ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - - return saveOrGet(ctx.task().execute(task, arg)); + return (R)saveOrGet(executeAsync0(task, arg)); } catch (IgniteCheckedException e) { throw U.convertException(e); } - finally { - unguard(); - } + } + + /** {@inheritDoc} */ + @Override public <T, R> ComputeTaskFuture<R> executeAsync(ComputeTask<T, R> task, @Nullable T arg) + throws IgniteException { + return (ComputeTaskFuture<R>)createFuture(executeAsync0(task, arg)); } /** + * Execute implementation. + * * @param task Task. * @param arg Task argument. * @return Task future. */ - public <T, R> ComputeTaskInternalFuture<R> executeAsync(ComputeTask<T, R> task, @Nullable T arg) { + public <T, R> ComputeTaskInternalFuture<R> executeAsync0(ComputeTask<T, R> task, @Nullable T arg) { A.notNull(task, "task"); guard(); @@ -336,21 +523,34 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> } } + /** {@inheritDoc} */ + @Override public void broadcast(IgniteRunnable job) { + try { + saveOrGet(broadcastAsync0(job)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> broadcastAsync(IgniteRunnable job) throws IgniteException { + return (IgniteFuture<Void>)createFuture(broadcastAsync0(job)); + } + /** - * @param taskName Task name. - * @param arg Task argument. - * @return Task future. + * Broadcast implementation. + * + * @param job Job. + * @return Internal future. */ - public <T, R> ComputeTaskInternalFuture<R> executeAsync(String taskName, @Nullable T arg) { - A.notNull(taskName, "taskName"); + private IgniteInternalFuture<?> broadcastAsync0(IgniteRunnable job) { + A.notNull(job, "job"); guard(); try { - ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); - ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - - return ctx.task().execute(taskName, arg); + return ctx.closure().runAsync(BROADCAST, job, prj.nodes()); } finally { unguard(); @@ -358,33 +558,33 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> } /** {@inheritDoc} */ - @Override public void broadcast(IgniteRunnable job) { - A.notNull(job, "job"); - - guard(); - + @Override public <R> Collection<R> broadcast(IgniteCallable<R> job) { try { - saveOrGet(ctx.closure().runAsync(BROADCAST, job, prj.nodes())); + return saveOrGet(broadcastAsync0(job)); } catch (IgniteCheckedException e) { throw U.convertException(e); } - finally { - unguard(); - } } /** {@inheritDoc} */ - @Override public <R> Collection<R> broadcast(IgniteCallable<R> job) { + @Override public <R> IgniteFuture<Collection<R>> broadcastAsync(IgniteCallable<R> job) throws IgniteException { + return createFuture(broadcastAsync0(job)); + } + + /** + * Broadcast implementation. + * + * @param job Job. + * @return Internal future. + */ + private <R> IgniteInternalFuture<Collection<R>> broadcastAsync0(IgniteCallable<R> job) { A.notNull(job, "job"); guard(); try { - return saveOrGet(ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes()); } finally { unguard(); @@ -393,15 +593,34 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <R, T> Collection<R> broadcast(IgniteClosure<T, R> job, @Nullable T arg) { + try { + return saveOrGet(broadcastAsync0(job, arg)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <R, T> IgniteFuture<Collection<R>> broadcastAsync(IgniteClosure<T, R> job, + @Nullable T arg) throws IgniteException { + return createFuture(broadcastAsync0(job, arg)); + } + + /** + * Broadcast implementation. + * + * @param job Job. + * @param arg Argument. + * @return Internal future. + */ + private <R, T> IgniteInternalFuture<Collection<R>> broadcastAsync0(IgniteClosure<T, R> job, @Nullable T arg) { A.notNull(job, "job"); guard(); try { - return saveOrGet(ctx.closure().broadcast(job, arg, prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().broadcast(job, arg, prj.nodes()); } finally { unguard(); @@ -410,15 +629,32 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public void run(IgniteRunnable job) { + try { + saveOrGet(runAsync0(job)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> runAsync(IgniteRunnable job) throws IgniteException { + return (IgniteFuture<Void>)createFuture(runAsync0(job)); + } + + /** + * Run implementation. + * + * @param job Job. + * @return Internal future. + */ + private IgniteInternalFuture<?> runAsync0(IgniteRunnable job) { A.notNull(job, "job"); guard(); try { - saveOrGet(ctx.closure().runAsync(BALANCE, job, prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().runAsync(BALANCE, job, prj.nodes()); } finally { unguard(); @@ -427,15 +663,33 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public void run(Collection<? extends IgniteRunnable> jobs) { + try { + saveOrGet(runAsync0(jobs)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> runAsync(Collection<? extends IgniteRunnable> jobs) + throws IgniteException { + return (IgniteFuture<Void>)createFuture(runAsync0(jobs)); + } + + /** + * Run implementation. + * + * @param jobs Jobs. + * @return Internal future. + */ + private IgniteInternalFuture<?> runAsync0(Collection<? extends IgniteRunnable> jobs) { A.notEmpty(jobs, "jobs"); guard(); try { - saveOrGet(ctx.closure().runAsync(BALANCE, jobs, prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().runAsync(BALANCE, jobs, prj.nodes()); } finally { unguard(); @@ -444,15 +698,34 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <R, T> R apply(IgniteClosure<T, R> job, @Nullable T arg) { + try { + return saveOrGet(applyAsync0(job, arg)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <R, T> IgniteFuture<R> applyAsync(IgniteClosure<T, R> job, @Nullable T arg) + throws IgniteException { + return (IgniteFuture<R>)createFuture(applyAsync0(job, arg)); + } + + /** + * Apply implementation. + * + * @param job Job. + * @param arg Argument. + * @return Internal future. + */ + private <R, T> IgniteInternalFuture<R> applyAsync0(IgniteClosure<T, R> job, @Nullable T arg) { A.notNull(job, "job"); guard(); try { - return saveOrGet(ctx.closure().callAsync(job, arg, prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().callAsync(job, arg, prj.nodes()); } finally { unguard(); @@ -461,15 +734,32 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <R> R call(IgniteCallable<R> job) { + try { + return saveOrGet(callAsync0(job)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> callAsync(IgniteCallable<R> job) throws IgniteException { + return (IgniteFuture<R>)createFuture(callAsync0(job)); + } + + /** + * Call implementation. + * + * @param job Job. + * @return Internal future. + */ + private <R> IgniteInternalFuture<R> callAsync0(IgniteCallable<R> job) { A.notNull(job, "job"); guard(); try { - return saveOrGet(ctx.closure().callAsync(BALANCE, job, prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().callAsync(BALANCE, job, prj.nodes()); } finally { unguard(); @@ -478,15 +768,33 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <R> Collection<R> call(Collection<? extends IgniteCallable<R>> jobs) { + try { + return saveOrGet(callAsync0(jobs)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<Collection<R>> callAsync( + Collection<? extends IgniteCallable<R>> jobs) throws IgniteException { + return (IgniteFuture<Collection<R>>)createFuture(callAsync0(jobs)); + } + + /** + * Call implementation. + * + * @param jobs Jobs. + * @return Internal future. + */ + private <R> IgniteInternalFuture<Collection<R>> callAsync0(Collection<? extends IgniteCallable<R>> jobs) { A.notEmpty(jobs, "jobs"); guard(); try { - return saveOrGet(ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes()); } finally { unguard(); @@ -495,16 +803,36 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <T, R> Collection<R> apply(final IgniteClosure<T, R> job, @Nullable Collection<? extends T> args) { + try { + return saveOrGet(applyAsync0(job, args)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<Collection<R>> applyAsync(IgniteClosure<T, R> job, + Collection<? extends T> args) throws IgniteException { + return (IgniteFuture<Collection<R>>)createFuture(applyAsync0(job, args)); + } + + /** + * Apply implementation. + * + * @param job Job. + * @param args Arguments/ + * @return Internal future. + */ + private <T, R> IgniteInternalFuture<Collection<R>> applyAsync0(final IgniteClosure<T, R> job, + @Nullable Collection<? extends T> args) { A.notNull(job, "job"); A.notNull(args, "args"); guard(); try { - return saveOrGet(ctx.closure().callAsync(job, args, prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().callAsync(job, args, prj.nodes()); } finally { unguard(); @@ -513,16 +841,36 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <R1, R2> R2 call(Collection<? extends IgniteCallable<R1>> jobs, IgniteReducer<R1, R2> rdc) { + try { + return saveOrGet(callAsync0(jobs, rdc)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <R1, R2> IgniteFuture<R2> callAsync(Collection<? extends IgniteCallable<R1>> jobs, + IgniteReducer<R1, R2> rdc) throws IgniteException { + return (IgniteFuture<R2>)createFuture(callAsync0(jobs, rdc)); + } + + /** + * Call with reducer implementation. + * + * @param jobs Jobs. + * @param rdc Reducer. + * @return Internal future. + */ + private <R1, R2> IgniteInternalFuture<R2> callAsync0(Collection<? extends IgniteCallable<R1>> jobs, + IgniteReducer<R1, R2> rdc) { A.notEmpty(jobs, "jobs"); A.notNull(rdc, "rdc"); guard(); try { - return saveOrGet(ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes()); } finally { unguard(); @@ -532,6 +880,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public <R1, R2, T> R2 apply(IgniteClosure<T, R1> job, Collection<? extends T> args, IgniteReducer<R1, R2> rdc) { + try { + return saveOrGet(applyAsync0(job, args, rdc)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <R1, R2, T> IgniteFuture<R2> applyAsync(IgniteClosure<T, R1> job, + Collection<? extends T> args, IgniteReducer<R1, R2> rdc) throws IgniteException { + return createFuture(applyAsync0(job, args, rdc)); + } + + /** + * Apply with reducer implementation. + * + * @param job Job + * @param args Arguments. + * @param rdc Reducer. + * @return Internal future. + */ + private <R1, R2, T> IgniteInternalFuture<R2> applyAsync0(IgniteClosure<T, R1> job, Collection<? extends T> args, + IgniteReducer<R1, R2> rdc) { A.notNull(job, "job"); A.notNull(rdc, "rdc"); A.notNull(args, "args"); @@ -539,10 +911,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return saveOrGet(ctx.closure().callAsync(job, args, rdc, prj.nodes())); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return ctx.closure().callAsync(job, args, rdc, prj.nodes()); } finally { unguard(); @@ -646,7 +1015,8 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - ctx.deploy().undeployTask(taskName, prj.node(ctx.localNodeId()) != null, prj.forRemotes().nodes()); + ctx.deploy().undeployTask(taskName, prj.node(ctx.localNodeId()) != null, + prj.forRemotes().nodes()); } finally { unguard();
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java index 3c6218d..9acccab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java @@ -27,13 +27,16 @@ import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteEvents; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.events.Event; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -93,12 +96,34 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen } /** {@inheritDoc} */ + @Override public <T extends Event> IgniteFuture<List<T>> remoteQueryAsync(IgnitePredicate<T> p, long timeout, + @Nullable int... types) throws IgniteException { + + guard(); + + try { + return new IgniteFutureImpl<>(ctx.event().remoteEventsAsync(compoundPredicate(p, types), + prj.nodes(), timeout)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public <T extends Event> UUID remoteListen(@Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) { return remoteListen(1, 0, true, locLsnr, rmtFilter, types); } /** {@inheritDoc} */ + @Override public <T extends Event> IgniteFuture<UUID> remoteListenAsync( + @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... types) throws IgniteException { + return remoteListenAsync(1, 0, true, locLsnr, rmtFilter, types); + } + + /** {@inheritDoc} */ @Override public <T extends Event> UUID remoteListen(int bufSize, long interval, boolean autoUnsubscribe, @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) { @@ -128,6 +153,32 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen } /** {@inheritDoc} */ + @Override public <T extends Event> IgniteFuture<UUID> remoteListenAsync(int bufSize, long interval, + boolean autoUnsubscribe, @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... types) throws IgniteException { + A.ensure(bufSize > 0, "bufSize > 0"); + A.ensure(interval >= 0, "interval >= 0"); + + guard(); + + try { + GridEventConsumeHandler hnd = new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr, + (IgnitePredicate<Event>)rmtFilter, types); + + return new IgniteFutureImpl<>(ctx.continuous().startRoutine( + hnd, + false, + bufSize, + interval, + autoUnsubscribe, + prj.predicate())); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void stopRemoteListen(UUID opId) { A.notNull(opId, "consumeId"); @@ -145,6 +196,21 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException { + A.notNull(opId, "consumeId"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.continuous().stopRoutine(opId)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public <T extends Event> T waitForLocal(@Nullable IgnitePredicate<T> filter, @Nullable int... types) { guard(); @@ -161,6 +227,19 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen } /** {@inheritDoc} */ + @Override public <T extends Event> IgniteFuture<T> waitForLocalAsync(@Nullable IgnitePredicate<T> filter, + @Nullable int... types) throws IgniteException { + guard(); + + try { + return new IgniteFutureImpl<>(ctx.event().waitForEvent(filter, types)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public <T extends Event> Collection<T> localQuery(IgnitePredicate<T> p, @Nullable int... types) { A.notNull(p, "p"); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index 541fad4..4c23dd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -25,15 +25,18 @@ import java.io.ObjectStreamException; import java.util.Collection; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; /** @@ -76,6 +79,17 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> /** {@inheritDoc} */ @Override public void send(@Nullable Object topic, Object msg) { + send0(topic, msg, isAsync()); + } + + /** + * Implementation of send. + * @param topic Topic. + * @param msg Message. + * @param async Async flag. + * @throws IgniteException On error. + */ + private void send0(@Nullable Object topic, Object msg, boolean async) throws IgniteException { A.notNull(msg, "msg"); guard(); @@ -86,7 +100,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> if (snapshot.isEmpty()) throw U.emptyTopologyException(); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, async); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -98,6 +112,17 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> /** {@inheritDoc} */ @Override public void send(@Nullable Object topic, Collection<?> msgs) { + send0(topic, msgs, isAsync()); + } + + /** + * Implementation of send. + * @param topic Topic. + * @param msgs Messages. + * @param async Async flag. + * @throws IgniteException On error. + */ + private void send0(@Nullable Object topic, Collection<?> msgs, boolean async) throws IgniteException { A.ensure(!F.isEmpty(msgs), "msgs cannot be null or empty"); guard(); @@ -111,7 +136,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> for (Object msg : msgs) { A.notNull(msg, "msg"); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, async); } } catch (IgniteCheckedException e) { @@ -200,6 +225,28 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> } /** {@inheritDoc} */ + @Override public IgniteFuture<UUID> remoteListenAsync(@Nullable Object topic, + IgniteBiPredicate<UUID, ?> p) throws IgniteException { + A.notNull(p, "p"); + + guard(); + + try { + GridContinuousHandler hnd = new GridMessageListenHandler(topic, (IgniteBiPredicate<UUID, Object>)p); + + return new IgniteFutureImpl<>(ctx.continuous().startRoutine(hnd, + false, + 1, + 0, + false, + prj.predicate())); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void stopRemoteListen(UUID opId) { A.notNull(opId, "opId"); @@ -216,6 +263,20 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> } } + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException { + A.notNull(opId, "opId"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.continuous().stopRoutine(opId)); + } + finally { + unguard(); + } + } + /** * <tt>ctx.gateway().readLock()</tt> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java index df6e5df..607dccc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java @@ -28,8 +28,10 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceDescriptor; @@ -38,6 +40,7 @@ import org.jetbrains.annotations.Nullable; /** * {@link org.apache.ignite.IgniteServices} implementation. */ +@SuppressWarnings("unchecked") public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteServices, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -91,6 +94,21 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException { + A.notNull(name, "name"); + A.notNull(svc, "svc"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployNodeSingleton(prj, name, svc)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void deployClusterSingleton(String name, Service svc) { A.notNull(name, "name"); A.notNull(svc, "svc"); @@ -109,6 +127,21 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException { + A.notNull(name, "name"); + A.notNull(svc, "svc"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployClusterSingleton(prj, name, svc)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt) { A.notNull(name, "name"); A.notNull(svc, "svc"); @@ -127,6 +160,23 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, + int maxPerNodeCnt) throws IgniteException { + A.notNull(name, "name"); + A.notNull(svc, "svc"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployMultiple(prj, name, svc, + totalCnt, maxPerNodeCnt)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void deployKeyAffinitySingleton(String name, Service svc, @Nullable String cacheName, Object affKey) { A.notNull(name, "name"); @@ -147,6 +197,24 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc, + @Nullable String cacheName, Object affKey) throws IgniteException { + A.notNull(name, "name"); + A.notNull(svc, "svc"); + A.notNull(affKey, "affKey"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployKeyAffinitySingleton(name, svc, + cacheName, affKey)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void deploy(ServiceConfiguration cfg) { A.notNull(cfg, "cfg"); @@ -164,6 +232,20 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException { + A.notNull(cfg, "cfg"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deploy(cfg)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void cancel(String name) { A.notNull(name, "name"); @@ -181,6 +263,20 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> cancelAsync(String name) throws IgniteException { + A.notNull(name, "name"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().cancel(name)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void cancelAll() { guard(); @@ -196,6 +292,18 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> cancelAllAsync() throws IgniteException { + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().cancelAll()); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public Collection<ServiceDescriptor> serviceDescriptors() { guard(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java index 5d5c06f..75c9a71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java @@ -212,7 +212,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { if (compute == null) { assert ctx != null; - compute = new IgniteComputeImpl(ctx, this, subjId, false); + compute = new IgniteComputeImpl(ctx, this, subjId); } return compute; http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java index fb9b190..d392813 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java @@ -22,7 +22,6 @@ import java.io.File; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.io.ObjectStreamException; import java.util.Collection; import java.util.Map; import java.util.UUID; @@ -30,6 +29,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCluster; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; @@ -115,7 +115,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> int maxConn) { try { - return saveOrGet(cluster.startNodesAsync(file, restart, timeout, maxConn)); + return saveOrGet(cluster.startNodesAsync0(file, restart, timeout, maxConn)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -123,6 +123,12 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> } /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart, + int timeout, int maxConn) throws IgniteException { + return cluster.startNodesAsync(file, restart, timeout, maxConn); + } + + /** {@inheritDoc} */ @Override public Collection<ClusterStartNodeResult> startNodes( Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, @@ -131,7 +137,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> int maxConn) { try { - return saveOrGet(cluster.startNodesAsync(hosts, dflts, restart, timeout, maxConn)); + return saveOrGet(cluster.startNodesAsync0(hosts, dflts, restart, timeout, maxConn)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -139,6 +145,13 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> } /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync( + Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, + boolean restart, int timeout, int maxConn) throws IgniteException { + return cluster.startNodesAsync(hosts, dflts, restart, timeout, maxConn); + } + + /** {@inheritDoc} */ @Override public void stopNodes() { cluster.stopNodes(); } @@ -312,13 +325,4 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(cluster); } - - /** - * @return Cluster async instance. - * - * @throws ObjectStreamException If failed. - */ - protected Object readResolve() throws ObjectStreamException { - return cluster.withAsync(); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java index 58a2128..e429547 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.nodestart.IgniteRemoteStartSpecification; import org.apache.ignite.internal.util.nodestart.IgniteSshHelper; import org.apache.ignite.internal.util.nodestart.StartNodeCallable; @@ -222,7 +223,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus throws IgniteException { try { - return startNodesAsync(file, restart, timeout, maxConn).get(); + return startNodesAsync0(file, restart, timeout, maxConn).get(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -230,6 +231,12 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus } /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart, + int timeout, int maxConn) throws IgniteException { + return new IgniteFutureImpl<>(startNodesAsync0(file, restart, timeout, maxConn)); + } + + /** {@inheritDoc} */ @Override public Collection<ClusterStartNodeResult> startNodes(Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, boolean restart, @@ -238,7 +245,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus throws IgniteException { try { - return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get(); + return startNodesAsync0(hosts, dflts, restart, timeout, maxConn).get(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -246,6 +253,13 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus } /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync( + Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, + boolean restart, int timeout, int maxConn) throws IgniteException { + return new IgniteFutureImpl<>(startNodesAsync0(hosts, dflts, restart, timeout, maxConn)); + } + + /** {@inheritDoc} */ @Override public void stopNodes() throws IgniteException { guard(); @@ -330,7 +344,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus * @return Future with results. * @see IgniteCluster#startNodes(java.io.File, boolean, int, int) */ - IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, + IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0(File file, boolean restart, int timeout, int maxConn) @@ -342,7 +356,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus try { IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file); - return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn); + return startNodesAsync0(t.get1(), t.get2(), restart, timeout, maxConn); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); @@ -358,7 +372,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus * @return Future with results. * @see IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int) */ - IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync( + IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0( Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, boolean restart, http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java index 2220bfe..5ee28f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java @@ -623,11 +623,7 @@ public class JdbcConnection implements Connection { throw new SQLException("Failed to establish connection with node (is it a server node?): " + nodeId); - IgniteCompute compute = ignite.compute(grp).withAsync(); - - compute.call(task); - - return compute.<Boolean>future().get(timeout, SECONDS); + return ignite.compute(grp).callAsync(task).get(timeout, SECONDS); } else return task.call(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 0e8c263..28cf0db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -93,6 +93,7 @@ import org.jetbrains.annotations.Nullable; /** * Cache proxy. */ +@SuppressWarnings("unchecked") public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V>> implements IgniteCache<K, V>, Externalizable { /** */ @@ -378,10 +379,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { if (isAsync()) { - if (ctx.cache().isLocal()) - setFuture(ctx.cache().localLoadCacheAsync(p, args)); - else - setFuture(ctx.cache().globalLoadCacheAsync(p, args)); + setFuture(ctx.cache().isLocal() ? + ctx.cache().localLoadCacheAsync(p, args) + : ctx.cache().globalLoadCacheAsync(p, args)); } else { if (ctx.cache().isLocal()) @@ -400,6 +400,27 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, + @Nullable Object... args) throws CacheException { + try { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return (IgniteFuture<Void>)createFuture(ctx.cache().isLocal() ? + ctx.cache().localLoadCacheAsync(p, args) : ctx.cache().globalLoadCacheAsync(p, args)); + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { try { GridCacheGateway<K, V> gate = this.gate; @@ -422,6 +443,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, + @Nullable Object... args) throws CacheException { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return (IgniteFuture<Void>)createFuture(delegate.localLoadCacheAsync(p, args)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { try { GridCacheGateway<K, V> gate = this.gate; @@ -447,6 +483,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.getAndPutIfAbsentAsync(key, val)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public Lock lock(K key) throws CacheException { return lockAll(Collections.singleton(key)); } @@ -475,6 +525,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @param transformer Transformer * @param grp Optional cluster group. * @return Cursor. + * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") private <T, R> QueryCursor<R> query( @@ -535,6 +586,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @param filter Filter. * @param grp Optional cluster group. * @return Cursor. + * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp) @@ -643,6 +695,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * * @param qry Query. * @param loc Local flag. + * @param keepBinary Keep binary flag. * @return Initial iteration cursor. */ @SuppressWarnings("unchecked") @@ -1004,6 +1057,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.sizeAsync(peekModes)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public long sizeLong(CachePeekMode... peekModes) throws CacheException { GridCacheGateway<K, V> gate = this.gate; @@ -1027,6 +1094,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.sizeLongAsync(peekModes)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public long sizeLong(int part, CachePeekMode... peekModes) throws CacheException { GridCacheGateway<K, V> gate = this.gate; @@ -1050,6 +1131,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Long> sizeLongAsync(int part, CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.sizeLongAsync(part, peekModes)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { GridCacheGateway<K, V> gate = this.gate; @@ -1126,6 +1221,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAsync(K key) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.getAsync(key)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public CacheEntry<K, V> getEntry(K key) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1151,6 +1260,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.getEntryAsync(key)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1176,6 +1299,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.getAllAsync(keys)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1201,6 +1338,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.getEntriesAsync(keys)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1225,6 +1376,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } + /** {@inheritDoc} */ + @Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.getAllOutTxAsync(keys)); + } + finally { + onLeave(gate, prev); + } + } + /** * @param keys Keys. * @return Values map. @@ -1293,6 +1458,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> containsKeyAsync(K key) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.containsKeyAsync(key)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public boolean containsKeys(Set<? extends K> keys) { GridCacheGateway<K, V> gate = this.gate; @@ -1313,6 +1492,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> containsKeysAsync(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.containsKeysAsync(keys)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public void loadAll( Set<? extends K> keys, boolean replaceExisting, @@ -1353,24 +1546,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V CacheOperationContext prev = onEnter(gate, opCtx); try { - if (isAsync()) { - IgniteInternalFuture<Boolean> fut = delegate.putAsync(key, val); - - IgniteInternalFuture<Void> fut0 = fut.chain(new CX1<IgniteInternalFuture<Boolean>, Void>() { - @Override public Void applyx(IgniteInternalFuture<Boolean> fut) throws IgniteCheckedException { - try { - fut.get(); - } - catch (RuntimeException e) { - throw new GridClosureException(e); - } - - return null; - } - }); - - setFuture(fut0); - } + if (isAsync()) + setFuture(putAsync0(key, val)); else delegate.put(key, val); } @@ -1384,6 +1561,44 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> putAsync(K key, V val) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(putAsync0(key, val)); + } + finally { + onLeave(gate, prev); + } + } + + /** + * Put async internal operation implementation. + * + * @param key Key. + * @param val Value. + * @return Internal future. + */ + private IgniteInternalFuture<Void> putAsync0(K key, V val) { + IgniteInternalFuture<Boolean> fut = delegate.putAsync(key, val); + + return fut.chain(new CX1<IgniteInternalFuture<Boolean>, Void>() { + @Override public Void applyx(IgniteInternalFuture<Boolean> fut1) throws IgniteCheckedException { + try { + fut1.get(); + } + catch (RuntimeException e) { + throw new GridClosureException(e); + } + + return null; + } + }); + } + + /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1409,6 +1624,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndPutAsync(K key, V val) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.getAndPutAsync(key, val)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> map) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1431,6 +1660,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return (IgniteFuture<Void>)createFuture(delegate.putAllAsync(map)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1456,6 +1699,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.putIfAbsentAsync(key, val)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public boolean remove(K key) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1481,6 +1738,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> removeAsync(K key) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.removeAsync(key)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1506,6 +1777,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> removeAsync(K key, V oldVal) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.removeAsync(key, oldVal)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public V getAndRemove(K key) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1531,6 +1816,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndRemoveAsync(K key) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.getAndRemoveAsync(key)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1556,6 +1855,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.replaceAsync(key, oldVal, newVal)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1581,6 +1894,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> replaceAsync(K key, V val) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.replaceAsync(key, val)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1606,6 +1933,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndReplaceAsync(K key, V val) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.getAndReplaceAsync(key, val)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1628,6 +1969,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync(keys)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public void removeAll() { GridCacheGateway<K, V> gate = this.gate; @@ -1648,6 +2003,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> removeAllAsync() { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync()); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public void clear(K key) { GridCacheGateway<K, V> gate = this.gate; @@ -1668,6 +2037,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> clearAsync(K key) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return (IgniteFuture<Void>)createFuture(delegate.clearAsync(key)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public void clearAll(Set<? extends K> keys) { GridCacheGateway<K, V> gate = this.gate; @@ -1688,6 +2071,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> clearAllAsync(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return (IgniteFuture<Void>)createFuture(delegate.clearAllAsync(keys)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public void clear() { GridCacheGateway<K, V> gate = this.gate; @@ -1708,6 +2105,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> clearAsync() { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return (IgniteFuture<Void>)createFuture(delegate.clearAsync()); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public void localClear(K key) { GridCacheGateway<K, V> gate = this.gate; @@ -1746,23 +2157,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { if (isAsync()) { - IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); - - IgniteInternalFuture<T> fut0 = fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { - @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut) - throws IgniteCheckedException { - try { - EntryProcessorResult<T> res = fut.get(); - - return res != null ? res.get() : null; - } - catch (RuntimeException e) { - throw new GridClosureException(e); - } - } - }); - - setFuture(fut0); + setFuture(invokeAsync0(key, entryProcessor, args)); return null; } @@ -1782,11 +2177,59 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, + Object... args) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(invokeAsync0(key, entryProcessor, args)); + } + finally { + onLeave(gate, prev); + } + } + + /** + * Invoke async operation internal implementation. + * + * @param key Key. + * @param entryProcessor Processor. + * @param args Arguments. + * @return Internal future. + */ + private <T> IgniteInternalFuture<T> invokeAsync0(K key, EntryProcessor<K, V, T> entryProcessor, Object[] args) { + IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); + + return fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { + @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut1) + throws IgniteCheckedException { + try { + EntryProcessorResult<T> res = fut1.get(); + + return res != null ? res.get() : null; + } + catch (RuntimeException e) { + throw new GridClosureException(e); + } + } + }); + } + + + /** {@inheritDoc} */ @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { return invoke(key, (EntryProcessor<K, V, T>)entryProcessor, args); } + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> invokeAsync(K key, CacheEntryProcessor<K, V, T> entryProcessor, + Object... args) { + return invokeAsync(key, (EntryProcessor<K, V, T>)entryProcessor, args); + } + /** * @param topVer Locked topology version. * @param key Key. @@ -1849,6 +2292,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, Object... args) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) { @@ -1876,6 +2334,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, + CacheEntryProcessor<K, V, T> entryProcessor, Object... args) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { @@ -1903,6 +2376,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return createFuture(delegate.invokeAllAsync(map, args)); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public String getName() { return delegate.name(); } @@ -2109,6 +2597,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** + * @param dataCenterId Data center ID. * @return Projection for data center id. */ @SuppressWarnings("unchecked") @@ -2186,7 +2675,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @param fut Future for async operation. */ private <R> void setFuture(IgniteInternalFuture<R> fut) { - curFut.set(new IgniteCacheFutureImpl<>(fut)); + curFut.set(createFuture(fut)); + } + + /** {@inheritDoc} */ + @Override protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) { + return new IgniteCacheFutureImpl<>(fut); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 7a69a6f..ca4edb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -1394,6 +1394,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> commitAsync() throws IgniteException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public void close() throws IgniteException { throw new UnsupportedOperationException(); } @@ -1404,6 +1409,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public IgniteAsyncSupport withAsync() { throw new UnsupportedOperationException(); }
