IGNITE-2943 .NET: Improve cache error propagation and interop performance
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/886ed64f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/886ed64f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/886ed64f Branch: refs/heads/ignite-comm-opts2 Commit: 886ed64fd6958d7dad1226ef1de7e75e7b29ff2f Parents: ccd036c Author: Pavel Tupitsyn <ptupit...@apache.org> Authored: Thu Aug 18 18:14:12 2016 +0300 Committer: Pavel Tupitsyn <ptupit...@apache.org> Committed: Fri Sep 16 13:04:13 2016 +0300 ---------------------------------------------------------------------- .../platform/PlatformAbstractTarget.java | 17 +- .../platform/cache/PlatformCache.java | 335 ++++++++++--------- .../dotnet/PlatformDotNetCacheStore.java | 12 +- .../platform/utils/PlatformFutureUtils.java | 6 +- .../platform/utils/PlatformUtils.java | 25 ++ .../include/ignite/impl/binary/binary_utils.h | 87 +++++ .../src/impl/binary/binary_reader_impl.cpp | 30 +- .../ignite/impl/interop/interop_target.h | 15 +- .../cpp/core/include/ignite/impl/operations.h | 47 ++- .../cpp/core/src/impl/cache/cache_impl.cpp | 14 +- .../core/src/impl/interop/interop_target.cpp | 41 ++- .../src/impl/transactions/transactions_impl.cpp | 5 + .../Apache.Ignite.Benchmarks/BenchmarkRunner.cs | 5 +- .../Interop/PlatformBenchmarkBase.cs | 2 +- .../Cache/CacheAbstractTest.cs | 2 +- .../Cache/Store/CacheStoreTest.cs | 39 ++- .../Cache/Store/CacheTestStore.cs | 50 ++- .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 239 ++++++------- .../Apache.Ignite.Core/Impl/ExceptionUtils.cs | 10 +- .../Apache.Ignite.Core/Impl/PlatformTarget.cs | 80 ++++- .../Impl/Unmanaged/UnmanagedCallbacks.cs | 21 +- 21 files changed, 746 insertions(+), 336 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 0cd683d..0ca4453 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -38,6 +38,9 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { /** Constant: FALSE. */ protected static final int FALSE = 0; + /** Constant: ERROR. */ + protected static final int ERROR = -1; + /** */ private static final int OP_META = -1; @@ -69,7 +72,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { return TRUE; } else - return processInStreamOutLong(type, reader); + return processInStreamOutLong(type, reader, mem); } catch (Exception e) { throw convertException(e); @@ -235,6 +238,18 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { } /** + * Process IN operation. + * + * @param type Type. + * @param reader Binary reader. + * @return Result. + * @throws IgniteCheckedException In case of exception. + */ + protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException { + return processInStreamOutLong(type, reader); + } + + /** * Process IN-OUT operation. * * @param type Type. http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index d572e8b..a7b6e41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -42,10 +42,13 @@ import org.apache.ignite.internal.processors.platform.PlatformNativeException; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor; import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.C1; @@ -290,109 +293,207 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { - switch (type) { - case OP_PUT: - cache.put(reader.readObjectDetached(), reader.readObjectDetached()); + @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException { + try { + switch (type) { + case OP_PUT: + cache.put(reader.readObjectDetached(), reader.readObjectDetached()); - return TRUE; + return TRUE; - case OP_REMOVE_BOOL: - return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + case OP_GET: + return writeResult(mem, cache.get(reader.readObjectDetached())); - case OP_REMOVE_ALL: - cache.removeAll(PlatformUtils.readSet(reader)); + case OP_REMOVE_BOOL: + return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - return TRUE; + case OP_REMOVE_ALL: + cache.removeAll(PlatformUtils.readSet(reader)); - case OP_PUT_ALL: - cache.putAll(PlatformUtils.readMap(reader)); + return TRUE; - return TRUE; + case OP_PUT_ALL: + cache.putAll(PlatformUtils.readMap(reader)); - case OP_LOC_EVICT: - cache.localEvict(PlatformUtils.readCollection(reader)); + return TRUE; - return TRUE; + case OP_LOC_EVICT: + cache.localEvict(PlatformUtils.readCollection(reader)); - case OP_CONTAINS_KEY: - return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE; + return TRUE; - case OP_CONTAINS_KEYS: - return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE; + case OP_CONTAINS_KEY: + return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE; - case OP_LOC_PROMOTE: { - cache.localPromote(PlatformUtils.readSet(reader)); + case OP_CONTAINS_KEYS: + return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE; - break; - } + case OP_LOC_PROMOTE: { + cache.localPromote(PlatformUtils.readSet(reader)); - case OP_REPLACE_3: - return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(), - reader.readObjectDetached()) ? TRUE : FALSE; + return TRUE; + } - case OP_LOC_LOAD_CACHE: - loadCache0(reader, true); + case OP_REPLACE_3: + return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(), + reader.readObjectDetached()) ? TRUE : FALSE; - break; + case OP_LOC_LOAD_CACHE: + loadCache0(reader, true); - case OP_LOAD_CACHE: - loadCache0(reader, false); + return TRUE; - break; + case OP_LOAD_CACHE: + loadCache0(reader, false); - case OP_CLEAR: - cache.clear(reader.readObjectDetached()); + return TRUE; - break; + case OP_CLEAR: + cache.clear(reader.readObjectDetached()); - case OP_CLEAR_ALL: - cache.clearAll(PlatformUtils.readSet(reader)); + return TRUE; - break; + case OP_CLEAR_ALL: + cache.clearAll(PlatformUtils.readSet(reader)); - case OP_LOCAL_CLEAR: - cache.localClear(reader.readObjectDetached()); + return TRUE; - break; + case OP_LOCAL_CLEAR: + cache.localClear(reader.readObjectDetached()); - case OP_LOCAL_CLEAR_ALL: - cache.localClearAll(PlatformUtils.readSet(reader)); + return TRUE; - break; + case OP_LOCAL_CLEAR_ALL: + cache.localClearAll(PlatformUtils.readSet(reader)); - case OP_PUT_IF_ABSENT: { - return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - } + return TRUE; - case OP_REPLACE_2: { - return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - } + case OP_PUT_IF_ABSENT: + return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - case OP_REMOVE_OBJ: { - return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE; - } + case OP_REPLACE_2: + return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_REMOVE_OBJ: + return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_IS_LOCAL_LOCKED: + return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE; + + case OP_LOAD_ALL: { + long futId = reader.readLong(); + boolean replaceExisting = reader.readBoolean(); + + CompletionListenable fut = new CompletionListenable(); + + PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this); + + cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut); + + return TRUE; + } + + case OP_GET_AND_PUT: + return writeResult(mem, cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached())); + + case OP_GET_AND_REPLACE: + return writeResult(mem, cache.getAndReplace(reader.readObjectDetached(), reader.readObjectDetached())); + + case OP_GET_AND_REMOVE: + return writeResult(mem, cache.getAndRemove(reader.readObjectDetached())); + + case OP_GET_AND_PUT_IF_ABSENT: + return writeResult(mem, cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached())); + + case OP_PEEK: { + Object key = reader.readObjectDetached(); + + CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt()); + + return writeResult(mem, cache.localPeek(key, modes)); + } + + case OP_GET_ALL: { + Set keys = PlatformUtils.readSet(reader); + + Map entries = cache.getAll(keys); + + return writeResult(mem, entries, new PlatformWriterClosure<Map>() { + @Override public void write(BinaryRawWriterEx writer, Map val) { + PlatformUtils.writeNullableMap(writer, val); + } + }); + } + + case OP_INVOKE: { + Object key = reader.readObjectDetached(); - case OP_IS_LOCAL_LOCKED: - return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE; + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - case OP_LOAD_ALL: { - long futId = reader.readLong(); - boolean replaceExisting = reader.readBoolean(); + return writeResult(mem, cache.invoke(key, proc)); + } - CompletionListenable fut = new CompletionListenable(); + case OP_INVOKE_ALL: { + Set<Object> keys = PlatformUtils.readSet(reader); - PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this); + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut); + Map results = cache.invokeAll(keys, proc); - return TRUE; + return writeResult(mem, results, new PlatformWriterClosure<Map>() { + @Override public void write(BinaryRawWriterEx writer, Map val) { + writeInvokeAllResult(writer, val); + } + }); + } + + case OP_LOCK: + return registerLock(cache.lock(reader.readObjectDetached())); + + case OP_LOCK_ALL: + return registerLock(cache.lockAll(PlatformUtils.readCollection(reader))); } + } + catch (Exception e) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = platformCtx.writer(out); - default: - return super.processInStreamOutLong(type, reader); + Exception err = convertException(e); + + PlatformUtils.writeError(err, writer); + PlatformUtils.writeErrorData(err, writer); + + out.synchronize(); + + return ERROR; } + return super.processInStreamOutLong(type, reader, mem); + } + + /** + * Writes the result to reused stream, if any. + */ + private long writeResult(PlatformMemory mem, Object obj) { + return writeResult(mem, obj, null); + } + + /** + * Writes the result to reused stream, if any. + */ + private long writeResult(PlatformMemory mem, Object obj, PlatformWriterClosure clo) { + if (obj == null) + return FALSE; + + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = platformCtx.writer(out); + + if (clo == null) + writer.writeObjectDetached(obj); + else + clo.write(writer, obj); + + out.synchronize(); return TRUE; } @@ -555,106 +656,6 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) - throws IgniteCheckedException { - switch (type) { - case OP_GET: { - writer.writeObjectDetached(cache.get(reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_PUT: { - writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_REPLACE: { - writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(), - reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_REMOVE: { - writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_PUT_IF_ABSENT: { - writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached())); - - break; - } - - case OP_PEEK: { - Object key = reader.readObjectDetached(); - - CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt()); - - writer.writeObjectDetached(cache.localPeek(key, modes)); - - break; - } - - case OP_GET_ALL: { - Set keys = PlatformUtils.readSet(reader); - - Map entries = cache.getAll(keys); - - PlatformUtils.writeNullableMap(writer, entries); - - break; - } - - case OP_INVOKE: { - Object key = reader.readObjectDetached(); - - CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - - try { - writer.writeObjectDetached(cache.invoke(key, proc)); - } - catch (EntryProcessorException ex) - { - if (ex.getCause() instanceof PlatformNativeException) - writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause()); - else - throw ex; - } - - break; - } - - case OP_INVOKE_ALL: { - Set<Object> keys = PlatformUtils.readSet(reader); - - CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - - writeInvokeAllResult(writer, cache.invokeAll(keys, proc)); - - break; - } - - case OP_LOCK: - writer.writeLong(registerLock(cache.lock(reader.readObjectDetached()))); - - break; - - case OP_LOCK_ALL: - writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader)))); - - break; - - default: - super.processInStreamOutStream(type, reader, writer); - } - } - - /** {@inheritDoc} */ @Override public Exception convertException(Exception e) { if (e instanceof CachePartialUpdateException) return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(), @@ -699,7 +700,7 @@ public class PlatformCache extends PlatformAbstractTarget { catch (Exception ex) { writer.writeBoolean(true); // Exception - writeError(writer, ex); + PlatformUtils.writeError(ex, writer); } } } @@ -1033,7 +1034,7 @@ public class PlatformCache extends PlatformAbstractTarget { else { writer.writeBoolean(true); // Error. - writeError(writer, (Exception) err); + PlatformUtils.writeError(err, writer); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java index 1c60a42..d38fd8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java @@ -389,10 +389,9 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor * * @param task Task. * @param cb Optional callback. - * @return Result. * @throws org.apache.ignite.IgniteCheckedException If failed. */ - protected int doInvoke(IgniteInClosureX<BinaryRawWriterEx> task, @Nullable PlatformCacheStoreCallback cb) + protected void doInvoke(IgniteInClosureX<BinaryRawWriterEx> task, @Nullable PlatformCacheStoreCallback cb) throws IgniteCheckedException{ try (PlatformMemory mem = platformCtx.memory().allocate()) { PlatformOutputStream out = mem.output(); @@ -403,7 +402,14 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor out.synchronize(); - return platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer(), cb); + int res = platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer(), cb); + + if (res != 0) { + // Read error + Object nativeErr = platformCtx.reader(mem.input()).readObjectDetached(); + + throw platformCtx.createNativeException(nativeErr); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java index 6692a23..5985d22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; @@ -293,10 +292,7 @@ public class PlatformFutureUtils { BinaryRawWriterEx outWriter = ctx.writer(out); - outWriter.writeString(err.getClass().getName()); - outWriter.writeString(err.getMessage()); - outWriter.writeString(X.getFullStackTrace(err)); - + PlatformUtils.writeError(err, outWriter); PlatformUtils.writeErrorData(err, outWriter); out.synchronize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java index dd90fda..ccdd59d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java @@ -575,6 +575,31 @@ public class PlatformUtils { } /** + * Writes error. + * + * @param ex Error. + * @param writer Writer. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public static void writeError(Throwable ex, BinaryRawWriterEx writer) { + writer.writeObjectDetached(ex.getClass().getName()); + + writer.writeObjectDetached(ex.getMessage()); + + writer.writeObjectDetached(X.getFullStackTrace(ex)); + + PlatformNativeException nativeCause = X.cause(ex, PlatformNativeException.class); + + if (nativeCause != null) { + writer.writeBoolean(true); + + writer.writeObjectDetached(nativeCause.cause()); + } + else + writer.writeBoolean(false); + } + + /** * Writer error data. * * @param err Error. http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h index 88130d8..3abd651 100644 --- a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h +++ b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h @@ -26,6 +26,8 @@ #include "ignite/date.h" #include "ignite/timestamp.h" +#include "ignite/binary/binary_type.h" + namespace ignite { namespace impl @@ -538,7 +540,92 @@ namespace ignite */ static Timestamp MakeTimestampLocal(int year = 1900, int month = 1, int day = 1, int hour = 0, int min = 0, int sec = 0, long ns = 0); + + /** + * Get default value for the type. + * + * @return Null value for non primitive types and zeroes for primitives. + */ + template<typename T> + static T GetDefaultValue() + { + ignite::binary::BinaryType<T> binType; + + return binType.GetNull(); + } }; + + template<> + inline int8_t BinaryUtils::GetDefaultValue<int8_t>() + { + return 0; + } + + template<> + inline int16_t BinaryUtils::GetDefaultValue<int16_t>() + { + return 0; + } + + template<> + inline uint16_t BinaryUtils::GetDefaultValue<uint16_t>() + { + return 0; + } + + template<> + inline int32_t BinaryUtils::GetDefaultValue<int32_t>() + { + return 0; + } + + template<> + inline int64_t BinaryUtils::GetDefaultValue<int64_t>() + { + return 0; + } + + template<> + inline bool BinaryUtils::GetDefaultValue<bool>() + { + return false; + } + + template<> + inline float BinaryUtils::GetDefaultValue<float>() + { + return 0.0f; + } + + template<> + inline double BinaryUtils::GetDefaultValue<double>() + { + return 0.0; + } + + template<> + inline Guid BinaryUtils::GetDefaultValue<Guid>() + { + return Guid(); + } + + template<> + inline Date BinaryUtils::GetDefaultValue<Date>() + { + return Date(); + } + + template<> + inline Timestamp BinaryUtils::GetDefaultValue<Timestamp>() + { + return Timestamp(); + } + + template<> + inline std::string BinaryUtils::GetDefaultValue<std::string>() + { + return std::string(); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp index 33205e4..c3f4fcc 100644 --- a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp +++ b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp @@ -676,49 +676,57 @@ namespace ignite template <> int8_t BinaryReaderImpl::ReadTopObject<int8_t>() { - return ReadTopObject0(IGNITE_TYPE_BYTE, BinaryUtils::ReadInt8, static_cast<int8_t>(0)); + return ReadTopObject0(IGNITE_TYPE_BYTE, BinaryUtils::ReadInt8, + BinaryUtils::GetDefaultValue<int8_t>()); } template <> bool BinaryReaderImpl::ReadTopObject<bool>() { - return ReadTopObject0(IGNITE_TYPE_BOOL, BinaryUtils::ReadBool, static_cast<bool>(0)); + return ReadTopObject0(IGNITE_TYPE_BOOL, BinaryUtils::ReadBool, + BinaryUtils::GetDefaultValue<bool>()); } template <> int16_t BinaryReaderImpl::ReadTopObject<int16_t>() { - return ReadTopObject0(IGNITE_TYPE_SHORT, BinaryUtils::ReadInt16, static_cast<int16_t>(0)); + return ReadTopObject0(IGNITE_TYPE_SHORT, BinaryUtils::ReadInt16, + BinaryUtils::GetDefaultValue<int16_t>()); } template <> uint16_t BinaryReaderImpl::ReadTopObject<uint16_t>() { - return ReadTopObject0(IGNITE_TYPE_CHAR, BinaryUtils::ReadUInt16, static_cast<uint16_t>(0)); + return ReadTopObject0(IGNITE_TYPE_CHAR, BinaryUtils::ReadUInt16, + BinaryUtils::GetDefaultValue<uint16_t>()); } template <> int32_t BinaryReaderImpl::ReadTopObject<int32_t>() { - return ReadTopObject0(IGNITE_TYPE_INT, BinaryUtils::ReadInt32, static_cast<int32_t>(0)); + return ReadTopObject0(IGNITE_TYPE_INT, BinaryUtils::ReadInt32, + BinaryUtils::GetDefaultValue<int32_t>()); } template <> int64_t BinaryReaderImpl::ReadTopObject<int64_t>() { - return ReadTopObject0(IGNITE_TYPE_LONG, BinaryUtils::ReadInt64, static_cast<int64_t>(0)); + return ReadTopObject0(IGNITE_TYPE_LONG, BinaryUtils::ReadInt64, + BinaryUtils::GetDefaultValue<int64_t>()); } template <> float BinaryReaderImpl::ReadTopObject<float>() { - return ReadTopObject0(IGNITE_TYPE_FLOAT, BinaryUtils::ReadFloat, static_cast<float>(0)); + return ReadTopObject0(IGNITE_TYPE_FLOAT, BinaryUtils::ReadFloat, + BinaryUtils::GetDefaultValue<float>()); } template <> double BinaryReaderImpl::ReadTopObject<double>() { - return ReadTopObject0(IGNITE_TYPE_DOUBLE, BinaryUtils::ReadDouble, static_cast<double>(0)); + return ReadTopObject0(IGNITE_TYPE_DOUBLE, BinaryUtils::ReadDouble, + BinaryUtils::GetDefaultValue<double>()); } template <> @@ -729,7 +737,7 @@ namespace ignite if (typeId == IGNITE_TYPE_UUID) return BinaryUtils::ReadGuid(stream); else if (typeId == IGNITE_HDR_NULL) - return Guid(); + return BinaryUtils::GetDefaultValue<Guid>(); else { int32_t pos = stream->Position() - 1; @@ -747,7 +755,7 @@ namespace ignite else if (typeId == IGNITE_TYPE_TIMESTAMP) return Date(BinaryUtils::ReadTimestamp(stream).GetMilliseconds()); else if (typeId == IGNITE_HDR_NULL) - return Date(); + return BinaryUtils::GetDefaultValue<Date>(); else { int32_t pos = stream->Position() - 1; @@ -763,7 +771,7 @@ namespace ignite if (typeId == IGNITE_TYPE_TIMESTAMP) return BinaryUtils::ReadTimestamp(stream); else if (typeId == IGNITE_HDR_NULL) - return Timestamp(); + return BinaryUtils::GetDefaultValue<Timestamp>(); else { int32_t pos = stream->Position() - 1; http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h index 8b6ebb9..4042fa2 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h +++ b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h @@ -68,14 +68,25 @@ namespace ignite /** * Internal out-in operation. + * Uses two independent memory pieces to write and read data. * * @param opType Operation type. * @param inOp Input. * @param outOp Output. * @param err Error. */ - void OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, - IgniteError* err); + void OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err); + + /** + * Internal out-in operation. + * Uses single memory piece to write and read data. + * + * @param opType Operation type. + * @param inOp Input. + * @param outOp Output. + * @param err Error. + */ + void OutInOpX(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err); /** * Get environment shared pointer. http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/cpp/core/include/ignite/impl/operations.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/operations.h b/modules/platforms/cpp/core/include/ignite/impl/operations.h index ed01ece..a8fef93 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/operations.h +++ b/modules/platforms/cpp/core/include/ignite/impl/operations.h @@ -27,6 +27,7 @@ #include "ignite/cache/cache_entry.h" #include "ignite/impl/binary/binary_reader_impl.h" #include "ignite/impl/binary/binary_writer_impl.h" +#include "ignite/impl/binary/binary_utils.h" #include "ignite/binary/binary.h" namespace ignite @@ -270,7 +271,12 @@ namespace ignite * * @param reader Reader. */ - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) = 0; + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) = 0; + + /** + * Sets result to null value. + */ + virtual void SetNull() = 0; }; /** @@ -288,11 +294,16 @@ namespace ignite // No-op. } - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { val = reader.ReadTopObject<T>(); } + virtual void SetNull() + { + val = binary::BinaryUtils::GetDefaultValue<T>(); + } + /** * Get value. * @@ -324,12 +335,18 @@ namespace ignite // No-op. } - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { val1 = reader.ReadTopObject<T1>(); val2 = reader.ReadTopObject<T2>(); } + virtual void SetNull() + { + val1 = binary::BinaryUtils::GetDefaultValue<T1>(); + val2 = binary::BinaryUtils::GetDefaultValue<T2>(); + } + /** * Get value 1. * @@ -375,7 +392,7 @@ namespace ignite // No-op. } - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { val1 = reader.ReadTopObject<T1>(); val2 = reader.ReadTopObject<T2>(); @@ -383,6 +400,14 @@ namespace ignite val4 = reader.ReadTopObject<T4>(); } + virtual void SetNull() + { + val1 = binary::BinaryUtils::GetDefaultValue<T1>(); + val2 = binary::BinaryUtils::GetDefaultValue<T2>(); + val3 = binary::BinaryUtils::GetDefaultValue<T3>(); + val4 = binary::BinaryUtils::GetDefaultValue<T4>(); + } + /** * Get value 1. * @@ -454,7 +479,7 @@ namespace ignite // No-op. } - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { bool exists = reader.GetStream()->ReadBool(); @@ -475,6 +500,11 @@ namespace ignite } } + virtual void SetNull() + { + // No-op. + } + /** * Get value. * @@ -506,7 +536,7 @@ namespace ignite // No-op. } - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { int32_t cnt = reader.ReadInt32(); @@ -519,6 +549,11 @@ namespace ignite } } + virtual void SetNull() + { + res->clear(); + } + private: /** Entries. */ std::vector<ignite::cache::CacheEntry<K, V>>* res; http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp index e728f55..8197187 100644 --- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp @@ -156,17 +156,17 @@ namespace ignite void CacheImpl::LocalPeek(InputOperation& inOp, OutputOperation& outOp, int32_t peekModes, IgniteError* err) { - OutInOp(OP_LOCAL_PEEK, inOp, outOp, err); + OutInOpX(OP_LOCAL_PEEK, inOp, outOp, err); } void CacheImpl::Get(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET, inOp, outOp, err); + OutInOpX(OP_GET, inOp, outOp, err); } void CacheImpl::GetAll(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_ALL, inOp, outOp, err); + OutInOpX(OP_GET_ALL, inOp, outOp, err); } void CacheImpl::Put(InputOperation& inOp, IgniteError* err) @@ -181,17 +181,17 @@ namespace ignite void CacheImpl::GetAndPut(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_AND_PUT, inOp, outOp, err); + OutInOpX(OP_GET_AND_PUT, inOp, outOp, err); } void CacheImpl::GetAndReplace(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_AND_REPLACE, inOp, outOp, err); + OutInOpX(OP_GET_AND_REPLACE, inOp, outOp, err); } void CacheImpl::GetAndRemove(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_AND_REMOVE, inOp, outOp, err); + OutInOpX(OP_GET_AND_REMOVE, inOp, outOp, err); } bool CacheImpl::PutIfAbsent(InputOperation& inOp, IgniteError* err) @@ -201,7 +201,7 @@ namespace ignite void CacheImpl::GetAndPutIfAbsent(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_AND_PUT_IF_ABSENT, inOp, outOp, err); + OutInOpX(OP_GET_AND_PUT_IF_ABSENT, inOp, outOp, err); } bool CacheImpl::Replace(InputOperation& inOp, IgniteError* err) http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp index 05764c7..5d17214 100644 --- a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp +++ b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp @@ -31,6 +31,21 @@ namespace ignite { namespace interop { + /** + * Operation result. + */ + enum OperationResult + { + /** Null. */ + ResultNull = 0, + + /** Object. */ + ResultObject = 1, + + /** Error. */ + ResultError = -1 + }; + InteropTarget::InteropTarget(SharedPointer<IgniteEnvironment> env, jobject javaRef) : env(env), javaRef(javaRef) { @@ -116,8 +131,7 @@ namespace ignite return false; } - void InteropTarget::OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, - IgniteError* err) + void InteropTarget::OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { JniErrorInfo jniErr; @@ -137,6 +151,29 @@ namespace ignite ReadFrom(inMem.Get(), outOp); } } + + void InteropTarget::OutInOpX(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err) + { + JniErrorInfo jniErr; + + SharedPointer<InteropMemory> outInMem = env.Get()->AllocateMemory(); + + int64_t outInPtr = WriteTo(outInMem.Get(), inOp, err); + + if (outInPtr) + { + int64_t res = env.Get()->Context()->TargetInStreamOutLong(javaRef, opType, outInPtr, &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS && res == ResultObject) + ReadFrom(outInMem.Get(), outOp); + else if (res == ResultNull) + outOp.SetNull(); + + //Read and process error if res == ResultError here. + } + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp index 6c01332..fed43fc 100644 --- a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp @@ -145,6 +145,11 @@ namespace ignite val = TransactionMetrics(commitTime, rollbackTime, commits, rollbacks); } + virtual void SetNull() + { + // No-op. + } + /** * Get value. * http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs index 5d8e78a..40ae01e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs @@ -21,6 +21,7 @@ namespace Apache.Ignite.Benchmarks using System.Diagnostics; using System.Text; using Apache.Ignite.Benchmarks.Binary; + using Apache.Ignite.Benchmarks.Interop; /// <summary> /// Benchmark runner. @@ -35,8 +36,8 @@ namespace Apache.Ignite.Benchmarks public static void Main(string[] args) { args = new[] { - typeof(BinarizableReadBenchmark).FullName, - "-ConfigPath", @"modules\platforms\dotnet\Apache.Ignite.Benchmarks\Config\benchmark.xml", + typeof(GetBenchmark).FullName, + "-ConfigPath", @"C:\W\incubator-ignite\modules\platforms\dotnet\Apache.Ignite.Benchmarks\Config\benchmark.xml", "-Threads", "1", "-Warmup", "0", "-Duration", "60", http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs index eeebed0..f437eb8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs @@ -66,7 +66,7 @@ namespace Apache.Ignite.Benchmarks.Interop "-DIGNITE_QUIET=false", "-DIGNITE_NO_SHUTDOWN_HOOK=true" }, - JvmClasspath = Classpath ?? Core.Impl.Common.Classpath.CreateClasspath(), + JvmClasspath = Classpath ?? Core.Impl.Common.Classpath.CreateClasspath(forceTestClasspath: true), JvmDllPath = DllPath, SpringConfigUrl = ConfigPath }; http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs index 62dc2df..7627ce0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs @@ -2906,7 +2906,7 @@ namespace Apache.Ignite.Core.Tests.Cache Assert.IsInstanceOf<CacheEntryProcessorException>(ex); if (string.IsNullOrEmpty(containsText)) - Assert.AreEqual(ex.InnerException.Message, AddArgCacheEntryProcessor.ExceptionText); + Assert.AreEqual(AddArgCacheEntryProcessor.ExceptionText, ex.InnerException.Message); else Assert.IsTrue(ex.ToString().Contains(containsText)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs index 8061e9f..d39ccde 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs @@ -178,13 +178,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store [TearDown] public void AfterTest() { + CacheTestStore.Reset(); + var cache = GetCache(); cache.Clear(); - Assert.IsTrue(cache.IsEmpty(), "Cache is not empty: " + cache.GetSize()); - - CacheTestStore.Reset(); + Assert.IsTrue(cache.IsEmpty(), + "Cache is not empty: " + + string.Join(", ", cache.Select(x => string.Format("[{0}:{1}]", x.Key, x.Value)))); TestUtils.AssertHandleRegistryHasItems(300, _storeCount, Ignition.GetIgnite(GridName)); @@ -210,6 +212,11 @@ namespace Apache.Ignite.Core.Tests.Cache.Store // Test exception in filter Assert.Throws<CacheStoreException>(() => cache.LoadCache(new ExceptionalEntryFilter(), 100, 10)); + + // Test exception in store + CacheTestStore.ThrowError = true; + CheckCustomStoreError(Assert.Throws<CacheStoreException>(() => + cache.LoadCache(new CacheEntryFilter(), 100, 10)).InnerException); } [Test] @@ -262,6 +269,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store { Assert.AreEqual("val_" + i, cache.GetAsync(i).Result); } + + // Test errors + CacheTestStore.ThrowError = true; + CheckCustomStoreError( + Assert.Throws<AggregateException>( + () => cache.LocalLoadCacheAsync(new CacheEntryFilter(), 100, 10).Wait()) + .InnerException); } [Test] @@ -282,6 +296,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store Assert.AreEqual("val", cache.Get(1)); Assert.AreEqual(1, cache.GetSize()); + + // Test errors + CacheTestStore.ThrowError = true; + CheckCustomStoreError(Assert.Throws<CacheStoreException>(() => cache.Put(-2, "fail")).InnerException); + + cache.LocalEvict(new[] { 1 }); + CheckCustomStoreError(Assert.Throws<CacheStoreException>(() => cache.Get(1)).InnerException); } [Test] @@ -418,8 +439,6 @@ namespace Apache.Ignite.Core.Tests.Cache.Store using (var tx = cache.Ignite.GetTransactions().TxStart()) { - CacheTestStore.ExpCommit = true; - tx.AddMeta("meta", 100); cache.Put(1, "val"); @@ -549,6 +568,16 @@ namespace Apache.Ignite.Core.Tests.Cache.Store return Ignition.GetIgnite(GridName).GetOrCreateCache<int, string>(cacheName); } + + private static void CheckCustomStoreError(Exception err) + { + var customErr = err as CacheTestStore.CustomStoreException ?? + err.InnerException as CacheTestStore.CustomStoreException; + + Assert.IsNotNull(customErr); + + Assert.AreEqual(customErr.Message, customErr.Details); + } } /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs index b4b1670..4224835 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs @@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; + using System.Runtime.Serialization; using System.Threading; using Apache.Ignite.Core.Cache.Store; using Apache.Ignite.Core.Resource; @@ -32,12 +33,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Store { public static readonly IDictionary Map = new ConcurrentDictionary<object, object>(); - public static bool ExpCommit; - public static bool LoadMultithreaded; public static bool LoadObjects; + public static bool ThrowError; + [InstanceResource] private IIgnite _grid = null; @@ -54,13 +55,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store { Map.Clear(); - ExpCommit = false; LoadMultithreaded = false; LoadObjects = false; + ThrowError = false; } public void LoadCache(Action<object, object> act, params object[] args) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); if (LoadMultithreaded) @@ -91,6 +94,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public object Load(object key) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); return Map[key]; @@ -98,6 +103,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public IDictionary LoadAll(ICollection keys) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); return keys.OfType<object>().ToDictionary(key => key, key => "val_" + key); @@ -105,6 +112,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public void Write(object key, object val) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); Map[key] = val; @@ -112,6 +121,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public void WriteAll(IDictionary map) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); foreach (DictionaryEntry e in map) @@ -120,6 +131,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public void Delete(object key) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); Map.Remove(key); @@ -127,6 +140,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public void DeleteAll(ICollection keys) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); foreach (object key in keys) @@ -151,5 +166,34 @@ namespace Apache.Ignite.Core.Tests.Cache.Store get { return stringProperty; } set { stringProperty = value; } } + + private static void ThrowIfNeeded() + { + if (ThrowError) + throw new CustomStoreException("Exception in cache store"); + } + + [Serializable] + public class CustomStoreException : Exception, ISerializable + { + public string Details { get; private set; } + + public CustomStoreException(string message) : base(message) + { + Details = message; + } + + protected CustomStoreException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) + { + Details = info.GetString("details"); + } + + public override void GetObjectData(SerializationInfo info, StreamingContext context) + { + info.AddValue("details", Details); + + base.GetObjectData(info, context); + } + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index 32c59de..8ba3e29 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -271,7 +271,7 @@ namespace Apache.Ignite.Core.Impl.Cache /// </summary> private void LoadCache0(ICacheEntryFilter<TK, TV> p, object[] args, int opId) { - DoOutOp(opId, writer => + DoOutInOpX(opId, writer => { if (p != null) { @@ -284,7 +284,7 @@ namespace Apache.Ignite.Core.Impl.Cache writer.WriteObject<CacheEntryFilterHolder>(null); writer.WriteArray(args); - }); + }, ReadException); } /** <inheritDoc /> */ @@ -296,7 +296,7 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues) { - return GetFuture<object>((futId, futTyp) => DoOutOp((int) CacheOp.LoadAll, writer => + return GetFuture<object>((futId, futTyp) => DoOutOp(CacheOp.LoadAll, writer => { writer.WriteLong(futId); writer.WriteBoolean(replaceExistingValues); @@ -309,7 +309,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutOp((int)CacheOp.ContainsKey, key) == True; + return DoOutOp(CacheOp.ContainsKey, key); } /** <inheritDoc /> */ @@ -325,7 +325,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutOp((int)CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys)) == True; + return DoOutOp(CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys)); } /** <inheritDoc /> */ @@ -354,11 +354,14 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - var res = DoOutInOpNullable<TV>((int)CacheOp.Peek, writer => - { - writer.Write(key); - writer.WriteInt(EncodePeekModes(modes)); - }); + var res = DoOutInOpX((int)CacheOp.Peek, + w => + { + w.Write(key); + w.WriteInt(EncodePeekModes(modes)); + }, + (s, r) => r == True ? new CacheResult<TV>(Unmarshal<TV>(s)) : new CacheResult<TV>(), + ReadException); value = res.Success ? res.Value : default(TV); @@ -389,19 +392,22 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - var result = DoOutInOpNullable<TK, TV>((int) CacheOp.Get, key); - - if (!IsAsync) - { - if (!result.Success) - throw GetKeyNotFoundException(); + return DoOutInOpX((int) CacheOp.Get, + w => w.Write(key), + (stream, res) => + { + if (res == True) // Not null + { + Debug.Assert(!IsAsync); - return result.Value; - } + return Unmarshal<TV>(stream); + } - Debug.Assert(!result.Success); + if (!IsAsync) + throw GetKeyNotFoundException(); - return default(TV); + return default(TV); + }, ReadException); } /** <inheritDoc /> */ @@ -426,7 +432,7 @@ namespace Apache.Ignite.Core.Impl.Cache if (IsAsync) throw new InvalidOperationException("TryGet can't be used in async mode."); - var res = DoOutInOpNullable<TK, TV>((int) CacheOp.Get, key); + var res = DoOutInOpNullable(CacheOp.Get, key); value = res.Value; @@ -448,14 +454,10 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOp((int)CacheOp.GetAll, + return DoOutInOpX((int) CacheOp.GetAll, writer => WriteEnumerable(writer, keys), - input => - { - var reader = Marshaller.StartUnmarshal(input, _flagKeepBinary); - - return ReadGetAllDictionary(reader); - }); + (s, r) => r == True ? ReadGetAllDictionary(Marshaller.StartUnmarshal(s, _flagKeepBinary)) : null, + ReadException); } /** <inheritDoc /> */ @@ -473,7 +475,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - DoOutOp((int)CacheOp.Put, key, val); + DoOutOp(CacheOp.Put, key, val); } /** <inheritDoc /> */ @@ -491,7 +493,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpNullable<TK, TV, TV>((int)CacheOp.GetAndPut, key, val); + return DoOutInOpNullable(CacheOp.GetAndPut, key, val); } /** <inheritDoc /> */ @@ -509,7 +511,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpNullable<TK, TV, TV>((int) CacheOp.GetAndReplace, key, val); + return DoOutInOpNullable(CacheOp.GetAndReplace, key, val); } /** <inheritDoc /> */ @@ -525,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOpNullable<TK, TV>((int)CacheOp.GetAndRemove, key); + return DoOutInOpNullable(CacheOp.GetAndRemove, key); } /** <inheritDoc /> */ @@ -543,7 +545,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutOp((int) CacheOp.PutIfAbsent, key, val) == True; + return DoOutOp(CacheOp.PutIfAbsent, key, val); } /** <inheritDoc /> */ @@ -561,7 +563,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpNullable<TK, TV, TV>((int)CacheOp.GetAndPutIfAbsent, key, val); + return DoOutInOpNullable(CacheOp.GetAndPutIfAbsent, key, val); } /** <inheritDoc /> */ @@ -579,7 +581,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutOp((int) CacheOp.Replace2, key, val) == True; + return DoOutOp(CacheOp.Replace2, key, val); } /** <inheritDoc /> */ @@ -599,7 +601,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(newVal, "newVal"); - return DoOutOp((int)CacheOp.Replace3, key, oldVal, newVal) == True; + return DoOutOp(CacheOp.Replace3, key, oldVal, newVal); } /** <inheritDoc /> */ @@ -615,7 +617,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(vals, "vals"); - DoOutOp((int) CacheOp.PutAll, writer => WriteDictionary(writer, vals)); + DoOutOp(CacheOp.PutAll, writer => WriteDictionary(writer, vals)); } /** <inheritDoc /> */ @@ -631,7 +633,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int) CacheOp.LocEvict, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.LocEvict, writer => WriteEnumerable(writer, keys)); } /** <inheritdoc /> */ @@ -653,7 +655,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - DoOutOp((int) CacheOp.Clear, key); + DoOutOp(CacheOp.Clear, key); } /** <inheritDoc /> */ @@ -669,7 +671,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int)CacheOp.ClearAll, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.ClearAll, writer => WriteEnumerable(writer, keys)); } /** <inheritDoc /> */ @@ -685,7 +687,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - DoOutOp((int) CacheOp.LocalClear, key); + DoOutOp(CacheOp.LocalClear, key); } /** <inheritdoc /> */ @@ -693,7 +695,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int)CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys)); } /** <inheritdoc /> */ @@ -701,7 +703,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutOp((int) CacheOp.RemoveObj, key) == True; + return DoOutOp(CacheOp.RemoveObj, key); } /** <inheritDoc /> */ @@ -719,7 +721,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutOp((int)CacheOp.RemoveBool, key, val) == True; + return DoOutOp(CacheOp.RemoveBool, key, val); } /** <inheritDoc /> */ @@ -735,7 +737,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int)CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys)); } /** <inheritDoc /> */ @@ -798,7 +800,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int)CacheOp.LocPromote, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.LocPromote, writer => WriteEnumerable(writer, keys)); } /** <inheritdoc /> */ @@ -811,12 +813,14 @@ namespace Apache.Ignite.Core.Impl.Cache var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV)); - return DoOutInOp((int)CacheOp.Invoke, writer => - { - writer.Write(key); - writer.Write(holder); - }, - input => GetResultOrThrow<TRes>(Unmarshal<object>(input))); + return DoOutInOpX((int) CacheOp.Invoke, + writer => + { + writer.Write(key); + writer.Write(holder); + }, + (input, res) => res == True ? Unmarshal<TRes>(input) : default(TRes), + ReadException); } /** <inheritDoc /> */ @@ -849,17 +853,19 @@ namespace Apache.Ignite.Core.Impl.Cache var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV)); - return DoOutInOp((int) CacheOp.InvokeAll, + return DoOutInOpX((int) CacheOp.InvokeAll, writer => { WriteEnumerable(writer, keys); writer.Write(holder); }, - input => ReadInvokeAllResults<TRes>(input)); + (input, res) => res == True ? ReadInvokeAllResults<TRes>(input) : null, + ReadException); } /** <inheritDoc /> */ - public Task<IDictionary<TK, ICacheEntryProcessorResult<TRes>>> InvokeAllAsync<TArg, TRes>(IEnumerable<TK> keys, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg) + public Task<IDictionary<TK, ICacheEntryProcessorResult<TRes>>> InvokeAllAsync<TArg, TRes>(IEnumerable<TK> keys, + ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg) { AsyncInstance.InvokeAll(keys, processor, arg); @@ -871,10 +877,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOp((int)CacheOp.Lock, writer => - { - writer.Write(key); - }, input => new CacheLock(input.ReadInt(), Target)); + return DoOutInOpX((int) CacheOp.Lock, w => w.Write(key), + (stream, res) => new CacheLock(res, Target), ReadException); } /** <inheritdoc /> */ @@ -882,10 +886,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOp((int)CacheOp.LockAll, writer => - { - WriteEnumerable(writer, keys); - }, input => new CacheLock(input.ReadInt(), Target)); + return DoOutInOpX((int) CacheOp.LockAll, w => WriteEnumerable(w, keys), + (stream, res) => new CacheLock(res, Target), ReadException); } /** <inheritdoc /> */ @@ -893,11 +895,11 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutOp((int)CacheOp.IsLocalLocked, writer => + return DoOutOp(CacheOp.IsLocalLocked, writer => { writer.Write(key); writer.WriteBoolean(byCurrentThread); - }) == True; + }); } /** <inheritDoc /> */ @@ -1159,22 +1161,6 @@ namespace Apache.Ignite.Core.Impl.Cache } /// <summary> - /// Unwraps an exception. - /// </summary> - /// <typeparam name="T">Result type.</typeparam> - /// <param name="obj">Object.</param> - /// <returns>Result.</returns> - private static T GetResultOrThrow<T>(object obj) - { - var err = obj as Exception; - - if (err != null) - throw err as CacheEntryProcessorException ?? new CacheEntryProcessorException(err); - - return obj == null ? default(T) : (T) obj; - } - - /// <summary> /// Reads results of InvokeAll operation. /// </summary> /// <typeparam name="T">The type of the result.</typeparam> @@ -1208,9 +1194,11 @@ namespace Apache.Ignite.Core.Impl.Cache /// </summary> /// <param name="inStream">The stream.</param> /// <returns>Exception.</returns> - private CacheEntryProcessorException ReadException(IBinaryStream inStream) + private Exception ReadException(IBinaryStream inStream) { - var item = Unmarshal<object>(inStream); + var reader = Marshaller.StartUnmarshal(inStream, _flagKeepBinary); + + var item = reader.ReadObject<object>(); var clsName = item as string; @@ -1219,8 +1207,9 @@ namespace Apache.Ignite.Core.Impl.Cache var msg = Unmarshal<string>(inStream); var trace = Unmarshal<string>(inStream); - - return new CacheEntryProcessorException(ExceptionUtils.GetException(_ignite, clsName, msg, trace)); + var inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null; + + return ExceptionUtils.GetException(_ignite, clsName, msg, trace, reader, inner); } /// <summary> @@ -1272,49 +1261,73 @@ namespace Apache.Ignite.Core.Impl.Cache } /// <summary> - /// Perform simple out-in operation accepting single argument. + /// Does the out op. /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="val">Value.</param> - /// <returns>Result.</returns> - private CacheResult<TR> DoOutInOpNullable<T1, TR>(int type, T1 val) + private bool DoOutOp<T1>(CacheOp op, T1 x) { - var res = DoOutInOp<T1, object>(type, val); + return DoOutInOpX((int) op, w => + { + w.Write(x); + }, ReadException); + } - return res == null - ? new CacheResult<TR>() - : new CacheResult<TR>((TR)res); + /// <summary> + /// Does the out op. + /// </summary> + private bool DoOutOp<T1, T2>(CacheOp op, T1 x, T2 y) + { + return DoOutInOpX((int) op, w => + { + w.Write(x); + w.Write(y); + }, ReadException); } /// <summary> - /// Perform out-in operation. + /// Does the out op. /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="outAction">Out action.</param> - /// <returns>Result.</returns> - private CacheResult<TR> DoOutInOpNullable<TR>(int type, Action<BinaryWriter> outAction) + private bool DoOutOp<T1, T2, T3>(CacheOp op, T1 x, T2 y, T3 z) { - var res = DoOutInOp<object>(type, outAction); + return DoOutInOpX((int) op, w => + { + w.Write(x); + w.Write(y); + w.Write(z); + }, ReadException); + } - return res == null - ? new CacheResult<TR>() - : new CacheResult<TR>((TR)res); + /// <summary> + /// Does the out op. + /// </summary> + private bool DoOutOp(CacheOp op, Action<BinaryWriter> write) + { + return DoOutInOpX((int) op, write, ReadException); } /// <summary> - /// Perform simple out-in operation accepting single argument. + /// Does the out-in op. /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="val1">Value.</param> - /// <param name="val2">Value.</param> - /// <returns>Result.</returns> - private CacheResult<TR> DoOutInOpNullable<T1, T2, TR>(int type, T1 val1, T2 val2) + private CacheResult<TV> DoOutInOpNullable(CacheOp cacheOp, TK x) { - var res = DoOutInOp<T1, T2, object>(type, val1, val2); + return DoOutInOpX((int)cacheOp, + w => w.Write(x), + (stream, res) => res == True ? new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(), + ReadException); + } - return res == null - ? new CacheResult<TR>() - : new CacheResult<TR>((TR)res); + /// <summary> + /// Does the out-in op. + /// </summary> + private CacheResult<TV> DoOutInOpNullable<T1, T2>(CacheOp cacheOp, T1 x, T2 y) + { + return DoOutInOpX((int)cacheOp, + w => + { + w.Write(x); + w.Write(y); + }, + (stream, res) => res == True ? new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(), + ReadException); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs index 22881c6..461872f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs @@ -115,10 +115,14 @@ namespace Apache.Ignite.Core.Impl /// <param name="msg">Exception message.</param> /// <param name="stackTrace">Native stack trace.</param> /// <param name="reader">Error data reader.</param> + /// <param name="innerException">Inner exception.</param> /// <returns>Exception.</returns> - public static Exception GetException(IIgnite ignite, string clsName, string msg, string stackTrace, BinaryReader reader = null) + public static Exception GetException(IIgnite ignite, string clsName, string msg, string stackTrace, + BinaryReader reader = null, Exception innerException = null) { - Exception innerException = string.IsNullOrEmpty(stackTrace) ? null : new JavaException(stackTrace); + // Set JavaException as inner only if there is no InnerException. + if (innerException == null && !string.IsNullOrEmpty(stackTrace)) + innerException = new JavaException(stackTrace); ExceptionFactoryDelegate ctor; @@ -158,7 +162,7 @@ namespace Apache.Ignite.Core.Impl /// <param name="reader">Reader.</param> /// <returns>CachePartialUpdateException.</returns> [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] - private static Exception ProcessCachePartialUpdateException(IIgnite ignite, string msg, string stackTrace, + private static Exception ProcessCachePartialUpdateException(IIgnite ignite, string msg, string stackTrace, BinaryReader reader) { if (reader == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs index 26b6033..5f24e43 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs @@ -43,6 +43,9 @@ namespace Apache.Ignite.Core.Impl protected const int True = 1; /** */ + protected const int Error = -1; + + /** */ private const int OpMeta = -1; /** */ @@ -470,7 +473,82 @@ namespace Apache.Ignite.Core.Impl } } } - + + /// <summary> + /// Perform out-in operation with a single stream. + /// </summary> + /// <typeparam name="TR">The type of the r.</typeparam> + /// <param name="type">Operation type.</param> + /// <param name="outAction">Out action.</param> + /// <param name="inAction">In action.</param> + /// <param name="inErrorAction">The action to read an error.</param> + /// <returns> + /// Result. + /// </returns> + protected TR DoOutInOpX<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, long, TR> inAction, + Func<IBinaryStream, Exception> inErrorAction) + { + Debug.Assert(inErrorAction != null); + + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + var writer = _marsh.StartMarshal(stream); + + outAction(writer); + + FinishMarshal(writer); + + var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); + + if (res != Error && inAction == null) + return default(TR); // quick path for void operations + + stream.SynchronizeInput(); + + stream.Seek(0, SeekOrigin.Begin); + + if (res != Error) + return inAction != null ? inAction(stream, res) : default(TR); + + throw inErrorAction(stream); + } + } + + /// <summary> + /// Perform out-in operation with a single stream. + /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="outAction">Out action.</param> + /// <param name="inErrorAction">The action to read an error.</param> + /// <returns> + /// Result. + /// </returns> + protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction, + Func<IBinaryStream, Exception> inErrorAction) + { + Debug.Assert(inErrorAction != null); + + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + var writer = _marsh.StartMarshal(stream); + + outAction(writer); + + FinishMarshal(writer); + + var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); + + if (res != Error) + return res == True; + + stream.SynchronizeInput(); + + stream.Seek(0, SeekOrigin.Begin); + + throw inErrorAction(stream); + } + } + /// <summary> /// Perform out-in operation. /// </summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/886ed64f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index b1c840f..fd52c8a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -21,6 +21,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; + using System.Globalization; + using System.IO; using System.Runtime.InteropServices; using System.Threading; using Apache.Ignite.Core.Cache.Affinity; @@ -312,6 +314,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged }, true); } + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] private int CacheStoreInvoke(void* target, long objPtr, long memPtr, void* cb) { @@ -326,7 +329,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - return t.Invoke(stream, cb0, _ignite); + try + { + return t.Invoke(stream, cb0, _ignite); + } + catch (Exception e) + { + stream.Seek(0, SeekOrigin.Begin); + + _ignite.Marshaller.StartMarshal(stream).WriteObject(e); + + return -1; + } } }); } @@ -756,8 +770,9 @@ namespace Apache.Ignite.Core.Impl.Unmanaged string errCls = reader.ReadString(); string errMsg = reader.ReadString(); string stackTrace = reader.ReadString(); + Exception inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null; - Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader); + Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader, inner); ProcessFuture(futPtr, fut => { fut.OnError(err); }); } @@ -1084,7 +1099,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged // Stream disposal intentionally omitted: IGNITE-1598 var stream = new PlatformRawMemory(errData, errDataLen).GetStream(); - throw ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, + throw ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, _ignite.Marshaller.StartUnmarshal(stream)); }