IGNITE-4074 Refactor async (*future) operations in PlatformTarget
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02dd07a5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02dd07a5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02dd07a5 Branch: refs/heads/master Commit: 02dd07a58277b357991c1f74a7dbdfdd9de2a2cc Parents: 255b3a3 Author: Pavel Tupitsyn <[email protected]> Authored: Tue Oct 25 12:34:35 2016 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Tue Oct 25 12:34:35 2016 +0300 ---------------------------------------------------------------------- .../platform/PlatformAbstractTarget.java | 87 ++++- .../processors/platform/PlatformTarget.java | 21 -- .../platform/cache/PlatformCache.java | 274 +++++++++++++- .../platform/compute/PlatformCompute.java | 29 +- .../platform/events/PlatformEvents.java | 103 ++++-- .../platform/messaging/PlatformMessaging.java | 53 ++- .../platform/services/PlatformServices.java | 98 +++-- .../transactions/PlatformTransactions.java | 38 +- .../cpp/jni/include/ignite/jni/exports.h | 2 - .../platforms/cpp/jni/include/ignite/jni/java.h | 4 - modules/platforms/cpp/jni/project/vs/module.def | 2 - modules/platforms/cpp/jni/src/exports.cpp | 8 - modules/platforms/cpp/jni/src/java.cpp | 26 -- .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 353 ++++++++++--------- .../Apache.Ignite.Core/Impl/Cache/CacheOp.cs | 27 +- .../Impl/Compute/ComputeImpl.cs | 27 +- .../Apache.Ignite.Core/Impl/Events/Events.cs | 168 ++++----- .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 2 +- .../Impl/Messaging/Messaging.cs | 113 ++---- .../Apache.Ignite.Core/Impl/PlatformTarget.cs | 139 +++++++- .../Impl/Services/Services.cs | 125 ++++--- .../Impl/Transactions/TransactionsImpl.cs | 13 +- .../Impl/Unmanaged/IgniteJniNativeMethods.cs | 12 - .../Impl/Unmanaged/UnmanagedUtils.cs | 25 -- .../dotnet/Apache.Ignite.sln.DotSettings | 3 +- .../Apache.Ignite.sln.TeamCity.DotSettings | 1 + 26 files changed, 1060 insertions(+), 693 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 6197bc8..29b603a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryRawReader; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; +import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; /** @@ -244,23 +246,12 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { /** {@inheritDoc} */ @Override public void listenFuture(final long futId, int typ) throws Exception { - listenFutureAndGet(futId, typ); + PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this); } /** {@inheritDoc} */ @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception { - listenFutureForOperationAndGet(futId, typ, opId); - } - - /** {@inheritDoc} */ - @Override public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception { - return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this); - } - - /** {@inheritDoc} */ - @Override public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId) - throws Exception { - return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this); + PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this); } /** @@ -413,4 +404,74 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { protected <T> T throwUnsupported(int type) throws IgniteCheckedException { throw new IgniteCheckedException("Unsupported operation type: " + type); } + + /** + * Reads future information and listens. + * + * @param reader Reader. + * @param fut Future. + * @param writer Writer. + * @throws IgniteCheckedException In case of error. + */ + protected PlatformListenable readAndListenFuture(BinaryRawReader reader, IgniteInternalFuture fut, + PlatformFutureUtils.Writer writer) + throws IgniteCheckedException { + long futId = reader.readLong(); + int futTyp = reader.readInt(); + + return PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, writer, this); + } + + /** + * Reads future information and listens. + * + * @param reader Reader. + * @param fut Future. + * @param writer Writer. + * @throws IgniteCheckedException In case of error. + */ + protected PlatformListenable readAndListenFuture(BinaryRawReader reader, IgniteFuture fut, + PlatformFutureUtils.Writer writer) + throws IgniteCheckedException { + long futId = reader.readLong(); + int futTyp = reader.readInt(); + + return PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, writer, this); + } + + /** + * Reads future information and listens. + * + * @param reader Reader. + * @param fut Future. + * @throws IgniteCheckedException In case of error. + */ + protected PlatformListenable readAndListenFuture(BinaryRawReader reader, IgniteInternalFuture fut) + throws IgniteCheckedException { + return readAndListenFuture(reader, fut, null); + } + + /** + * Reads future information and listens. + * + * @param reader Reader. + * @param fut Future. + * @throws IgniteCheckedException In case of error. + */ + protected PlatformListenable readAndListenFuture(BinaryRawReader reader, IgniteFuture fut) + throws IgniteCheckedException { + return readAndListenFuture(reader, fut, null); + } + + /** + * Reads future information and listens. + * + * @param reader Reader. + * @throws IgniteCheckedException In case of error. + */ + protected long readAndListenFuture(BinaryRawReader reader) throws IgniteCheckedException { + readAndListenFuture(reader, currentFuture(), null); + + return TRUE; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java index 40773d0..3ab5d7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java @@ -137,25 +137,4 @@ public interface PlatformTarget { */ @SuppressWarnings("UnusedDeclaration") public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception; - - /** - * Start listening for the future. - * - * @param futId Future ID. - * @param typ Result type. - * @throws IgniteCheckedException In case of failure. - */ - @SuppressWarnings("UnusedDeclaration") - public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception; - - /** - * Start listening for the future for specific operation type. - * - * @param futId Future ID. - * @param typ Result type. - * @param opId Operation ID required to pick correct result writer. - * @throws IgniteCheckedException In case of failure. - */ - @SuppressWarnings("UnusedDeclaration") - public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId) throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index d3fa2c8..558a9b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -245,7 +245,82 @@ public class PlatformCache extends PlatformAbstractTarget { public static final int OP_SIZE_LOC = 56; /** */ - public static final int OP_EXTENSION = 57; + public static final int OP_PUT_ASYNC = 57; + + /** */ + public static final int OP_CLEAR_CACHE_ASYNC = 58; + + /** */ + public static final int OP_CLEAR_ALL_ASYNC = 59; + + /** */ + public static final int OP_REMOVE_ALL2_ASYNC = 60; + + /** */ + public static final int OP_SIZE_ASYNC = 61; + + /** */ + public static final int OP_CLEAR_ASYNC = 62; + + /** */ + public static final int OP_LOAD_CACHE_ASYNC = 63; + + /** */ + public static final int OP_LOC_LOAD_CACHE_ASYNC = 64; + + /** */ + public static final int OP_PUT_ALL_ASYNC = 65; + + /** */ + public static final int OP_REMOVE_ALL_ASYNC = 66; + + /** */ + public static final int OP_GET_ASYNC = 67; + + /** */ + public static final int OP_CONTAINS_KEY_ASYNC = 68; + + /** */ + public static final int OP_CONTAINS_KEYS_ASYNC = 69; + + /** */ + public static final int OP_REMOVE_BOOL_ASYNC = 70; + + /** */ + public static final int OP_REMOVE_OBJ_ASYNC = 71; + + /** */ + public static final int OP_GET_ALL_ASYNC = 72; + + /** */ + public static final int OP_GET_AND_PUT_ASYNC = 73; + + /** */ + public static final int OP_GET_AND_PUT_IF_ABSENT_ASYNC = 74; + + /** */ + public static final int OP_GET_AND_REMOVE_ASYNC = 75; + + /** */ + public static final int OP_GET_AND_REPLACE_ASYNC = 76; + + /** */ + public static final int OP_REPLACE_2_ASYNC = 77; + + /** */ + public static final int OP_REPLACE_3_ASYNC = 78; + + /** */ + public static final int OP_INVOKE_ASYNC = 79; + + /** */ + public static final int OP_INVOKE_ALL_ASYNC = 80; + + /** */ + public static final int OP_PUT_IF_ABSENT_ASYNC = 81; + + /** */ + public static final int OP_EXTENSION = 82; /** Underlying JCache in binary mode. */ private final IgniteCacheProxy cache; @@ -253,6 +328,9 @@ public class PlatformCache extends PlatformAbstractTarget { /** Initial JCache (not in binary mode). */ private final IgniteCache rawCache; + /** Underlying JCache in async mode. */ + private final IgniteCache cacheAsync; + /** Whether this cache is created with "keepBinary" flag on the other side. */ private final boolean keepBinary; @@ -302,8 +380,9 @@ public class PlatformCache extends PlatformAbstractTarget { assert exts != null; rawCache = cache; - - this.cache = (IgniteCacheProxy)cache.withKeepBinary(); + IgniteCache binCache = cache.withKeepBinary(); + cacheAsync = binCache.withAsync(); + this.cache = (IgniteCacheProxy)binCache; this.keepBinary = keepBinary; this.exts = exts; } @@ -380,12 +459,12 @@ public class PlatformCache extends PlatformAbstractTarget { reader.readObjectDetached()) ? TRUE : FALSE; case OP_LOC_LOAD_CACHE: - loadCache0(reader, true); + loadCache0(reader, true, cache); return TRUE; case OP_LOAD_CACHE: - loadCache0(reader, false); + loadCache0(reader, false, cache); return TRUE; @@ -422,14 +501,17 @@ public class PlatformCache extends PlatformAbstractTarget { return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE; case OP_LOAD_ALL: { - long futId = reader.readLong(); boolean replaceExisting = reader.readBoolean(); + Set<Object> keys = PlatformUtils.readSet(reader); + + long futId = reader.readLong(); + int futTyp = reader.readInt(); CompletionListenable fut = new CompletionListenable(); - PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this); + PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, null, this); - cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut); + cache.loadAll(keys, replaceExisting, fut); return TRUE; } @@ -482,6 +564,167 @@ public class PlatformCache extends PlatformAbstractTarget { }); } + + case OP_PUT_ASYNC: { + cacheAsync.put(reader.readObjectDetached(), reader.readObjectDetached()); + + return readAndListenFuture(reader); + } + + case OP_CLEAR_CACHE_ASYNC: { + cacheAsync.clear(); + + return readAndListenFuture(reader); + } + + case OP_CLEAR_ALL_ASYNC: { + cacheAsync.clearAll(PlatformUtils.readSet(reader)); + + return readAndListenFuture(reader); + } + + case OP_REMOVE_ALL2_ASYNC: { + cacheAsync.removeAll(); + + return readAndListenFuture(reader); + } + + case OP_SIZE_ASYNC: { + CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt()); + + cacheAsync.size(modes); + + return readAndListenFuture(reader); + } + + case OP_CLEAR_ASYNC: { + cacheAsync.clear(reader.readObjectDetached()); + + return readAndListenFuture(reader); + } + + case OP_LOAD_CACHE_ASYNC: { + loadCache0(reader, false, cacheAsync); + + return readAndListenFuture(reader); + } + + case OP_LOC_LOAD_CACHE_ASYNC: { + loadCache0(reader, true, cacheAsync); + + return readAndListenFuture(reader); + } + + case OP_PUT_ALL_ASYNC: + cacheAsync.putAll(PlatformUtils.readMap(reader)); + + return readAndListenFuture(reader); + + case OP_REMOVE_ALL_ASYNC: + cacheAsync.removeAll(PlatformUtils.readSet(reader)); + + return readAndListenFuture(reader); + + case OP_REBALANCE: + readAndListenFuture(reader, cache.rebalance()); + + return TRUE; + + case OP_GET_ASYNC: + cacheAsync.get(reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_CONTAINS_KEY_ASYNC: + cacheAsync.containsKey(reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_CONTAINS_KEYS_ASYNC: + cacheAsync.containsKeys(PlatformUtils.readSet(reader)); + + return readAndListenFuture(reader); + + case OP_REMOVE_OBJ_ASYNC: + cacheAsync.remove(reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_REMOVE_BOOL_ASYNC: + cacheAsync.remove(reader.readObjectDetached(), reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_GET_ALL_ASYNC: { + Set keys = PlatformUtils.readSet(reader); + + cacheAsync.getAll(keys); + + readAndListenFuture(reader, cacheAsync.future(), WRITER_GET_ALL); + + return TRUE; + } + + case OP_GET_AND_PUT_ASYNC: + cacheAsync.getAndPut(reader.readObjectDetached(), reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_GET_AND_PUT_IF_ABSENT_ASYNC: + cacheAsync.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_GET_AND_REMOVE_ASYNC: + cacheAsync.getAndRemove(reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_GET_AND_REPLACE_ASYNC: + cacheAsync.getAndReplace(reader.readObjectDetached(), reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_REPLACE_2_ASYNC: + cacheAsync.replace(reader.readObjectDetached(), reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_REPLACE_3_ASYNC: + cacheAsync.replace(reader.readObjectDetached(), reader.readObjectDetached(), + reader.readObjectDetached()); + + return readAndListenFuture(reader); + + case OP_INVOKE_ASYNC: { + Object key = reader.readObjectDetached(); + + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); + + cacheAsync.invoke(key, proc); + + readAndListenFuture(reader, cacheAsync.future(), WRITER_INVOKE); + + return TRUE; + } + + case OP_INVOKE_ALL_ASYNC: { + Set<Object> keys = PlatformUtils.readSet(reader); + + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); + + cacheAsync.invokeAll(keys, proc); + + readAndListenFuture(reader, cacheAsync.future(), WRITER_INVOKE_ALL); + + return TRUE; + } + + case OP_PUT_IF_ABSENT_ASYNC: + cacheAsync.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()); + + return readAndListenFuture(reader); + case OP_INVOKE: { Object key = reader.readObjectDetached(); @@ -573,12 +816,10 @@ public class PlatformCache extends PlatformAbstractTarget { return TRUE; } - - /** * Loads cache via localLoadCache or loadCache. */ - private void loadCache0(BinaryRawReaderEx reader, boolean loc) { + private void loadCache0(BinaryRawReaderEx reader, boolean loc, IgniteCache cache) { PlatformCacheEntryFilter filter = null; Object pred = reader.readObjectDetached(); @@ -836,6 +1077,15 @@ public class PlatformCache extends PlatformAbstractTarget { return TRUE; } + case OP_CLEAR_CACHE: + cache.clear(); + + return TRUE; + + case OP_REMOVE_ALL2: + cache.removeAll(); + + return TRUE; case OP_REBALANCE: { PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() { @Override public Object apply(IgniteFuture fut) { @@ -916,7 +1166,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** <inheritDoc /> */ @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl)cache.future()).internalFuture(); + return ((IgniteFutureImpl) cacheAsync.future()).internalFuture(); } /** <inheritDoc /> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java index 36d709a..0c10a53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java @@ -78,8 +78,6 @@ public class PlatformCompute extends PlatformAbstractTarget { /** Compute instance for platform-only nodes. */ private final IgniteComputeImpl computeForPlatform; - /** Future for previous asynchronous operation. */ - protected ThreadLocal<IgniteInternalFuture> curFut = new ThreadLocal<>(); /** * Constructor. * @@ -121,6 +119,9 @@ public class PlatformCompute extends PlatformAbstractTarget { return executeNative0(task); } + case OP_EXEC_ASYNC: + return executeJavaTask(reader, true); + default: return super.processInStreamOutObject(type, reader); } @@ -235,26 +236,11 @@ public class PlatformCompute extends PlatformAbstractTarget { break; - case OP_EXEC_ASYNC: - writer.writeObjectDetached(executeJavaTask(reader, true)); - - break; - default: super.processInStreamOutStream(type, reader, writer); } } - /** <inheritDoc /> */ - @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { - IgniteInternalFuture fut = curFut.get(); - - if (fut == null) - throw new IllegalStateException("Asynchronous operation not started."); - - return fut; - } - /** * Execute task. * @@ -287,7 +273,7 @@ public class PlatformCompute extends PlatformAbstractTarget { * @param reader Reader. * @return Task result. */ - protected Object executeJavaTask(BinaryRawReaderEx reader, boolean async) { + protected Object executeJavaTask(BinaryRawReaderEx reader, boolean async) throws IgniteCheckedException { String taskName = reader.readString(); boolean keepBinary = reader.readBoolean(); Object arg = reader.readObjectDetached(); @@ -304,11 +290,8 @@ public class PlatformCompute extends PlatformAbstractTarget { Object res = compute0.execute(taskName, arg); - if (async) { - curFut.set(new ComputeConvertingFuture(compute0.future())); - - return null; - } + if (async) + return readAndListenFuture(reader, new ComputeConvertingFuture(compute0.future())); else return toBinary(res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java index f133524..383e7ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java @@ -81,9 +81,18 @@ public class PlatformEvents extends PlatformAbstractTarget { private static final int OP_STOP_LOCAL_LISTEN = 14; /** */ + private static final int OP_REMOTE_QUERY_ASYNC = 15; + + /** */ + private static final int OP_WAIT_FOR_LOCAL_ASYNC = 16; + + /** */ private final IgniteEvents events; /** */ + private final IgniteEvents eventsAsync; + + /** */ private final EventResultWriter eventResWriter; /** */ @@ -101,6 +110,7 @@ public class PlatformEvents extends PlatformAbstractTarget { assert events != null; this.events = events; + eventsAsync = events.withAsync(); eventResWriter = new EventResultWriter(platformCtx); eventColResWriter = new EventCollectionResultWriter(platformCtx); @@ -136,6 +146,21 @@ public class PlatformEvents extends PlatformAbstractTarget { return TRUE; + case OP_REMOTE_QUERY_ASYNC: + startRemoteQuery(reader, eventsAsync); + + readAndListenFuture(reader, currentFuture(), eventColResWriter); + + return TRUE; + + case OP_WAIT_FOR_LOCAL_ASYNC: { + startWaitForLocal(reader, eventsAsync); + + readAndListenFuture(reader, currentFuture(), eventResWriter); + + return TRUE; + } + default: return super.processInStreamOutLong(type, reader); } @@ -159,13 +184,7 @@ public class PlatformEvents extends PlatformAbstractTarget { } case OP_WAIT_FOR_LOCAL: { - boolean hasFilter = reader.readBoolean(); - - IgnitePredicate pred = hasFilter ? localFilter(reader.readLong()) : null; - - int[] eventTypes = readEventTypes(reader); - - EventAdapter result = (EventAdapter) events.waitForLocal(pred, eventTypes); + EventAdapter result = startWaitForLocal(reader, events); platformCtx.writeEvent(writer, result); @@ -203,24 +222,9 @@ public class PlatformEvents extends PlatformAbstractTarget { } case OP_REMOTE_QUERY: { - Object pred = reader.readObjectDetached(); + Collection<Event> result = startRemoteQuery(reader, events); - long timeout = reader.readLong(); - - int[] types = readEventTypes(reader); - - PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types); - - Collection<Event> result = events.remoteQuery(filter, timeout); - - if (result == null) - writer.writeInt(-1); - else { - writer.writeInt(result.size()); - - for (Event e : result) - platformCtx.writeEvent(writer, e); - } + eventColResWriter.write(writer, result, null); break; } @@ -230,6 +234,42 @@ public class PlatformEvents extends PlatformAbstractTarget { } } + /** + * Starts the waitForLocal. + * + * @param reader Reader + * @param events Events. + * @return Result. + */ + private EventAdapter startWaitForLocal(BinaryRawReaderEx reader, IgniteEvents events) { + Long filterHnd = reader.readObject(); + + IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : null; + + int[] eventTypes = readEventTypes(reader); + + return (EventAdapter) events.waitForLocal(filter, eventTypes); + } + + /** + * Starts the remote query. + * + * @param reader Reader. + * @param events Events. + * @return Result. + */ + private Collection<Event> startRemoteQuery(BinaryRawReaderEx reader, IgniteEvents events) { + Object pred = reader.readObjectDetached(); + + long timeout = reader.readLong(); + + int[] types = readEventTypes(reader); + + PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types); + + return events.remoteQuery(filter, timeout); + } + /** {@inheritDoc} */ @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { @@ -271,7 +311,7 @@ public class PlatformEvents extends PlatformAbstractTarget { /** {@inheritDoc} */ @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl)events.future()).internalFuture(); + return ((IgniteFutureImpl)eventsAsync.future()).internalFuture(); } /** {@inheritDoc} */ @@ -381,12 +421,17 @@ public class PlatformEvents extends PlatformAbstractTarget { /** <inheritDoc /> */ @SuppressWarnings("unchecked") @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { - Collection<EventAdapter> events = (Collection<EventAdapter>)obj; + Collection<Event> events = (Collection<Event>)obj; - writer.writeInt(events.size()); + if (obj != null) { + writer.writeInt(events.size()); - for (EventAdapter e : events) - platformCtx.writeEvent(writer, e); + for (Event e : events) + platformCtx.writeEvent(writer, e); + } + else { + writer.writeInt(-1); + } } /** <inheritDoc /> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java index 1b05eca..216427a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java @@ -59,8 +59,17 @@ public class PlatformMessaging extends PlatformAbstractTarget { public static final int OP_WITH_ASYNC = 8; /** */ + public static final int OP_REMOTE_LISTEN_ASYNC = 9; + + /** */ + public static final int OP_STOP_REMOTE_LISTEN_ASYNC = 10; + + /** */ private final IgniteMessaging messaging; + /** */ + private final IgniteMessaging messagingAsync; + /** * Ctor. * @@ -73,6 +82,7 @@ public class PlatformMessaging extends PlatformAbstractTarget { assert messaging != null; this.messaging = messaging; + messagingAsync = messaging.withAsync(); } /** {@inheritDoc} */ @@ -120,6 +130,18 @@ public class PlatformMessaging extends PlatformAbstractTarget { return TRUE; } + case OP_REMOTE_LISTEN_ASYNC: { + startRemoteListen(reader, messagingAsync); + + return readAndListenFuture(reader); + } + + case OP_STOP_REMOTE_LISTEN_ASYNC: { + messagingAsync.stopRemoteListen(reader.readUuid()); + + return readAndListenFuture(reader); + } + default: return super.processInStreamOutLong(type, reader); } @@ -131,17 +153,7 @@ public class PlatformMessaging extends PlatformAbstractTarget { throws IgniteCheckedException { switch (type) { case OP_REMOTE_LISTEN:{ - Object nativeFilter = reader.readObjectDetached(); - - long ptr = reader.readLong(); // interop pointer - - Object topic = reader.readObjectDetached(); - - PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr); - - UUID listenId = messaging.remoteListen(topic, filter); - - writer.writeUuid(listenId); + writer.writeUuid(startRemoteListen(reader, messaging)); break; } @@ -151,9 +163,26 @@ public class PlatformMessaging extends PlatformAbstractTarget { } } + /** + * Starts the remote listener. + * @param reader Reader. + * @return Listen id. + */ + private UUID startRemoteListen(BinaryRawReaderEx reader, IgniteMessaging messaging) { + Object nativeFilter = reader.readObjectDetached(); + + long ptr = reader.readLong(); // interop pointer + + Object topic = reader.readObjectDetached(); + + PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr); + + return messaging.remoteListen(topic, filter); + } + /** {@inheritDoc} */ @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl)messaging.future()).internalFuture(); + return ((IgniteFutureImpl)messagingAsync.future()).internalFuture(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java index 7aaf597..5898979 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java @@ -82,6 +82,18 @@ public class PlatformServices extends PlatformAbstractTarget { private static final int OP_CANCEL_ALL = 10; /** */ + private static final int OP_DOTNET_DEPLOY_ASYNC = 11; + + /** */ + private static final int OP_DOTNET_DEPLOY_MULTIPLE_ASYNC = 12; + + /** */ + private static final int OP_CANCEL_ASYNC = 13; + + /** */ + private static final int OP_CANCEL_ALL_ASYNC = 14; + + /** */ private static final byte PLATFORM_JAVA = 0; /** */ @@ -94,6 +106,9 @@ public class PlatformServices extends PlatformAbstractTarget { /** */ private final IgniteServices services; + /** */ + private final IgniteServices servicesAsync; + /** Server keep binary flag. */ private final boolean srvKeepBinary; @@ -110,6 +125,7 @@ public class PlatformServices extends PlatformAbstractTarget { assert services != null; this.services = services; + servicesAsync = services.withAsync(); this.srvKeepBinary = srvKeepBinary; } @@ -132,43 +148,45 @@ public class PlatformServices extends PlatformAbstractTarget { throws IgniteCheckedException { switch (type) { case OP_DOTNET_DEPLOY: { - ServiceConfiguration cfg = new ServiceConfiguration(); + dotnetDeploy(reader, services); - cfg.setName(reader.readString()); - cfg.setService(new PlatformDotNetServiceImpl(reader.readObjectDetached(), platformCtx, srvKeepBinary)); - cfg.setTotalCount(reader.readInt()); - cfg.setMaxPerNodeCount(reader.readInt()); - cfg.setCacheName(reader.readString()); - cfg.setAffinityKey(reader.readObjectDetached()); + return TRUE; + } - Object filter = reader.readObjectDetached(); + case OP_DOTNET_DEPLOY_ASYNC: { + dotnetDeploy(reader, servicesAsync); - if (filter != null) - cfg.setNodeFilter(platformCtx.createClusterNodeFilter(filter)); + return readAndListenFuture(reader); + } - services.deploy(cfg); + case OP_DOTNET_DEPLOY_MULTIPLE: { + dotnetDeployMultiple(reader, services); return TRUE; } - case OP_DOTNET_DEPLOY_MULTIPLE: { - String name = reader.readString(); - Object svc = reader.readObjectDetached(); - int totalCnt = reader.readInt(); - int maxPerNodeCnt = reader.readInt(); + case OP_DOTNET_DEPLOY_MULTIPLE_ASYNC: { + dotnetDeployMultiple(reader, servicesAsync); + + return readAndListenFuture(reader); + } - services.deployMultiple(name, new PlatformDotNetServiceImpl(svc, platformCtx, srvKeepBinary), - totalCnt, maxPerNodeCnt); + case OP_CANCEL: { + services.cancel(reader.readString()); return TRUE; } - case OP_CANCEL: { - String name = reader.readString(); + case OP_CANCEL_ASYNC: { + servicesAsync.cancel(reader.readString()); - services.cancel(name); + return readAndListenFuture(reader); + } - return TRUE; + case OP_CANCEL_ALL_ASYNC: { + servicesAsync.cancelAll(); + + return readAndListenFuture(reader); } default: @@ -334,7 +352,41 @@ public class PlatformServices extends PlatformAbstractTarget { /** {@inheritDoc} */ @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl)services.future()).internalFuture(); + return ((IgniteFutureImpl)servicesAsync.future()).internalFuture(); + } + + /** + * Deploys multiple dotnet services. + */ + private void dotnetDeployMultiple(BinaryRawReaderEx reader, IgniteServices services) { + String name = reader.readString(); + Object svc = reader.readObjectDetached(); + int totalCnt = reader.readInt(); + int maxPerNodeCnt = reader.readInt(); + + services.deployMultiple(name, new PlatformDotNetServiceImpl(svc, platformCtx, srvKeepBinary), + totalCnt, maxPerNodeCnt); + } + + /** + * Deploys dotnet service. + */ + private void dotnetDeploy(BinaryRawReaderEx reader, IgniteServices services) { + ServiceConfiguration cfg = new ServiceConfiguration(); + + cfg.setName(reader.readString()); + cfg.setService(new PlatformDotNetServiceImpl(reader.readObjectDetached(), platformCtx, srvKeepBinary)); + cfg.setTotalCount(reader.readInt()); + cfg.setMaxPerNodeCount(reader.readInt()); + cfg.setCacheName(reader.readString()); + cfg.setAffinityKey(reader.readObjectDetached()); + + Object filter = reader.readObjectDetached(); + + if (filter != null) + cfg.setNodeFilter(platformCtx.createClusterNodeFilter(filter)); + + services.deploy(cfg); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java index 339937c..9c8ad50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal.processors.platform.transactions; -import java.sql.Timestamp; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.configuration.TransactionConfiguration; @@ -27,7 +24,6 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.lang.IgniteFuture; @@ -36,6 +32,10 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionMetrics; +import java.sql.Timestamp; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + /** * Native transaction wrapper implementation. */ @@ -95,21 +95,6 @@ public class PlatformTransactions extends PlatformAbstractTarget { } /** - * Listens to the transaction future and notifies .NET int future. - */ - private void listenAndNotifyIntFuture(final long futId, final Transaction asyncTx) { - IgniteFuture fut = asyncTx.future().chain(new C1<IgniteFuture, Object>() { - private static final long serialVersionUID = 0L; - - @Override public Object apply(IgniteFuture fut) { - return null; - } - }); - - PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, this); - } - - /** * Register transaction. * * @param tx Transaction. @@ -138,10 +123,9 @@ public class PlatformTransactions extends PlatformAbstractTarget { /** * @param id Transaction ID. - * @throws org.apache.ignite.IgniteCheckedException In case of error. * @return Transaction state. */ - private int txClose(long id) throws IgniteCheckedException { + private int txClose(long id) { Transaction tx = tx(id); try { @@ -209,7 +193,6 @@ public class PlatformTransactions extends PlatformAbstractTarget { /** {@inheritDoc} */ @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { long txId = reader.readLong(); - long futId = reader.readLong(); final Transaction asyncTx = (Transaction)tx(txId).withAsync(); @@ -229,7 +212,16 @@ public class PlatformTransactions extends PlatformAbstractTarget { return super.processInStreamOutLong(type, reader); } - listenAndNotifyIntFuture(futId, asyncTx); + // Future result is the tx itself, we do not want to return it to the platform. + IgniteFuture fut = asyncTx.future().chain(new C1<IgniteFuture, Object>() { + private static final long serialVersionUID = 0L; + + @Override public Object apply(IgniteFuture fut) { + return null; + } + }); + + readAndListenFuture(reader, fut); return TRUE; } http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/include/ignite/jni/exports.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h index 276c06a..586c389 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h @@ -68,8 +68,6 @@ extern "C" { void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType); void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long futId, int typ); void IGNITE_CALL IgniteTargetListenFutureForOperation(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId); - void* IGNITE_CALL IgniteTargetListenFutureAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ); - void* IGNITE_CALL IgniteTargetListenFutureForOperationAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId); void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj); void IGNITE_CALL IgniteRelease(void* obj); http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/include/ignite/jni/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h index 9e5bcae..4c79a61 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/java.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h @@ -282,8 +282,6 @@ namespace ignite jmethodID m_PlatformTarget_inObjectStreamOutObjectStream; jmethodID m_PlatformTarget_listenFuture; jmethodID m_PlatformTarget_listenFutureForOperation; - jmethodID m_PlatformTarget_listenFutureAndGet; - jmethodID m_PlatformTarget_listenFutureForOperationAndGet; jclass c_PlatformUtils; jmethodID m_PlatformUtils_reallocate; @@ -465,8 +463,6 @@ namespace ignite jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo = NULL); void TargetListenFuture(jobject obj, long long futId, int typ); void TargetListenFutureForOperation(jobject obj, long long futId, int typ, int opId); - void* TargetListenFutureAndGet(jobject obj, long long futId, int typ); - void* TargetListenFutureForOperationAndGet(jobject obj, long long futId, int typ, int opId); jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr); http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/project/vs/module.def ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def index 2e76bf7..d9bc411 100644 --- a/modules/platforms/cpp/jni/project/vs/module.def +++ b/modules/platforms/cpp/jni/project/vs/module.def @@ -39,8 +39,6 @@ IgniteProcessorExtensions @97 IgniteProcessorAtomicLong @98 IgniteListenableCancel @110 IgniteListenableIsCancelled @111 -IgniteTargetListenFutureAndGet @112 -IgniteTargetListenFutureForOperationAndGet @113 IgniteProcessorCreateCacheFromConfig @114 IgniteProcessorGetOrCreateCacheFromConfig @115 IgniteProcessorGetIgniteConfiguration @116 http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/src/exports.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp index ab569b0..ee2f5c7 100644 --- a/modules/platforms/cpp/jni/src/exports.cpp +++ b/modules/platforms/cpp/jni/src/exports.cpp @@ -190,14 +190,6 @@ extern "C" { ctx->TargetListenFutureForOperation(static_cast<jobject>(obj), futId, typ, opId); } - void* IGNITE_CALL IgniteTargetListenFutureAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ) { - return ctx->TargetListenFutureAndGet(static_cast<jobject>(obj), futId, typ); - } - - void* IGNITE_CALL IgniteTargetListenFutureForOperationAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId) { - return ctx->TargetListenFutureForOperationAndGet(static_cast<jobject>(obj), futId, typ, opId); - } - void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj) { return ctx->Acquire(static_cast<jobject>(obj)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp index fbfb17e..c1efbe2 100644 --- a/modules/platforms/cpp/jni/src/java.cpp +++ b/modules/platforms/cpp/jni/src/java.cpp @@ -231,8 +231,6 @@ namespace ignite JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;", false); JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE = JniMethod("listenFuture", "(JI)V", false); JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION = JniMethod("listenFutureForOperation", "(JII)V", false); - JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE_AND_GET = JniMethod("listenFutureAndGet", "(JI)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false); - JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION_AND_GET = JniMethod("listenFutureForOperationAndGet", "(JII)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false); const char* C_PLATFORM_CALLBACK_UTILS = "org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils"; @@ -551,8 +549,6 @@ namespace ignite m_PlatformTarget_inObjectStreamOutObjectStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM); m_PlatformTarget_listenFuture = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE); m_PlatformTarget_listenFutureForOperation = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION); - m_PlatformTarget_listenFutureAndGet = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE_AND_GET); - m_PlatformTarget_listenFutureForOperationAndGet = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION_AND_GET); c_PlatformUtils = FindClass(env, C_PLATFORM_UTILS); m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC); @@ -1439,28 +1435,6 @@ namespace ignite ExceptionCheck(env); } - void* JniContext::TargetListenFutureAndGet(jobject obj, long long futId, int typ) { - JNIEnv* env = Attach(); - - jobject res = env->CallObjectMethod(obj, - jvm->GetMembers().m_PlatformTarget_listenFutureAndGet, futId, typ); - - ExceptionCheck(env); - - return LocalToGlobal(env, res); - } - - void* JniContext::TargetListenFutureForOperationAndGet(jobject obj, long long futId, int typ, int opId) { - JNIEnv* env = Attach(); - - jobject res = env->CallObjectMethod(obj, - jvm->GetMembers().m_PlatformTarget_listenFutureForOperationAndGet, futId, typ, opId); - - ExceptionCheck(env); - - return LocalToGlobal(env, res); - } - jobject JniContext::CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* err) { JNIEnv* env = Attach(); http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index d9adc06..b1cf611 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache using System; using System.Collections; using System.Collections.Generic; - using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; using Apache.Ignite.Core.Binary; @@ -35,7 +34,6 @@ namespace Apache.Ignite.Core.Impl.Cache using Apache.Ignite.Core.Impl.Cache.Query.Continuous; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; /// <summary> /// Native cache wrapper. @@ -61,15 +59,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** Flag: keep binary. */ private readonly bool _flagKeepBinary; - /** Flag: async mode.*/ - private readonly bool _flagAsync; - /** Flag: no-retries.*/ private readonly bool _flagNoRetries; - /** Async instance. */ - private readonly Lazy<CacheImpl<TK, TV>> _asyncInstance; - /// <summary> /// Constructor. /// </summary> @@ -78,72 +70,72 @@ namespace Apache.Ignite.Core.Impl.Cache /// <param name="marsh">Marshaller.</param> /// <param name="flagSkipStore">Skip store flag.</param> /// <param name="flagKeepBinary">Keep binary flag.</param> - /// <param name="flagAsync">Async mode flag.</param> /// <param name="flagNoRetries">No-retries mode flag.</param> public CacheImpl(Ignite grid, IUnmanagedTarget target, Marshaller marsh, - bool flagSkipStore, bool flagKeepBinary, bool flagAsync, bool flagNoRetries) : base(target, marsh) + bool flagSkipStore, bool flagKeepBinary, bool flagNoRetries) : base(target, marsh) { _ignite = grid; _flagSkipStore = flagSkipStore; _flagKeepBinary = flagKeepBinary; - _flagAsync = flagAsync; _flagNoRetries = flagNoRetries; + } - _asyncInstance = new Lazy<CacheImpl<TK, TV>>(WithAsync); + /** <inheritDoc /> */ + public IIgnite Ignite + { + get { return _ignite; } } /// <summary> - /// Returns an instance with async mode enabled. + /// Performs async operation. /// </summary> - private CacheImpl<TK, TV> WithAsync() + private Task DoOutOpAsync<T1>(CacheOp op, T1 val1) { - var target = DoOutOpObject((int) CacheOp.WithAsync); - - return new CacheImpl<TK, TV>(_ignite, target, Marshaller, _flagSkipStore, _flagKeepBinary, - true, _flagNoRetries); + return DoOutOpAsync<object, T1>((int) op, val1); } - /** <inheritDoc /> */ - public IIgnite Ignite + /// <summary> + /// Performs async operation. + /// </summary> + private Task<TR> DoOutOpAsync<T1, TR>(CacheOp op, T1 val1) { - get { return _ignite; } + return DoOutOpAsync<T1, TR>((int) op, val1); } - /** <inheritDoc /> */ - private bool IsAsync + /// <summary> + /// Performs async operation. + /// </summary> + private Task DoOutOpAsync<T1, T2>(CacheOp op, T1 val1, T2 val2) { - get { return _flagAsync; } + return DoOutOpAsync<T1, T2, object>((int) op, val1, val2); } /// <summary> - /// Gets and resets task for previous asynchronous operation. + /// Performs async operation. /// </summary> - /// <param name="lastAsyncOp">The last async op id.</param> - /// <returns> - /// Task for previous asynchronous operation. - /// </returns> - private Task GetTask(CacheOp lastAsyncOp) + private Task<TR> DoOutOpAsync<T1, T2, TR>(CacheOp op, T1 val1, T2 val2) { - return GetTask<object>(lastAsyncOp); + return DoOutOpAsync<T1, T2, TR>((int) op, val1, val2); } /// <summary> - /// Gets and resets task for previous asynchronous operation. + /// Performs async operation. /// </summary> - /// <typeparam name="TResult">The type of the result.</typeparam> - /// <param name="lastAsyncOp">The last async op id.</param> - /// <param name="converter">The converter.</param> - /// <returns> - /// Task for previous asynchronous operation. - /// </returns> - private Task<TResult> GetTask<TResult>(CacheOp lastAsyncOp, Func<BinaryReader, TResult> converter = null) + private Task DoOutOpAsync(CacheOp op, Action<BinaryWriter> writeAction = null) { - Debug.Assert(_flagAsync); + return DoOutOpAsync<object>(op, writeAction); + } - return GetFuture((futId, futTypeId) => UU.TargetListenFutureForOperation(Target, futId, futTypeId, - (int) lastAsyncOp), _flagKeepBinary, converter).Task; + /// <summary> + /// Performs async operation. + /// </summary> + private Task<T> DoOutOpAsync<T>(CacheOp op, Action<BinaryWriter> writeAction = null, + Func<BinaryReader, T> convertFunc = null) + { + return DoOutOpAsync((int)op, writeAction, IsKeepBinary, convertFunc); } + /** <inheritDoc /> */ public string Name { @@ -169,7 +161,7 @@ namespace Apache.Ignite.Core.Impl.Cache return this; return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithSkipStore), Marshaller, - true, _flagKeepBinary, _flagAsync, true); + true, _flagKeepBinary, true); } /// <summary> @@ -193,7 +185,7 @@ namespace Apache.Ignite.Core.Impl.Cache } return new CacheImpl<TK1, TV1>(_ignite, DoOutOpObject((int) CacheOp.WithKeepBinary), Marshaller, - _flagSkipStore, true, _flagAsync, _flagNoRetries); + _flagSkipStore, true, _flagNoRetries); } /** <inheritDoc /> */ @@ -212,7 +204,7 @@ namespace Apache.Ignite.Core.Impl.Cache w.WriteLong(access); }); - return new CacheImpl<TK, TV>(_ignite, cache0, Marshaller, _flagSkipStore, _flagKeepBinary, _flagAsync, _flagNoRetries); + return new CacheImpl<TK, TV>(_ignite, cache0, Marshaller, _flagSkipStore, _flagKeepBinary, _flagNoRetries); } /// <summary> @@ -244,50 +236,43 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) { - LoadCache0(p, args, (int)CacheOp.LoadCache); + DoOutInOpX((int) CacheOp.LoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException); } /** <inheritDoc /> */ public Task LoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args) { - AsyncInstance.LoadCache(p, args); - - return AsyncInstance.GetTask(CacheOp.LoadCache); + return DoOutOpAsync(CacheOp.LoadCacheAsync, writer => WriteLoadCacheData(writer, p, args)); } /** <inheritDoc /> */ public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) { - LoadCache0(p, args, (int)CacheOp.LocLoadCache); + DoOutInOpX((int) CacheOp.LocLoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException); } /** <inheritDoc /> */ public Task LocalLoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args) { - AsyncInstance.LocalLoadCache(p, args); - - return AsyncInstance.GetTask(CacheOp.LocLoadCache); + return DoOutOpAsync(CacheOp.LocLoadCacheAsync, writer => WriteLoadCacheData(writer, p, args)); } /// <summary> - /// Loads the cache. + /// Writes the load cache data to the writer. /// </summary> - private void LoadCache0(ICacheEntryFilter<TK, TV> p, object[] args, int opId) + private void WriteLoadCacheData(IBinaryRawWriter writer, ICacheEntryFilter<TK, TV> p, object[] args) { - DoOutInOpX(opId, writer => + if (p != null) { - if (p != null) - { - var p0 = new CacheEntryFilterHolder(p, (k, v) => p.Invoke(new CacheEntry<TK, TV>((TK) k, (TV) v)), - Marshaller, IsKeepBinary); + var p0 = new CacheEntryFilterHolder(p, (k, v) => p.Invoke(new CacheEntry<TK, TV>((TK) k, (TV) v)), + Marshaller, IsKeepBinary); - writer.WriteObject(p0); - } - else - writer.WriteObject<CacheEntryFilterHolder>(null); + writer.WriteObject(p0); + } + else + writer.WriteObject<CacheEntryFilterHolder>(null); - writer.WriteArray(args); - }, ReadException); + writer.WriteArray(args); } /** <inheritDoc /> */ @@ -299,12 +284,11 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues) { - return GetFuture<object>((futId, futTyp) => DoOutOp(CacheOp.LoadAll, writer => + return DoOutOpAsync(CacheOp.LoadAll, writer => { - writer.WriteLong(futId); writer.WriteBoolean(replaceExistingValues); WriteEnumerable(writer, keys); - })).Task; + }); } /** <inheritDoc /> */ @@ -318,9 +302,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<bool> ContainsKeyAsync(TK key) { - AsyncInstance.ContainsKey(key); + IgniteArgumentCheck.NotNull(key, "key"); - return AsyncInstance.GetTask<bool>(CacheOp.ContainsKey); + return DoOutOpAsync<TK, bool>(CacheOp.ContainsKeyAsync, key); } /** <inheritDoc /> */ @@ -334,9 +318,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<bool> ContainsKeysAsync(IEnumerable<TK> keys) { - AsyncInstance.ContainsKeys(keys); + IgniteArgumentCheck.NotNull(keys, "keys"); - return AsyncInstance.GetTask<bool>(CacheOp.ContainsKeys); + return DoOutOpAsync<bool>(CacheOp.ContainsKeysAsync, writer => WriteEnumerable(writer, keys)); } /** <inheritDoc /> */ @@ -376,16 +360,10 @@ namespace Apache.Ignite.Core.Impl.Cache { get { - if (IsAsync) - throw new InvalidOperationException("Indexer can't be used in async mode."); - return Get(key); } set { - if (IsAsync) - throw new InvalidOperationException("Indexer can't be used in async mode."); - Put(key, value); } } @@ -399,26 +377,19 @@ namespace Apache.Ignite.Core.Impl.Cache w => w.Write(key), (stream, res) => { - if (res == True) // Not null - { - Debug.Assert(!IsAsync); - - return Unmarshal<TV>(stream); - } - - if (!IsAsync) + if (res != True) throw GetKeyNotFoundException(); - return default(TV); + return Unmarshal<TV>(stream); }, ReadException); } /** <inheritDoc /> */ public Task<TV> GetAsync(TK key) { - AsyncInstance.Get(key); + IgniteArgumentCheck.NotNull(key, "key"); - return AsyncInstance.GetTask(CacheOp.Get, reader => + return DoOutOpAsync(CacheOp.GetAsync, w => w.WriteObject(key), reader => { if (reader != null) return reader.ReadObject<TV>(); @@ -432,9 +403,6 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - if (IsAsync) - throw new InvalidOperationException("TryGet can't be used in async mode."); - var res = DoOutInOpNullable(CacheOp.Get, key); value = res.Value; @@ -447,9 +415,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - AsyncInstance.Get(key); - - return AsyncInstance.GetTask(CacheOp.Get, GetCacheResult); + return DoOutOpAsync(CacheOp.GetAsync, w => w.WriteObject(key), reader => GetCacheResult(reader)); } /** <inheritDoc /> */ @@ -466,9 +432,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<IDictionary<TK, TV>> GetAllAsync(IEnumerable<TK> keys) { - AsyncInstance.GetAll(keys); + IgniteArgumentCheck.NotNull(keys, "keys"); - return AsyncInstance.GetTask(CacheOp.GetAll, r => r == null ? null : ReadGetAllDictionary(r)); + return DoOutOpAsync(CacheOp.GetAllAsync, w => WriteEnumerable(w, keys), r => ReadGetAllDictionary(r)); } /** <inheritdoc /> */ @@ -484,16 +450,16 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task PutAsync(TK key, TV val) { - AsyncInstance.Put(key, val); + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); - return AsyncInstance.GetTask(CacheOp.Put); + return DoOutOpAsync(CacheOp.PutAsync, key, val); } /** <inheritDoc /> */ public CacheResult<TV> GetAndPut(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); - IgniteArgumentCheck.NotNull(val, "val"); return DoOutInOpNullable(CacheOp.GetAndPut, key, val); @@ -502,9 +468,14 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val) { - AsyncInstance.GetAndPut(key, val); + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); - return AsyncInstance.GetTask(CacheOp.GetAndPut, GetCacheResult); + return DoOutOpAsync(CacheOp.GetAndPutAsync, w => + { + w.WriteObject(key); + w.WriteObject(val); + }, r => GetCacheResult(r)); } /** <inheritDoc /> */ @@ -520,9 +491,14 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val) { - AsyncInstance.GetAndReplace(key, val); + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); - return AsyncInstance.GetTask(CacheOp.GetAndReplace, GetCacheResult); + return DoOutOpAsync(CacheOp.GetAndReplaceAsync, w => + { + w.WriteObject(key); + w.WriteObject(val); + }, r => GetCacheResult(r)); } /** <inheritDoc /> */ @@ -536,9 +512,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<CacheResult<TV>> GetAndRemoveAsync(TK key) { - AsyncInstance.GetAndRemove(key); + IgniteArgumentCheck.NotNull(key, "key"); - return AsyncInstance.GetTask(CacheOp.GetAndRemove, GetCacheResult); + return DoOutOpAsync(CacheOp.GetAndRemoveAsync, w => w.WriteObject(key), r => GetCacheResult(r)); } /** <inheritdoc /> */ @@ -554,9 +530,10 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<bool> PutIfAbsentAsync(TK key, TV val) { - AsyncInstance.PutIfAbsent(key, val); + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); - return AsyncInstance.GetTask<bool>(CacheOp.PutIfAbsent); + return DoOutOpAsync<TK, TV, bool>(CacheOp.PutIfAbsentAsync, key, val); } /** <inheritdoc /> */ @@ -572,9 +549,14 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val) { - AsyncInstance.GetAndPutIfAbsent(key, val); + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); - return AsyncInstance.GetTask(CacheOp.GetAndPutIfAbsent, GetCacheResult); + return DoOutOpAsync(CacheOp.GetAndPutIfAbsentAsync, w => + { + w.WriteObject(key); + w.WriteObject(val); + }, r => GetCacheResult(r)); } /** <inheritdoc /> */ @@ -590,9 +572,10 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<bool> ReplaceAsync(TK key, TV val) { - AsyncInstance.Replace(key, val); + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); - return AsyncInstance.GetTask<bool>(CacheOp.Replace2); + return DoOutOpAsync<TK, TV, bool>(CacheOp.Replace2Async, key, val); } /** <inheritdoc /> */ @@ -610,9 +593,16 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal) { - AsyncInstance.Replace(key, oldVal, newVal); + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(oldVal, "oldVal"); + IgniteArgumentCheck.NotNull(newVal, "newVal"); - return AsyncInstance.GetTask<bool>(CacheOp.Replace3); + return DoOutOpAsync<bool>(CacheOp.Replace3Async, w => + { + w.WriteObject(key); + w.WriteObject(oldVal); + w.WriteObject(newVal); + }); } /** <inheritdoc /> */ @@ -626,9 +616,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task PutAllAsync(IDictionary<TK, TV> vals) { - AsyncInstance.PutAll(vals); + IgniteArgumentCheck.NotNull(vals, "vals"); - return AsyncInstance.GetTask(CacheOp.PutAll); + return DoOutOpAsync(CacheOp.PutAllAsync, writer => WriteDictionary(writer, vals)); } /** <inheritdoc /> */ @@ -648,9 +638,7 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task ClearAsync() { - AsyncInstance.Clear(); - - return AsyncInstance.GetTask(); + return DoOutOpAsync(CacheOp.ClearCacheAsync); } /** <inheritdoc /> */ @@ -664,9 +652,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task ClearAsync(TK key) { - AsyncInstance.Clear(key); + IgniteArgumentCheck.NotNull(key, "key"); - return AsyncInstance.GetTask(CacheOp.Clear); + return DoOutOpAsync(CacheOp.ClearAsync, key); } /** <inheritdoc /> */ @@ -680,9 +668,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task ClearAllAsync(IEnumerable<TK> keys) { - AsyncInstance.ClearAll(keys); + IgniteArgumentCheck.NotNull(keys, "keys"); - return AsyncInstance.GetTask(CacheOp.ClearAll); + return DoOutOpAsync(CacheOp.ClearAllAsync, writer => WriteEnumerable(writer, keys)); } /** <inheritdoc /> */ @@ -712,16 +700,15 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<bool> RemoveAsync(TK key) { - AsyncInstance.Remove(key); + IgniteArgumentCheck.NotNull(key, "key"); - return AsyncInstance.GetTask<bool>(CacheOp.RemoveObj); + return DoOutOpAsync<TK, bool>(CacheOp.RemoveObjAsync, key); } /** <inheritDoc /> */ public bool Remove(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); - IgniteArgumentCheck.NotNull(val, "val"); return DoOutOp(CacheOp.RemoveBool, key, val); @@ -730,9 +717,10 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<bool> RemoveAsync(TK key, TV val) { - AsyncInstance.Remove(key, val); + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); - return AsyncInstance.GetTask<bool>(CacheOp.RemoveBool); + return DoOutOpAsync<TK, TV, bool>(CacheOp.RemoveBoolAsync, key, val); } /** <inheritDoc /> */ @@ -746,9 +734,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task RemoveAllAsync(IEnumerable<TK> keys) { - AsyncInstance.RemoveAll(keys); + IgniteArgumentCheck.NotNull(keys, "keys"); - return AsyncInstance.GetTask(CacheOp.RemoveAll); + return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => WriteEnumerable(writer, keys)); } /** <inheritDoc /> */ @@ -760,9 +748,7 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task RemoveAllAsync() { - AsyncInstance.RemoveAll(); - - return AsyncInstance.GetTask(); + return DoOutOpAsync(CacheOp.RemoveAll2Async); } /** <inheritDoc /> */ @@ -780,9 +766,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<int> GetSizeAsync(params CachePeekMode[] modes) { - AsyncInstance.GetSize(modes); + var modes0 = EncodePeekModes(modes); - return AsyncInstance.GetTask<int>(); + return DoOutOpAsync<int>(CacheOp.SizeAsync, w => w.WriteInt(modes0)); } /// <summary> @@ -831,20 +817,29 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task<TRes> InvokeAsync<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg) { - AsyncInstance.Invoke(key, processor, arg); + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(processor, "processor"); - return AsyncInstance.GetTask(CacheOp.Invoke, r => - { - if (r == null) - return default(TRes); + var holder = new CacheEntryProcessorHolder(processor, arg, + (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV)); - var hasError = r.ReadBoolean(); + return DoOutOpAsync(CacheOp.InvokeAsync, writer => + { + writer.Write(key); + writer.Write(holder); + }, + r => + { + if (r == null) + return default(TRes); - if (hasError) - throw ReadException(r.Stream); + var hasError = r.ReadBoolean(); - return r.ReadObject<TRes>(); - }); + if (hasError) + throw ReadException(r); + + return r.ReadObject<TRes>(); + }); } /** <inheritdoc /> */ @@ -864,17 +859,28 @@ namespace Apache.Ignite.Core.Impl.Cache WriteEnumerable(writer, keys); writer.Write(holder); }, - (input, res) => res == True ? ReadInvokeAllResults<TRes>(input) : null, - ReadException); + (input, res) => res == True ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary)): null, ReadException); } /** <inheritDoc /> */ public Task<IDictionary<TK, ICacheEntryProcessorResult<TRes>>> InvokeAllAsync<TArg, TRes>(IEnumerable<TK> keys, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg) { - AsyncInstance.InvokeAll(keys, processor, arg); + IgniteArgumentCheck.NotNull(keys, "keys"); + + IgniteArgumentCheck.NotNull(processor, "processor"); + + var holder = new CacheEntryProcessorHolder(processor, arg, + (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV)); + + return DoOutOpAsync(CacheOp.InvokeAllAsync, + writer => + { + WriteEnumerable(writer, keys); + writer.Write(holder); + }, + input => ReadInvokeAllResults<TRes>(input)); - return AsyncInstance.GetTask(CacheOp.InvokeAll, reader => ReadInvokeAllResults<TRes>(reader.Stream)); } /** <inheritDoc /> */ @@ -936,7 +942,7 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task Rebalance() { - return GetFuture<object>((futId, futTyp) => DoOutInOpLong((int) CacheOp.Rebalance, futId)).Task; + return DoOutOpAsync(CacheOp.Rebalance); } /** <inheritDoc /> */ @@ -946,15 +952,7 @@ namespace Apache.Ignite.Core.Impl.Cache return this; return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithNoRetries), Marshaller, - _flagSkipStore, _flagKeepBinary, _flagAsync, true); - } - - /// <summary> - /// Gets the asynchronous instance. - /// </summary> - private CacheImpl<TK, TV> AsyncInstance - { - get { return _asyncInstance.Value; } + _flagSkipStore, _flagKeepBinary, true); } #region Queries @@ -1136,11 +1134,11 @@ namespace Apache.Ignite.Core.Impl.Cache /// Reads results of InvokeAll operation. /// </summary> /// <typeparam name="T">The type of the result.</typeparam> - /// <param name="inStream">Stream.</param> + /// <param name="reader">Stream.</param> /// <returns>Results of InvokeAll operation.</returns> - private IDictionary<TK, ICacheEntryProcessorResult<T>> ReadInvokeAllResults<T>(IBinaryStream inStream) + private IDictionary<TK, ICacheEntryProcessorResult<T>> ReadInvokeAllResults<T>(BinaryReader reader) { - var count = inStream.ReadInt(); + var count = reader.ReadInt(); if (count == -1) return null; @@ -1149,27 +1147,33 @@ namespace Apache.Ignite.Core.Impl.Cache for (var i = 0; i < count; i++) { - var key = Unmarshal<TK>(inStream); + var key = reader.ReadObject<TK>(); - var hasError = inStream.ReadBool(); + var hasError = reader.ReadBoolean(); results[key] = hasError - ? new CacheEntryProcessorResult<T>(ReadException(inStream)) - : new CacheEntryProcessorResult<T>(Unmarshal<T>(inStream)); + ? new CacheEntryProcessorResult<T>(ReadException(reader)) + : new CacheEntryProcessorResult<T>(reader.ReadObject<T>()); } return results; } /// <summary> + /// Reads the exception. + /// </summary> + private Exception ReadException(IBinaryStream stream) + { + return ReadException(Marshaller.StartUnmarshal(stream)); + } + + /// <summary> /// Reads the exception, either in binary wrapper form, or as a pair of strings. /// </summary> - /// <param name="inStream">The stream.</param> + /// <param name="reader">The stream.</param> /// <returns>Exception.</returns> - private Exception ReadException(IBinaryStream inStream) + private Exception ReadException(BinaryReader reader) { - var reader = Marshaller.StartUnmarshal(inStream, _flagKeepBinary); - var item = reader.ReadObject<object>(); var clsName = item as string; @@ -1177,8 +1181,8 @@ namespace Apache.Ignite.Core.Impl.Cache if (clsName == null) return new CacheEntryProcessorException((Exception) item); - var msg = Unmarshal<string>(inStream); - var trace = Unmarshal<string>(inStream); + var msg = reader.ReadObject<string>(); + var trace = reader.ReadObject<string>(); var inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null; return ExceptionUtils.GetException(_ignite, clsName, msg, trace, reader, inner); @@ -1191,6 +1195,9 @@ namespace Apache.Ignite.Core.Impl.Cache /// <returns>Dictionary.</returns> private static IDictionary<TK, TV> ReadGetAllDictionary(BinaryReader reader) { + if (reader == null) + return null; + IBinaryStream stream = reader.Stream; if (stream.ReadBool()) http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs index a43df38..8bf3945 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs @@ -78,6 +78,31 @@ namespace Apache.Ignite.Core.Impl.Cache CloseLock = 54, Rebalance = 55, SizeLoc = 56, - Extension = 57 + PutAsync = 57, + ClearCacheAsync = 58, + ClearAllAsync = 59, + RemoveAll2Async = 60, + SizeAsync = 61, + ClearAsync = 62, + LoadCacheAsync = 63, + LocLoadCacheAsync = 64, + PutAllAsync = 65, + RemoveAllAsync = 66, + GetAsync = 67, + ContainsKeyAsync = 68, + ContainsKeysAsync = 69, + RemoveBoolAsync = 70, + RemoveObjAsync = 71, + GetAllAsync = 72, + GetAndPutAsync = 73, + GetAndPutIfAbsentAsync = 74, + GetAndRemoveAsync = 75, + GetAndReplaceAsync = 76, + Replace2Async = 77, + Replace3Async = 78, + InvokeAsync = 79, + InvokeAllAsync = 80, + PutIfAbsentAsync = 81, + Extension = 82 } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs index 1b2e2aa..df68e1c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs @@ -25,6 +25,7 @@ namespace Apache.Ignite.Core.Impl.Compute using System.Linq; using System.Runtime.Serialization; using System.Threading; + using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Compute; @@ -141,12 +142,7 @@ namespace Apache.Ignite.Core.Impl.Compute try { - TReduceRes res = DoOutInOp<TReduceRes>(OpExec, writer => - { - WriteTask(writer, taskName, taskArg, nodes); - }); - - return res; + return DoOutInOp<TReduceRes>(OpExec, writer => WriteTask(writer, taskName, taskArg, nodes)); } finally { @@ -167,18 +163,7 @@ namespace Apache.Ignite.Core.Impl.Compute try { - Future<TReduceRes> fut = null; - - DoOutInOp(OpExecAsync, writer => - { - WriteTask(writer, taskName, taskArg, nodes); - }, input => - { - fut = GetFuture<TReduceRes>((futId, futTyp) => - UU.TargetListenFutureAndGet(Target, futId, futTyp), _keepBinary.Value); - }); - - return fut; + return DoOutOpObjectAsync<TReduceRes>(OpExecAsync, w => WriteTask(w, taskName, taskArg, nodes)); } finally { @@ -625,12 +610,12 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="taskName">Task name.</param> /// <param name="taskArg">Task arg.</param> /// <param name="nodes">Nodes.</param> - private void WriteTask(BinaryWriter writer, string taskName, object taskArg, + private void WriteTask(IBinaryRawWriter writer, string taskName, object taskArg, ICollection<IClusterNode> nodes) { writer.WriteString(taskName); writer.WriteBoolean(_keepBinary.Value); - writer.Write(taskArg); + writer.WriteObject(taskArg); WriteNodeIds(writer, nodes); } @@ -640,7 +625,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// </summary> /// <param name="writer">Writer.</param> /// <param name="nodes">Nodes.</param> - private static void WriteNodeIds(BinaryWriter writer, ICollection<IClusterNode> nodes) + private static void WriteNodeIds(IBinaryRawWriter writer, ICollection<IClusterNode> nodes) { if (nodes == null) writer.WriteBoolean(false);
