IGNITE-4475: New async API: now all async methods are defined explicitly, IgniteAsyncSupport is deprecated. This closes #1648.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/282b334f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/282b334f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/282b334f Branch: refs/heads/master Commit: 282b334f76479460613f28347d8cea97ba23f795 Parents: 906b692 Author: tledkov-gridgain <[email protected]> Authored: Mon Mar 27 13:19:47 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Mar 27 13:19:49 2017 +0300 ---------------------------------------------------------------------- .../computegrid/ComputeAsyncExample.java | 8 +- .../ComputeFibonacciContinuationExample.java | 13 +- .../examples/datagrid/CacheAsyncApiExample.java | 17 +- .../datastructures/IgniteLockExample.java | 4 +- .../datastructures/IgniteSemaphoreExample.java | 4 +- .../examples/ScalarContinuationExample.scala | 10 +- .../singlesplit/SingleSplitsLoadTest.java | 8 +- .../ignite/tests/utils/TestTransaction.java | 11 + .../internal/client/ClientStartNodeTask.java | 10 +- .../java/org/apache/ignite/IgniteCache.java | 611 +++++++++++- .../java/org/apache/ignite/IgniteCluster.java | 126 +++ .../java/org/apache/ignite/IgniteCompute.java | 262 ++++- .../java/org/apache/ignite/IgniteEvents.java | 128 ++- .../org/apache/ignite/IgniteFileSystem.java | 76 ++ .../java/org/apache/ignite/IgniteMessaging.java | 27 + .../java/org/apache/ignite/IgniteServices.java | 221 ++++- .../ignite/internal/IgniteComputeImpl.java | 546 +++++++++-- .../ignite/internal/IgniteEventsImpl.java | 79 ++ .../ignite/internal/IgniteMessagingImpl.java | 65 +- .../ignite/internal/IgniteServicesImpl.java | 108 +++ .../internal/cluster/ClusterGroupAdapter.java | 2 +- .../cluster/IgniteClusterAsyncImpl.java | 28 +- .../internal/cluster/IgniteClusterImpl.java | 24 +- .../ignite/internal/jdbc2/JdbcConnection.java | 6 +- .../processors/cache/IgniteCacheProxy.java | 574 ++++++++++- .../store/GridCacheStoreManagerAdapter.java | 10 + 
.../transactions/TransactionProxyImpl.java | 39 +- .../internal/processors/igfs/IgfsAsyncImpl.java | 43 +- .../internal/processors/igfs/IgfsImpl.java | 62 +- .../platform/PlatformAbstractTarget.java | 24 +- .../platform/PlatformAsyncTarget.java | 44 - .../platform/PlatformTargetProxy.java | 22 - .../platform/PlatformTargetProxyImpl.java | 36 +- .../platform/cache/PlatformCache.java | 218 +++-- .../platform/compute/PlatformCompute.java | 14 +- ...formDotNetEntityFrameworkCacheExtension.java | 8 +- .../platform/events/PlatformEvents.java | 70 +- .../platform/messaging/PlatformMessaging.java | 35 +- .../platform/services/PlatformServices.java | 95 +- .../transactions/PlatformTransactions.java | 9 +- .../visor/cache/VisorCacheClearTask.java | 14 +- .../visor/compute/VisorGatewayTask.java | 6 +- .../apache/ignite/lang/IgniteAsyncSupport.java | 52 +- .../ignite/lang/IgniteAsyncSupported.java | 4 +- .../apache/ignite/transactions/Transaction.java | 22 + .../IgniteCacheExpiryStoreLoadSelfTest.java | 18 +- .../internal/ClusterGroupAbstractTest.java | 73 +- .../ComputeJobCancelWithServiceSelfTest.java | 7 +- .../internal/GridCancelOnGridStopSelfTest.java | 2 +- .../GridCancelledJobsMetricsSelfTest.java | 11 +- .../internal/GridContinuousTaskSelfTest.java | 21 +- .../GridEventStorageCheckAllEventsSelfTest.java | 12 +- .../GridFailoverCustomTopologySelfTest.java | 7 +- .../GridJobMasterLeaveAwareSelfTest.java | 93 +- .../internal/GridMultipleJobsSelfTest.java | 8 +- .../ignite/internal/GridReduceSelfTest.java | 9 +- .../GridTaskCancelSingleNodeSelfTest.java | 7 +- .../internal/GridTaskExecutionSelfTest.java | 22 +- .../GridTaskFailoverAffinityRunTest.java | 7 +- .../GridTaskInstanceExecutionSelfTest.java | 7 +- .../internal/GridTaskJobRejectSelfTest.java | 7 +- .../IgniteClientReconnectApiExceptionTest.java | 10 +- .../IgniteComputeEmptyClusterGroupTest.java | 26 +- .../cache/CacheConcurrentReadThroughTest.java | 8 +- .../cache/CacheFutureExceptionSelfTest.java | 6 +- 
.../CachePutEventListenerErrorSelfTest.java | 7 +- .../GridCacheAbstractFailoverSelfTest.java | 6 +- ...cheAbstractFullApiMultithreadedSelfTest.java | 105 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 876 ++++++++++++++--- .../cache/GridCacheAbstractMetricsSelfTest.java | 52 +- .../GridCacheAsyncOperationsLimitSelfTest.java | 9 +- .../GridCacheConcurrentTxMultiNodeTest.java | 10 +- .../GridCacheInterceptorAbstractSelfTest.java | 31 +- .../GridCacheMissingCommitVersionSelfTest.java | 6 +- .../cache/GridCachePutAllFailoverSelfTest.java | 16 +- .../GridCacheReferenceCleanupSelfTest.java | 15 +- .../IgniteCacheAbstractStopBusySelfTest.java | 7 +- .../IgniteCacheConfigVariationsFullApiTest.java | 965 +++++++++++++++++-- .../cache/IgniteCacheInvokeAbstractTest.java | 20 +- .../IgniteCacheManyAsyncOperationsTest.java | 6 +- .../cache/IgniteCachePeekModesAbstractTest.java | 35 +- .../cache/WithKeepBinaryCacheFullApiTest.java | 228 ++--- .../CacheKeepBinaryWithInterceptorTest.java | 16 +- .../GridCacheBinaryObjectsAbstractSelfTest.java | 75 +- ...eAbstractDataStructuresFailoverSelfTest.java | 7 +- ...ridCacheQueueJoinedNodeSelfAbstractTest.java | 12 +- .../IgniteCountDownLatchAbstractSelfTest.java | 7 +- .../IgniteLockAbstractSelfTest.java | 7 +- .../IgniteSemaphoreAbstractSelfTest.java | 7 +- ...acheAsyncOperationsFailoverAbstractTest.java | 12 +- .../distributed/CacheAsyncOperationsTest.java | 32 +- .../CachePutAllFailoverAbstractTest.java | 8 +- .../GridCacheAbstractJobExecutionTest.java | 15 +- .../GridCacheBasicOpAbstractTest.java | 38 +- .../distributed/GridCacheEventAbstractTest.java | 82 +- .../GridCacheMultiNodeAbstractTest.java | 25 +- ...yMetadataUpdateChangingTopologySelfTest.java | 13 +- .../IgniteCacheConnectionRecoveryTest.java | 10 +- ...eCacheMessageRecoveryIdleConnectionTest.java | 6 +- ...cOriginatingNodeFailureAbstractSelfTest.java | 6 +- .../dht/GridCacheGlobalLoadTest.java | 21 +- .../dht/GridCacheTxNodeFailureSelfTest.java | 12 +- 
.../IgniteCachePutRetryAbstractSelfTest.java | 26 +- .../atomic/IgniteCacheAtomicProtocolTest.java | 34 +- ...idCacheNearOnlyMultiNodeFullApiSelfTest.java | 11 +- .../GridCachePartitionedLoadCacheSelfTest.java | 9 +- .../GridCacheEmptyEntriesAbstractSelfTest.java | 21 +- .../GridCacheContinuousQueryConcurrentTest.java | 5 +- .../closure/GridClosureProcessorSelfTest.java | 74 +- ...ComputeJobExecutionErrorToLogManualTest.java | 10 +- ...gniteComputeConfigVariationsFullApiTest.java | 533 +++++++++- .../continuous/GridEventConsumeSelfTest.java | 196 +++- .../internal/processors/igfs/IgfsMock.java | 41 + .../processors/igfs/IgfsTaskSelfTest.java | 19 + ...niteMessagingConfigVariationFullApiTest.java | 93 +- .../GridServiceProcessorAbstractSelfTest.java | 291 +++++- .../GridServiceProcessorMultiNodeSelfTest.java | 18 +- .../GridServiceProcessorStopSelfTest.java | 7 +- .../loadtest/GridSingleExecutionTest.java | 10 +- .../loadtests/colocation/GridTestMain.java | 7 +- .../multisplit/GridMultiSplitsLoadTest.java | 7 +- ...ridSingleSplitsNewNodesAbstractLoadTest.java | 8 +- .../ignite/loadtests/dsi/GridDsiClient.java | 12 +- ...GridJobExecutionLoadTestClientSemaphore.java | 9 +- ...JobExecutionSingleNodeSemaphoreLoadTest.java | 10 +- .../loadtests/job/GridJobLoadTestSubmitter.java | 7 +- .../mergesort/GridMergeSortLoadTask.java | 7 +- .../ignite/messaging/GridMessagingSelfTest.java | 73 +- .../messaging/IgniteMessagingSendAsyncTest.java | 83 +- ...idSessionFutureWaitJobAttributeSelfTest.java | 7 +- ...GridSessionSetJobAttributeOrderSelfTest.java | 8 +- ...sionSetJobAttributeWaitListenerSelfTest.java | 7 +- .../GridSessionSetTaskAttributeSelfTest.java | 7 +- ...GridSessionTaskWaitJobAttributeSelfTest.java | 7 +- .../GridSessionWaitAttributeSelfTest.java | 9 +- .../cache/GridAbstractCacheStoreSelfTest.java | 11 + .../junits/common/GridCommonAbstractTest.java | 26 +- .../multijvm/IgniteCacheProcessProxy.java | 172 ++++ .../multijvm/IgniteClusterProcessProxy.java | 13 + 
.../multijvm/IgniteEventsProcessProxy.java | 31 + ...niteCacheLockPartitionOnAffinityRunTest.java | 6 +- .../cache/IgniteCacheQueryLoadSelfTest.java | 20 +- .../cpp/jni/include/ignite/jni/exports.h | 2 - .../platforms/cpp/jni/include/ignite/jni/java.h | 4 - modules/platforms/cpp/jni/project/vs/module.def | 2 - modules/platforms/cpp/jni/src/exports.cpp | 8 - modules/platforms/cpp/jni/src/java.cpp | 20 - .../scalar/pimps/ScalarProjectionPimp.scala | 24 +- ...gniteProjectionStartStopRestartSelfTest.java | 6 +- .../commands/tasks/VisorTasksCommandSpec.scala | 22 +- .../IgniteAtomicInvokeRetryBenchmark.java | 12 +- .../failover/IgniteAtomicRetriesBenchmark.java | 12 +- .../IgniteFailoverAbstractBenchmark.java | 18 +- ...IgniteTransactionalInvokeRetryBenchmark.java | 10 +- ...IgniteTransactionalWriteInvokeBenchmark.java | 16 +- .../IgniteTransactionalWriteReadBenchmark.java | 12 +- 156 files changed, 6806 insertions(+), 2268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java index e8321a5..8064ace 100644 --- a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java @@ -49,21 +49,19 @@ public class ComputeAsyncExample { System.out.println("Compute asynchronous example started."); // Enable asynchronous mode. 
- IgniteCompute compute = ignite.compute().withAsync(); + IgniteCompute compute = ignite.compute(); Collection<IgniteFuture<?>> futs = new ArrayList<>(); // Iterate through all words in the sentence and create runnable jobs. for (final String word : "Print words using runnable".split(" ")) { // Execute runnable on some node. - compute.run(new IgniteRunnable() { + futs.add(compute.runAsync(new IgniteRunnable() { @Override public void run() { System.out.println(); System.out.println(">>> Printing '" + word + "' on this node from ignite job."); } - }); - - futs.add(compute.future()); + })); } // Wait for completion of all futures. http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java index 6642e9d..0fe12f1 100644 --- a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java @@ -27,7 +27,6 @@ import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJobContext; -import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.examples.ExampleNodeStartup; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; @@ -142,13 +141,12 @@ public final class ComputeFibonacciContinuationExample { ClusterGroup p = ignite.cluster().forPredicate(nodeFilter); - IgniteCompute compute = ignite.compute(p).withAsync(); + IgniteCompute compute = ignite.compute(p); // If future is not 
cached in node-local-map, cache it. if (fut1 == null) { - compute.apply(new ContinuationFibonacciClosure(nodeFilter), n - 1); - - ComputeTaskFuture<BigInteger> futVal = compute.future(); + IgniteFuture<BigInteger> futVal = compute.applyAsync( + new ContinuationFibonacciClosure(nodeFilter), n - 1); fut1 = locMap.putIfAbsent(n - 1, futVal); @@ -158,9 +156,8 @@ public final class ComputeFibonacciContinuationExample { // If future is not cached in node-local-map, cache it. if (fut2 == null) { - compute.apply(new ContinuationFibonacciClosure(nodeFilter), n - 2); - - ComputeTaskFuture<BigInteger> futVal = compute.<BigInteger>future(); + IgniteFuture<BigInteger> futVal = compute.applyAsync( + new ContinuationFibonacciClosure(nodeFilter), n - 2); fut2 = locMap.putIfAbsent(n - 2, futVal); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheAsyncApiExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheAsyncApiExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheAsyncApiExample.java index 3699361..69b23e7 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheAsyncApiExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheAsyncApiExample.java @@ -53,27 +53,18 @@ public class CacheAsyncApiExample { // Auto-close cache at the end of the example. try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) { - // Enable asynchronous mode. - IgniteCache<Integer, String> asyncCache = cache.withAsync(); - Collection<IgniteFuture<?>> futs = new ArrayList<>(); // Execute several puts asynchronously. 
- for (int i = 0; i < 10; i++) { - asyncCache.put(i, String.valueOf(i)); - - futs.add(asyncCache.future()); - } + for (int i = 0; i < 10; i++) + futs.add(cache.putAsync(i, String.valueOf(i))); // Wait for completion of all futures. for (IgniteFuture<?> fut : futs) fut.get(); - // Execute get operation asynchronously. - asyncCache.get(1); - - // Asynchronously wait for result. - asyncCache.<String>future().listen(new IgniteInClosure<IgniteFuture<String>>() { + // Execute get operation asynchronously and wait for result. + cache.getAsync(1).listen(new IgniteInClosure<IgniteFuture<String>>() { @Override public void apply(IgniteFuture<String> fut) { System.out.println("Get operation completed [value=" + fut.get() + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java index 1f84787..ba035ae 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java @@ -87,11 +87,11 @@ public class IgniteLockExample { // Start consumers on all cluster nodes. for (int i = 0; i < NUM_CONSUMERS; i++) - ignite.compute().withAsync().run(new Consumer(reentrantLockName)); + ignite.compute().runAsync(new Consumer(reentrantLockName)); // Start producers on all cluster nodes. 
for (int i = 0; i < NUM_PRODUCERS; i++) - ignite.compute().withAsync().run(new Producer(reentrantLockName)); + ignite.compute().runAsync(new Producer(reentrantLockName)); System.out.println("Master node is waiting for all other nodes to finish..."); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java index 1c078b0..12d1eab 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java @@ -67,11 +67,11 @@ public class IgniteSemaphoreExample { // Start consumers on all cluster nodes. for (int i = 0; i < NUM_CONSUMERS; i++) - ignite.compute().withAsync().run(new Consumer(semaphoreName)); + ignite.compute().runAsync(new Consumer(semaphoreName)); // Start producers on all cluster nodes. 
for (int i = 0; i < NUM_PRODUCERS; i++) - ignite.compute().withAsync().run(new Producer(semaphoreName)); + ignite.compute().runAsync(new Producer(semaphoreName)); System.out.println("Master node is waiting for all other nodes to finish..."); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala index 203f0b7..62b3a13 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala @@ -117,14 +117,12 @@ class FibonacciClosure ( // Group that excludes node with id passed in constructor if others exists. val prj = if (ignite$.cluster().nodes().size() > 1) ignite$.cluster().forOthers(excludeNode) else ignite$.cluster().forNode(excludeNode) - val comp = ignite$.compute(prj).withAsync() + val comp = ignite$.compute(prj) // If future is not cached in node-local store, cache it. // Note recursive execution! if (fut1 == null) { - comp.apply(new FibonacciClosure(excludeNodeId), n - 1) - - val futVal = comp.future[BigInteger]() + val futVal = comp.applyAsync(new FibonacciClosure(excludeNodeId), n - 1) fut1 = store.putIfAbsent(n - 1, futVal) @@ -134,9 +132,7 @@ class FibonacciClosure ( // If future is not cached in node-local store, cache it. 
if (fut2 == null) { - comp.apply(new FibonacciClosure(excludeNodeId), n - 2) - - val futVal = comp.future[BigInteger]() + val futVal = comp.applyAsync(new FibonacciClosure(excludeNodeId), n - 2) fut2 = store.putIfAbsent(n - 2, futVal) http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/aop/src/test/java/org/apache/loadtests/direct/singlesplit/SingleSplitsLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/aop/src/test/java/org/apache/loadtests/direct/singlesplit/SingleSplitsLoadTest.java b/modules/aop/src/test/java/org/apache/loadtests/direct/singlesplit/SingleSplitsLoadTest.java index 402bdf4..b42ff71 100644 --- a/modules/aop/src/test/java/org/apache/loadtests/direct/singlesplit/SingleSplitsLoadTest.java +++ b/modules/aop/src/test/java/org/apache/loadtests/direct/singlesplit/SingleSplitsLoadTest.java @@ -18,7 +18,6 @@ package org.apache.loadtests.direct.singlesplit; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.configuration.IgniteConfiguration; @@ -104,11 +103,8 @@ public class SingleSplitsLoadTest extends GridCommonAbstractTest { try { int levels = 20; - IgniteCompute comp = ignite.compute().withAsync(); - - comp.execute(new SingleSplitTestTask(), levels); - - ComputeTaskFuture<Integer> fut = comp.future(); + ComputeTaskFuture<Integer> fut = ignite.compute().executeAsync( + new SingleSplitTestTask(), levels); int res = fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java index 
5f3ec69..4a03d25 100644 --- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java @@ -17,6 +17,7 @@ package org.apache.ignite.tests.utils; +import org.apache.ignite.IgniteException; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; @@ -106,6 +107,11 @@ public class TestTransaction implements Transaction { } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> commitAsync() throws IgniteException { + return null; + } + + /** {@inheritDoc} */ @Override public void close() { // No-op. } @@ -129,4 +135,9 @@ public class TestTransaction implements Transaction { @Override public void rollback() { // No-op. } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException { + return null; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java index 48275e7..b3f69a3 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteLogger; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeJobResultPolicy; @@ -159,14 +158,9 @@ public class ClientStartNodeTask extends TaskSingleJobSplitAdapter<String, 
Integ private static void changeTopology(Ignite parent, int add, int rmv, String type) { Collection<ComputeTaskFuture<?>> tasks = new ArrayList<>(); - IgniteCompute comp = parent.compute().withAsync(); - // Start nodes in parallel. - while (add-- > 0) { - comp.execute(ClientStartNodeTask.class, type); - - tasks.add(comp.future()); - } + while (add-- > 0) + tasks.add(parent.compute().executeAsync(ClientStartNodeTask.class, type)); for (ComputeTaskFuture<?> task : tasks) task.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index d7bccf5..33e0e8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -89,17 +89,13 @@ import org.jetbrains.annotations.Nullable; * <h1 class="header">Transactions</h1> * Cache API supports transactions. You can group and set of cache methods within a transaction * to provide ACID-compliant behavior. See {@link IgniteTransactions} for more information. - * <h1 class="header">Asynchronous Mode</h1> - * Cache API supports asynchronous mode via {@link IgniteAsyncSupport} functionality. To turn on - * asynchronous mode invoke {@link #withAsync()} method. Once asynchronous mode is enabled, - * all methods with {@link IgniteAsyncSupported @IgniteAsyncSupported} annotation will be executed - * asynchronously. * * @param <K> Cache key type. * @param <V> Cache value type. 
*/ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncSupport { /** {@inheritDoc} */ + @Deprecated @Override public IgniteCache<K, V> withAsync(); /** {@inheritDoc} */ @@ -191,6 +187,19 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException; /** + * Asynchronously executes {@link #localLoadCache(IgniteBiPredicate, Object...)} on all cache nodes. + * + * @param p Optional predicate (may be {@code null}). If provided, will be used to + * filter values loaded from storage before they are put into cache. + * @param args Optional user arguments to be passed into + * {@link CacheStore#loadCache(IgniteBiInClosure, Object...)} method. + * @return a Future representing pending completion of the cache loading. + * @throws CacheException If loading failed. + */ + public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) + throws CacheException; + + /** * Delegates to {@link CacheStore#loadCache(IgniteBiInClosure,Object...)} method * to load state from the underlying persistent storage. The loaded values * will then be given to the optionally passed in predicate, and, if the predicate returns @@ -215,6 +224,31 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException; /** + * Asynchronously loads state from the underlying persistent storage by delegating + * to {@link CacheStore#loadCache(IgniteBiInClosure,Object...)} method. The loaded values + * will then be given to the optionally passed in predicate, and, if the predicate returns + * {@code true}, will be stored in cache. If predicate is {@code null}, then + * all loaded values will be stored in cache. 
+ * <p> + * Note that this method does not receive keys as a parameter, so it is up to + * {@link CacheStore} implementation to provide all the data to be loaded. + * <p> + * This method is not transactional and may end up loading a stale value into + * cache if another thread has updated the value immediately after it has been + * loaded. It is mostly useful when pre-loading the cache from underlying + * data store before start, or for read-only caches. + * + * @param p Optional predicate (may be {@code null}). If provided, will be used to + * filter values to be put into cache. + * @param args Optional user arguments to be passed into + * {@link CacheStore#loadCache(IgniteBiInClosure, Object...)} method. + * @return a Future representing pending completion of the cache loading. + * @throws CacheException If loading failed. + */ + public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) + throws CacheException; + + /** * Stores given key-value pair in cache only if cache had no previous mapping for it. If cache * previously contained value for the given key, then this value is returned. * In case of {@link CacheMode#PARTITIONED} or {@link CacheMode#REPLICATED} caches, @@ -247,6 +281,36 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public V getAndPutIfAbsent(K key, V val) throws CacheException; /** + * Asynchronously stores given key-value pair in cache only if cache had no previous mapping for it. If cache + * previously contained value for the given key, then this value is returned. + * In case of {@link CacheMode#PARTITIONED} or {@link CacheMode#REPLICATED} caches, + * the value will be loaded from the primary node, which in its turn may load the value + * from the swap storage, and consecutively, if it's not in swap, + * from the underlying persistent storage. If value has to be loaded from persistent + * storage, {@link CacheStore#load(Object)} method will be used. 
+ * <p> + * If the returned value is not needed, method {@link #putIfAbsentAsync(Object, Object)} should + * always be used instead of this one to avoid the overhead associated with returning of the + * previous value. + * <p> + * If write-through is enabled, the stored value will be persisted to {@link CacheStore} + * via {@link CacheStore#write(javax.cache.Cache.Entry)} method. + * <h2 class="header">Transactions</h2> + * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * + * @param key Key to store in cache. + * @param val Value to be associated with the given key. + * @return a Future representing pending completion of the operation. + * @throws NullPointerException If either key or value are {@code null}. + * @throws CacheException If put operation failed. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException; + + /** * Creates a {@link Lock} instance associated with passed key. * This method does not acquire lock immediately, you have to call appropriate method on returned instance. * Returned lock does not support {@link Lock#newCondition()} method, @@ -363,6 +427,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * This method does not participate in any transactions. * * @param key Entry key. + * @param peekModes Peek modes. * @return Peeked value, or {@code null} if not found. * @throws NullPointerException If key is {@code null}. */ @@ -388,11 +453,25 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * * @param peekModes Optional peek modes. 
If not provided, then total cache size is returned. * @return Cache size across all nodes. + * @throws CacheException On error. */ @IgniteAsyncSupported public int size(CachePeekMode... peekModes) throws CacheException; /** + * Asynchronously gets the number of all entries cached across all nodes. By default, + * if {@code peekModes} value isn't defined, only size of primary copies across all nodes will be returned. + * This behavior is identical to calling this method with {@link CachePeekMode#PRIMARY} peek mode. + * <p> + * NOTE: this operation is distributed and will query all participating nodes for their cache sizes. + * + * @param peekModes Optional peek modes. If not provided, then total cache size is returned. + * @return a Future representing pending completion of the operation. + * @throws CacheException On error. + */ + public IgniteFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws CacheException; + + /** * Gets the number of all entries cached across all nodes as a long value. By default, if {@code peekModes} value * isn't defined, only size of primary copies across all nodes will be returned. This behavior is identical to * calling this method with {@link CachePeekMode#PRIMARY} peek mode. @@ -401,11 +480,25 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * * @param peekModes Optional peek modes. If not provided, then total cache size is returned. * @return Cache size across all nodes. + * @throws CacheException On error. */ @IgniteAsyncSupported public long sizeLong(CachePeekMode... peekModes) throws CacheException; /** + * Asynchronously gets the number of all entries cached across all nodes as a long value. By default, + * if {@code peekModes} value isn't defined, only size of primary copies across all nodes will be returned. + * This behavior is identical to calling this method with {@link CachePeekMode#PRIMARY} peek mode. 
+ * <p> + * NOTE: this operation is distributed and will query all participating nodes for their cache sizes. + * + * @param peekModes Optional peek modes. If not provided, then total cache size is returned. + * @return a Future representing pending completion of the operation. + * @throws CacheException On error. + */ + public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException; + + /** * Gets the number of all entries cached in a partition as a long value. By default, if {@code peekModes} value * isn't defined, only size of primary copies across all nodes will be returned. This behavior is identical to * calling this method with {@link CachePeekMode#PRIMARY} peek mode. @@ -414,12 +507,27 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * * @param partition partition. * @param peekModes Optional peek modes. If not provided, then total partition cache size is returned. - * @return Partion cache size across all nodes. + * @return Partition cache size across all nodes. + * @throws CacheException On error. */ @IgniteAsyncSupported public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException; /** + * Asynchronously gets the number of all entries cached in a partition as a long value. By default, if {@code peekModes} value + * isn't defined, only size of primary copies across all nodes will be returned. This behavior is identical to + * calling this method with {@link CachePeekMode#PRIMARY} peek mode. + * <p> + * NOTE: this operation is distributed and will query all participating nodes for their partition cache sizes. + * + * @param partition partition. + * @param peekModes Optional peek modes. If not provided, then total partition cache size is returned. + * @return a Future representing pending completion of the operation. + * @throws CacheException On error. + */ + public IgniteFuture<Long> sizeLongAsync(int partition, CachePeekMode... 
peekModes) throws CacheException; + + /** * Gets the number of all entries cached on this node. By default, if {@code peekModes} value isn't defined, * only size of primary copies will be returned. This behavior is identical to calling this method with * {@link CachePeekMode#PRIMARY} peek mode. @@ -466,6 +574,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS Object... args); /** + * Asynchronous version of the {@link #invokeAll(Map, Object...)} method. + * + * @param map Map containing keys and entry processors to be applied to values. + * @param args Additional arguments to pass to the {@link EntryProcessor}. + * @return a Future representing pending completion of the operation. See more about future result + * at the {@link #invokeAll(Map, Object...)}. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -475,6 +597,18 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public V get(K key); /** + * Asynchronously gets an entry from the cache. + * <p> + * If the cache is configured to use read-through, and a future result would be null + * because the entry is missing from the cache, the Cache's {@link CacheLoader} + * is called in an attempt to load the entry. + * + * @param key Key.
+ * @return a Future representing pending completion of the operation. + */ + public IgniteFuture<V> getAsync(K key); + + /** * Gets an entry from the cache. * <p> * If the cache is configured to use read-through, and get would return null @@ -497,6 +631,27 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public CacheEntry<K, V> getEntry(K key); /** + * Asynchronously gets an entry from the cache. + * <p> + * If the cache is configured to use read-through, and a future result would be null + * because the entry is missing from the cache, the Cache's {@link CacheLoader} + * is called in an attempt to load the entry. + * + * @param key The key whose associated value is to be returned. + * @return a Future representing pending completion of the operation. + * @throws IllegalStateException If the cache is {@link #isClosed()}. + * @throws NullPointerException If the key is {@code null}. + * @throws CacheException If there is a problem fetching the value. + * @throws ClassCastException If the implementation is configured to perform + * runtime-type-checking, and the key or value types are incompatible with those that have been + * configured for the {@link Cache}. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -506,6 +661,24 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public Map<K, V> getAll(Set<? 
extends K> keys); /** + * Asynchronously gets a collection of entries from the {@link Cache}, returning them as + * {@link Map} of the values associated with the set of keys requested. + * <p> + * If the cache is configured read-through, and a future result for a key would + * be null because an entry is missing from the cache, the Cache's + * {@link CacheLoader} is called in an attempt to load the entry. If an + * entry cannot be loaded for a given key, the key will not be present in + * the returned Map. + * + * @param keys Keys set. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Map<K, V>> getAllAsync(Set<? extends K> keys); + + /** * Gets a collection of entries from the {@link Cache}. * <p> * If the cache is configured read-through, and a get for a key would @@ -531,6 +704,29 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys); /** + * Asynchronously gets a collection of entries from the {@link Cache}. + * <p> + * If the cache is configured read-through, and a future result for a key would + * be null because an entry is missing from the cache, the Cache's + * {@link CacheLoader} is called in an attempt to load the entry. If an + * entry cannot be loaded for a given key, the key will not be present in + * the returned Collection. + * + * @param keys The keys whose associated values are to be returned. + * @return a Future representing pending completion of the operation. + * @throws NullPointerException If keys is null or if keys contains a {@code null}. 
+ * @throws IllegalStateException If the cache is {@link #isClosed()}. + * @throws CacheException If there is a problem fetching the values. + * @throws ClassCastException If the implementation is configured to perform + * runtime-type-checking, and the key or value types are incompatible with those that have been + * configured for the {@link Cache}. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? extends K> keys); + + /** * Gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries * and will not lock any keys if pessimistic transaction is started by thread. * @@ -541,6 +737,15 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public Map<K, V> getAllOutTx(Set<? extends K> keys); /** + * Asynchronously gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries + * and will not lock any keys if pessimistic transaction is started by thread. + * + * @param keys The keys whose associated values are to be returned. + * @return a Future representing pending completion of the operation. + */ + public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. 
@@ -550,6 +755,21 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public boolean containsKey(K key); /** + * Asynchronously determines if the {@link Cache} contains an entry for the specified key. + * <p> + * More formally, future result is <tt>true</tt> if and only if this cache contains a + * mapping for a key <tt>k</tt> such that <tt>key.equals(k)</tt>. + * (There can be at most one such mapping.) + * + * @param key Key. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Boolean> containsKeyAsync(K key); + + /** * Determines if the {@link Cache} contains entries for the specified keys. * * @param keys Key whose presence in this cache is to be tested. @@ -562,6 +782,17 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public boolean containsKeys(Set<? extends K> keys); /** + * Asynchronously determines if the {@link Cache} contains entries for the specified keys. + * + * @param keys Key whose presence in this cache is to be tested. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Boolean> containsKeysAsync(Set<? 
extends K> keys); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -571,6 +802,23 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public void put(K key, V val); /** + * Asynchronously associates the specified value with the specified key in the cache. + * <p> + * If the {@link Cache} previously contained a mapping for the key, the old + * value is replaced by the specified value. (A cache <tt>c</tt> is said to + * contain a mapping for a key <tt>k</tt> if and only if {@link + * #containsKey(Object) c.containsKey(k)} would return <tt>true</tt>.) + * + * @param key Key. + * @param val Value. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Void> putAsync(K key, V val); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -580,6 +828,28 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public V getAndPut(K key, V val); /** + * Asynchronously associates the specified value with the specified key in this cache, + * returning an existing value if one existed as the future result. + * <p> + * If the cache previously contained a mapping for + * the key, the old value is replaced by the specified value. 
(A cache + * <tt>c</tt> is said to contain a mapping for a key <tt>k</tt> if and only + * if {@link #containsKey(Object) c.containsKey(k)} would return + * <tt>true</tt>.) + * <p> + * The previous value is returned as the future result, or future result is null if there was no value associated + * with the key previously. + * + * @param key Key. + * @param val Value. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<V> getAndPutAsync(K key, V val); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -589,6 +859,31 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public void putAll(Map<? extends K, ? extends V> map); /** + * Asynchronously copies all of the entries from the specified map to the {@link Cache}. + * <p> + * The effect of this call is equivalent to that of calling + * {@link #putAsync(Object, Object) putAsync(k, v)} on this cache once for each mapping + * from key <tt>k</tt> to value <tt>v</tt> in the specified map. + * <p> + * The order in which the individual puts occur is undefined. + * <p> + * The behavior of this operation is undefined if entries in the cache + * corresponding to entries in the map are modified or removed while this + * operation is in progress, or if the map is modified while the operation is in + * progress. + * <p> + * In Default Consistency mode, individual puts occur atomically but not + * the entire putAll.
Listeners may observe individual updates. + * + * @param map Map containing keys and values to put into the cache. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -598,6 +893,19 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public boolean putIfAbsent(K key, V val); /** + * Asynchronously associates the specified key with the given value if it is + * not already associated with a value. + * + * @param key Key. + * @param val Value. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. 
@@ -607,6 +915,28 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public boolean remove(K key); /** + * Asynchronously removes the mapping for a key from this cache if it is present. + * <p> + * More formally, if this cache contains a mapping from key <tt>k</tt> to + * value <tt>v</tt> such that + * <code>(key==null ? k==null : key.equals(k))</code>, that mapping is removed. + * (The cache can contain at most one such mapping.) + * + * <p>A future result is <tt>true</tt> if this cache previously associated the key, + * or <tt>false</tt> if the cache contained no mapping for the key. + * <p> + * The cache will not contain a mapping for the specified key once the + * returned future is completed. + * + * @param key Key. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Boolean> removeAsync(K key); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -616,6 +946,19 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public boolean remove(K key, V oldVal); /** + * Asynchronously removes the mapping for a key only if currently mapped to the + * given value. + * + * @param key Key. + * @param oldVal Old value. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. 
+ * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Boolean> removeAsync(K key, V oldVal); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -625,6 +968,18 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public V getAndRemove(K key); /** + * Asynchronously removes the entry for a key only if currently mapped to some + * value. + * + * @param key Key. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<V> getAndRemoveAsync(K key); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -634,6 +989,19 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public boolean replace(K key, V oldVal, V newVal); /** + * Asynchronous version of the {@link #replace(Object, Object, Object)}. + * + * @param key Key. + * @param oldVal Old value. + * @param newVal New value. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. 
+ * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -643,6 +1011,19 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public boolean replace(K key, V val); /** + * Asynchronously replaces the entry for a key only if currently mapped to some + * value. + * + * @param key Key. + * @param val Value. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Boolean> replaceAsync(K key, V val); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -652,6 +1033,19 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public V getAndReplace(K key, V val); /** + * Asynchronously replaces the value for a given key if and only if there is a + * value currently mapped by the key. + * + * @param key Key. + * @param val Value. + * @return a Future representing pending completion of the operation.
+ * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<V> getAndReplaceAsync(K key, V val); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -661,6 +1055,26 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public void removeAll(Set<? extends K> keys); /** + * Asynchronously removes entries for the specified keys. + * <p> + * The order in which the individual entries are removed is undefined. + * <p> + * For every entry in the key set, the following are called: + * <ul> + * <li>any registered {@link CacheEntryRemovedListener}s</li> + * <li>if the cache is a write-through cache, the {@link CacheWriter}</li> + * </ul> + * If the key set is empty, the {@link CacheWriter} is not called. + * + * @param keys Keys set. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys); + + /** * Removes all of the mappings from this cache. * <p> * The order that the individual entries are removed is undefined. 
@@ -686,12 +1100,43 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @IgniteAsyncSupported @Override public void removeAll(); + /** + * Asynchronously removes all of the mappings from this cache. + * <p> + * The order that the individual entries are removed is undefined. + * <p> + * For every mapping that exists the following are called: + * <ul> + * <li>any registered {@link CacheEntryRemovedListener}s</li> + * <li>if the cache is a write-through cache, the {@link CacheWriter}</li> + * </ul> + * If the cache is empty, the {@link CacheWriter} is not called. + * <p> + * This is potentially an expensive operation as listeners are invoked. + * Use {@link #clearAsync()} to avoid this. + * + * @return a Future representing pending completion of the operation. + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws CacheException if there is a problem during the remove + * @see #clearAsync() + * @see CacheWriter#deleteAll + */ + public IgniteFuture<Void> removeAllAsync(); + /** {@inheritDoc} */ @IgniteAsyncSupported @Override public void clear(); /** - * Clear entry from the cache and swap storage, without notifying listeners or + * Asynchronously clears the contents of the cache, without notifying listeners or + * {@link CacheWriter}s. + * + * @return a Future representing pending completion of the operation. + */ + public IgniteFuture<Void> clearAsync(); + + /** + * Clears entry from the cache and swap storage, without notifying listeners or * {@link CacheWriter}s. Entry is cleared only if it is not currently locked, * and is not participating in a transaction. * @@ -703,7 +1148,19 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public void clear(K key); /** - * Clear entries from the cache and swap storage, without notifying listeners or + * Asynchronously clears entry from the cache and swap storage, without notifying listeners or + * {@link CacheWriter}s. 
Entry is cleared only if it is not currently locked, + * and is not participating in a transaction. + * + * @param key Key to clear. + * @return a Future representing pending completion of the operation. + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws CacheException if there is a problem during the clear + */ + public IgniteFuture<Void> clearAsync(K key); + + /** + * Clears entries from the cache and swap storage, without notifying listeners or * {@link CacheWriter}s. Entry is cleared only if it is not currently locked, * and is not participating in a transaction. * @@ -715,7 +1172,19 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public void clearAll(Set<? extends K> keys); /** - * Clear entry from the cache and swap storage, without notifying listeners or + * Asynchronously clears entries from the cache and swap storage, without notifying listeners or + * {@link CacheWriter}s. Entry is cleared only if it is not currently locked, + * and is not participating in a transaction. + * + * @param keys Keys to clear. + * @return a Future representing pending completion of the operation. + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws CacheException if there is a problem during the clear + */ + public IgniteFuture<Void> clearAllAsync(Set<? extends K> keys); + + /** + * Clears entry from the cache and swap storage, without notifying listeners or * {@link CacheWriter}s. Entry is cleared only if it is not currently locked, * and is not participating in a transaction. * <p/> @@ -728,7 +1197,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public void localClear(K key); /** - * Clear entries from the cache and swap storage, without notifying listeners or + * Clears entries from the cache and swap storage, without notifying listeners or * {@link CacheWriter}s. 
Entry is cleared only if it is not currently locked, * and is not participating in a transaction. * <p/> @@ -750,6 +1219,22 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments); /** + * Asynchronously invokes an {@link EntryProcessor} against the {@link Entry} specified by + * the provided key. If an {@link Entry} does not exist for the specified key, + * an attempt is made to load it (if a loader is configured) or a surrogate + * {@link Entry}, consisting of the key with a null value is used instead. + * + * @param key The key to the entry. + * @param entryProcessor The {@link EntryProcessor} to invoke. + * @param arguments Additional arguments to pass to the {@link EntryProcessor}. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ + public <T> IgniteFuture<T> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments); + + /** * Invokes an {@link CacheEntryProcessor} against the {@link javax.cache.Cache.Entry} specified by * the provided key. If an {@link javax.cache.Cache.Entry} does not exist for the specified key, * an attempt is made to load it (if a loader is configured) or a surrogate @@ -782,6 +1267,37 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments); /** + * Asynchronously invokes an {@link CacheEntryProcessor} against the {@link javax.cache.Cache.Entry} specified by + * the provided key. 
If an {@link javax.cache.Cache.Entry} does not exist for the specified key, + * an attempt is made to load it (if a loader is configured) or a surrogate + * {@link javax.cache.Cache.Entry}, consisting of the key with a null value is used instead. + * <p> + * An instance of entry processor must be stateless as it may be invoked multiple times on primary and + * backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always + * the same. + * + * @param key The key to the entry. + * @param entryProcessor The {@link CacheEntryProcessor} to invoke. + * @param arguments Additional arguments to pass to the {@link CacheEntryProcessor}. + * @return a Future representing pending completion of the operation. + * @throws NullPointerException If key or {@link CacheEntryProcessor} is null + * @throws IllegalStateException If the cache is {@link #isClosed()} + * @throws ClassCastException If the implementation is configured to perform + * runtime-type-checking, and the key or value + * types are incompatible with those that have been + * configured for the {@link Cache}. + * @throws EntryProcessorException If an exception is thrown by the {@link + * CacheEntryProcessor}, a Caching Implementation + * must wrap any {@link Exception} thrown + * wrapped in an {@link EntryProcessorException}. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + * @see CacheEntryProcessor + */ + public <T> IgniteFuture<T> invokeAsync(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments); + + /** * {@inheritDoc} * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. 
* @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. @@ -792,6 +1308,39 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS EntryProcessor<K, V, T> entryProcessor, Object... args); /** + * Asynchronously invokes an {@link EntryProcessor} against the set of {@link Entry}s + * specified by the set of keys. + * <p> + * If an {@link Entry} does not exist for the specified key, an attempt is made + * to load it (if a loader is configured) or a surrogate {@link Entry}, + * consisting of the key and a value of null is provided. + * <p> + * The order that the entries for the keys are processed is undefined. + * Implementations may choose to process the entries in any order, including + * concurrently. Furthermore there is no guarantee implementations will + * use the same {@link EntryProcessor} instance to process each entry, as + * the case may be in a non-local cache topology. + * <p> + * The result of executing the {@link EntryProcessor} is returned in the future as a + * {@link Map} of {@link EntryProcessorResult}s, one result per key. Should the + * {@link EntryProcessor} or Caching implementation throw an exception, the + * exception is wrapped and re-thrown when a call to + * {@link javax.cache.processor.EntryProcessorResult#get()} is made. + + * + * @param keys The set of keys. + * @param entryProcessor The {@link EntryProcessor} to invoke. + * @param args Additional arguments to pass to the {@link EntryProcessor}. + * @return a Future representing pending completion of the operation. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. 
+ */ + public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, Object... args); + + + /** * Invokes an {@link CacheEntryProcessor} against the set of {@link javax.cache.Cache.Entry}s * specified by the set of keys. * <p> @@ -838,6 +1387,48 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS CacheEntryProcessor<K, V, T> entryProcessor, Object... args); /** + * Asynchronously invokes an {@link CacheEntryProcessor} against the set of {@link javax.cache.Cache.Entry}s + * specified by the set of keys. + * <p> + * If an {@link javax.cache.Cache.Entry} does not exist for the specified key, an attempt is made + * to load it (if a loader is configured) or a surrogate {@link javax.cache.Cache.Entry}, + * consisting of the key and a value of null is provided. + * <p> + * The order that the entries for the keys are processed is undefined. + * Implementations may choose to process the entries in any order, including + * concurrently. Furthermore there is no guarantee implementations will + * use the same {@link CacheEntryProcessor} instance to process each entry, as + * the case may be in a non-local cache topology. + * <p> + * The result of executing the {@link CacheEntryProcessor} is returned in the future as a + * {@link Map} of {@link EntryProcessorResult}s, one result per key. Should the + * {@link CacheEntryProcessor} or Caching implementation throw an exception, the + * exception is wrapped and re-thrown when a call to + * {@link javax.cache.processor.EntryProcessorResult#get()} is made. + * <p> + * An instance of entry processor must be stateless as it may be invoked multiple times on primary and + * backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always + * the same. + * + * @param keys The set of keys for entries to process. + * @param entryProcessor The {@link CacheEntryProcessor} to invoke. 
+ * @param args Additional arguments to pass to the {@link CacheEntryProcessor}. + * @return a Future representing pending completion of the operation. + * @throws NullPointerException If keys or {@link CacheEntryProcessor} are {@code null}. + * @throws IllegalStateException If the cache is {@link #isClosed()}. + * @throws ClassCastException If the implementation is configured to perform + * runtime-type-checking, and the key or value + * types are incompatible with those that have been + * configured for the {@link Cache}. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + * @see CacheEntryProcessor + */ + public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, + CacheEntryProcessor<K, V, T> entryProcessor, Object... args); + + /** * Closes this cache instance. * <p> * For local cache equivalent to {@link #destroy()}. http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java index 23b03df..dc7b687 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java @@ -188,6 +188,33 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport { int maxConn) throws IgniteException; /** + * Starts one or more nodes on remote host(s) asynchronously. + * <p> + * This method takes INI file which defines all startup parameters.
It can contain one or + * more sections, each for a host or for range of hosts (note that they must have different + * names) and a special '{@code defaults}' section with default values. They are applied to + * undefined parameters in host's sections. + * <p> + * Completed future contains collection of tuples. Each tuple corresponds to one node start attempt and + * contains hostname, success flag and error message if attempt was not successful. Note that + * successful attempt doesn't mean that node was actually started and joined topology. For large + * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual + * node logs for details. + * + * @param file Configuration file. + * @param restart Whether to stop existing nodes. If {@code true}, all existing + * nodes on the host will be stopped before starting new ones. If + * {@code false}, nodes will be started only if there are fewer + * nodes on the host than expected. + * @param timeout Connection timeout in milliseconds. + * @param maxConn Number of parallel SSH connections to one host. + * @return a Future representing pending completion of the starting nodes. + * @throws IgniteException In case of error. + */ + public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart, int timeout, + int maxConn) throws IgniteException; + + /** * Starts one or more nodes on remote host(s). * <p> * Each map in {@code hosts} collection @@ -290,6 +317,104 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport { @Nullable Map<String, Object> dflts, boolean restart, int timeout, int maxConn) throws IgniteException; /** + * Starts one or more nodes on remote host(s) asynchronously. + * <p> + * Each map in {@code hosts} collection + * defines startup parameters for one host or for a range of hosts.
The following + * parameters are supported: + * <table class="doctable"> + * <tr> + * <th>Name</th> + * <th>Type</th> + * <th>Description</th> + * </tr> + * <tr> + * <td><b>host</b></td> + * <td>String</td> + * <td> + * Hostname (required). Can define several hosts if their IPs are sequential. + * E.g., {@code 10.0.0.1~5} defines range of five IP addresses. Other + * parameters are applied to all hosts equally. + * </td> + * </tr> + * <tr> + * <td><b>port</b></td> + * <td>Integer</td> + * <td>Port number (default is {@code 22}).</td> + * </tr> + * <tr> + * <td><b>uname</b></td> + * <td>String</td> + * <td>Username (if not defined, current local username will be used).</td> + * </tr> + * <tr> + * <td><b>passwd</b></td> + * <td>String</td> + * <td>Password (if not defined, private key file must be defined).</td> + * </tr> + * <tr> + * <td><b>key</b></td> + * <td>File</td> + * <td>Private key file (if not defined, password must be defined).</td> + * </tr> + * <tr> + * <td><b>nodes</b></td> + * <td>Integer</td> + * <td> + * Expected number of nodes on the host. If some nodes are started + * already, then only remaining nodes will be started. If current count of + * nodes is equal to this number, and {@code restart} flag is {@code false}, + * then nothing will happen. + * </td> + * </tr> + * <tr> + * <td><b>igniteHome</b></td> + * <td>String</td> + * <td> + * Path to Ignite installation folder. If not defined, IGNITE_HOME + * environment variable must be set on remote hosts. + * </td> + * </tr> + * <tr> + * <td><b>cfg</b></td> + * <td>String</td> + * <td>Path to configuration file (relative to {@code igniteHome}).</td> + * </tr> + * <tr> + * <td><b>script</b></td> + * <td>String</td> + * <td> + * Custom startup script file name and path (relative to {@code igniteHome}). + * You can also specify a space-separated list of parameters in the same + * string (for example: {@code "bin/my-custom-script.sh -v"}). 
+ * </td> + * </tr> + * </table> + * <p> + * {@code dflts} map defines default values. They are applied to undefined parameters in + * {@code hosts} collection. + * <p> + * Completed future contains collection of tuples. Each tuple corresponds to one node start attempt and + * contains hostname, success flag and error message if attempt was not successful. Note that + * successful attempt doesn't mean that node was actually started and joined topology. For large + * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual + * node logs for details. + * + * @param hosts Startup parameters. + * @param dflts Default values. + * @param restart Whether to stop existing nodes. If {@code true}, all existing + * nodes on the host will be stopped before starting new ones. If + * {@code false}, nodes will be started only if there are fewer + * nodes on the host than expected. + * @param timeout Connection timeout in milliseconds. + * @param maxConn Number of parallel SSH connections to one host. + * @return a Future representing pending completion of the starting nodes. + * @throws IgniteException In case of error. + */ + public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(Collection<Map<String, Object>> hosts, + @Nullable Map<String, Object> dflts, boolean restart, int timeout, int maxConn) throws IgniteException; + + /** * Stops nodes satisfying optional set of predicates. * <p> * <b>NOTE:</b> {@code System.exit(Ignition.KILL_EXIT_CODE)} will be executed on each @@ -347,5 +472,6 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport { @Nullable public IgniteFuture<?> clientReconnectFuture(); /** {@inheritDoc} */ + @Deprecated @Override public IgniteCluster withAsync(); }
