IGNITE-4729 Async operation support in platform plugins This closes #1561
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/637c18de Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/637c18de Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/637c18de Branch: refs/heads/ignite-4565-ddl Commit: 637c18de190515293e01434862004a410cfadd53 Parents: be93baa Author: Pavel Tupitsyn <[email protected]> Authored: Wed Mar 15 14:02:12 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Wed Mar 15 14:02:12 2017 +0300 ---------------------------------------------------------------------- .../platform/PlatformAbstractTarget.java | 8 ++ .../platform/PlatformAsyncResult.java | 41 +++++++++ .../processors/platform/PlatformTarget.java | 10 +++ .../platform/PlatformTargetProxy.java | 9 ++ .../platform/PlatformTargetProxyImpl.java | 39 +++++++++ .../plugin/PlatformTestPluginTarget.java | 89 +++++++++++++++++++- .../cpp/jni/include/ignite/jni/exports.h | 1 + .../platforms/cpp/jni/include/ignite/jni/java.h | 2 + modules/platforms/cpp/jni/project/vs/module.def | 1 + modules/platforms/cpp/jni/src/exports.cpp | 4 + modules/platforms/cpp/jni/src/java.cpp | 10 +++ .../Plugin/PluginTest.cs | 17 ++++ .../Apache.Ignite.Core/Impl/PlatformTarget.cs | 29 +++++++ .../Impl/Unmanaged/IgniteJniNativeMethods.cs | 3 + .../Impl/Unmanaged/UnmanagedUtils.cs | 5 ++ .../Interop/IPlatformTarget.cs | 13 +++ 16 files changed, 277 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 506470b..396e784 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -125,6 +125,14 @@ public abstract class PlatformAbstractTarget implements PlatformTarget, Platform return throwUnsupported(type); } + /** {@inheritDoc} */ + @Override public PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader) + throws IgniteCheckedException { + throwUnsupported(type); + + return null; + } + /** * Throw an exception rendering unsupported operation type. * http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java new file mode 100644 index 0000000..879f85d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.lang.IgniteFuture; + +/** + * Represents asynchronous operation result. + */ +public interface PlatformAsyncResult { + /** + * Async operation future. + * + * @return Future. + */ + IgniteFuture future(); + + /** + * Async operation result writer method. + * + * @param writer Writer. + * @param result Async operation result. + */ + void write(BinaryRawWriterEx writer, Object result); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java index 5d234dd..9792df8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java @@ -108,6 +108,16 @@ public interface PlatformTarget { PlatformTarget processOutObject(int type) throws IgniteCheckedException; /** + * Process asynchronous operation. + * + * @param type Type. + * @param reader Binary reader. + * @return Async result (should not be null). + * @throws IgniteCheckedException In case of exception. + */ + PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader) throws IgniteCheckedException; + + /** * Convert caught exception. * * @param e Exception to convert. http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java index a4f2a56..c2a0797 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java @@ -97,6 +97,15 @@ public interface PlatformTargetProxy { Object outObject(int type) throws Exception; /** + * Asynchronous operation accepting memory stream. + * + * @param type Operation type. + * @param memPtr Memory pointer. + * @throws Exception If case of failure. + */ + void inStreamAsync(int type, long memPtr) throws Exception; + + /** * Start listening for the future. * * @param futId Future ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java index 25a4de8..7e0036d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.apache.ignite.lang.IgniteFuture; /** * Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}. @@ -104,6 +106,43 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy { } /** {@inheritDoc} */ + @Override public void inStreamAsync(int type, long memPtr) throws Exception { + try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { + BinaryRawReaderEx reader = platformCtx.reader(mem); + + long futId = reader.readLong(); + int futTyp = reader.readInt(); + + final PlatformAsyncResult res = target.processInStreamAsync(type, reader); + + if (res == null) { + throw new IgniteException("PlatformTarget.processInStreamAsync should not return null."); + } + + IgniteFuture fut = res.future(); + + if (fut == null) { + throw new IgniteException("PlatformAsyncResult.future() should not return null."); + } + + PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() { + /** {@inheritDoc} */ + @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { + res.write(writer, obj); + } + + /** {@inheritDoc} */ + @Override public boolean canWrite(Object obj, Throwable err) { + return err == null; + } + }, target); + } + catch (Exception e) { + throw target.convertException(e); + } + } + + /** {@inheritDoc} */ @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception { try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) { BinaryRawReaderEx reader = platformCtx.reader(inMem); http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java index e80a23f..7e69425 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java @@ -21,11 +21,14 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; -import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformAsyncResult; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.plugin.PluginConfiguration; import org.jetbrains.annotations.Nullable; @@ -33,17 +36,20 @@ import org.jetbrains.annotations.Nullable; * Test target. */ @SuppressWarnings("ConstantConditions") -class PlatformTestPluginTarget extends PlatformAbstractTarget { +class PlatformTestPluginTarget implements PlatformTarget { /** */ private final String name; + /** */ + private final PlatformContext platformCtx; + /** * Constructor. * * @param platformCtx Context. */ PlatformTestPluginTarget(PlatformContext platformCtx, String name) { - super(platformCtx); + this.platformCtx = platformCtx; if (name == null) { // Initialize from configuration. @@ -65,12 +71,17 @@ class PlatformTestPluginTarget extends PlatformAbstractTarget { return val + 1; } - /** {@inheritDoc} */ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { return reader.readString().length(); } /** {@inheritDoc} */ + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) + throws IgniteCheckedException { + return processInStreamOutLong(type, reader); + } + + /** {@inheritDoc} */ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { String s = reader.readString(); @@ -129,6 +140,76 @@ class PlatformTestPluginTarget extends PlatformAbstractTarget { return new PlatformTestPluginTarget(platformCtx, name); } + /** {@inheritDoc} */ + @Override public PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case 1: { + // Async upper case. + final String val = reader.readString(); + final GridFutureAdapter<String> fa = new GridFutureAdapter<>(); + + new Thread(new Runnable() { + @Override public void run() { + try { + Thread.sleep(500L); + fa.onDone(val.toUpperCase()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + + return new PlatformAsyncResult() { + @Override public IgniteFuture future() { + //noinspection unchecked + return new IgniteFutureImpl(fa); + } + + @Override public void write(BinaryRawWriterEx writer, Object result) { + writer.writeString((String) result); + } + }; + } + case 2: { + // Exception. + throw new PlatformTestPluginException("123"); + } + case 3: { + // Async exception. + final GridFutureAdapter<String> fa = new GridFutureAdapter<>(); + + new Thread(new Runnable() { + @Override public void run() { + try { + Thread.sleep(500L); + fa.onDone(new PlatformTestPluginException("x")); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + + return new PlatformAsyncResult() { + @Override public IgniteFuture future() { + //noinspection unchecked + return new IgniteFutureImpl(fa); + } + + @Override public void write(BinaryRawWriterEx writer, Object result) { + // No-op. + } + }; + } + } + + return null; + } + + /** {@inheritDoc} */ + @Override public Exception convertException(Exception e) { + return e; + } + /** * Gets the plugin config. * http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/include/ignite/jni/exports.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h index a93f580..06be75d 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h @@ -66,6 +66,7 @@ extern "C" { void* IGNITE_CALL IgniteTargetInObjectStreamOutObjectStream(gcj::JniContext* ctx, void* obj, int opType, void* arg, long long inMemPtr, long long outMemPtr); void IGNITE_CALL IgniteTargetOutStream(gcj::JniContext* ctx, void* obj, int opType, long long memPtr); void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType); + void IGNITE_CALL IgniteTargetInStreamAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr); void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long futId, int typ); void IGNITE_CALL IgniteTargetListenFutureForOperation(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId); http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/include/ignite/jni/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h index a07b844..7c5d684 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/java.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h @@ -208,6 +208,7 @@ namespace ignite jmethodID m_PlatformTarget_inStreamOutObject; jmethodID m_PlatformTarget_outStream; jmethodID m_PlatformTarget_outObject; + jmethodID m_PlatformTarget_inStreamAsync; jmethodID m_PlatformTarget_inStreamOutStream; jmethodID m_PlatformTarget_inObjectStreamOutObjectStream; jmethodID m_PlatformTarget_listenFuture; @@ -387,6 +388,7 @@ namespace ignite jobject TargetInObjectStreamOutObjectStream(jobject obj, int opType, void* arg, long long inMemPtr, long long outMemPtr, JniErrorInfo* errInfo = NULL); void TargetOutStream(jobject obj, int opType, long long memPtr, JniErrorInfo* errInfo = NULL); jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo = NULL); + void TargetInStreamAsync(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); void TargetListenFuture(jobject obj, long long futId, int typ); void TargetListenFutureForOperation(jobject obj, long long futId, int typ, int opId); http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/project/vs/module.def ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def index 45a5bff..8159f8d 100644 --- a/modules/platforms/cpp/jni/project/vs/module.def +++ b/modules/platforms/cpp/jni/project/vs/module.def @@ -23,6 +23,7 @@ IgniteTargetInObjectStreamOutObjectStream @21 IgniteTargetListenFuture @22 IgniteTargetListenFutureForOperation @23 IgniteTargetInLongOutLong @24 +IgniteTargetInStreamAsync @25 IgniteProcessorCompute @64 IgniteProcessorMessage @65 IgniteProcessorEvents @66 http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/src/exports.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp index 17fed71..6c590e4 100644 --- a/modules/platforms/cpp/jni/src/exports.cpp +++ b/modules/platforms/cpp/jni/src/exports.cpp @@ -182,6 +182,10 @@ extern "C" { return ctx->TargetOutObject(static_cast<jobject>(obj), opType); } + void IGNITE_CALL IgniteTargetInStreamAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr) { + ctx->TargetInStreamAsync(static_cast<jobject>(obj), opType, memPtr); + } + void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long futId, int typ) { ctx->TargetListenFuture(static_cast<jobject>(obj), futId, typ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp index 1988a86..004a99c 100644 --- a/modules/platforms/cpp/jni/src/java.cpp +++ b/modules/platforms/cpp/jni/src/java.cpp @@ -258,6 +258,7 @@ namespace ignite JniMethod M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM = JniMethod("inObjectStreamOutObjectStream", "(ILjava/lang/Object;JJ)Ljava/lang/Object;", false); JniMethod M_PLATFORM_TARGET_OUT_STREAM = JniMethod("outStream", "(IJ)V", false); JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;", false); + JniMethod M_PLATFORM_TARGET_IN_STREAM_ASYNC = JniMethod("inStreamAsync", "(IJ)V", false); JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE = JniMethod("listenFuture", "(JI)V", false); JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION = JniMethod("listenFutureForOperation", "(JII)V", false); @@ -590,6 +591,7 @@ namespace ignite m_PlatformTarget_outObject = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_OUT_OBJECT); m_PlatformTarget_inStreamOutStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_STREAM); m_PlatformTarget_inObjectStreamOutObjectStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM); + m_PlatformTarget_inStreamAsync = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_ASYNC); m_PlatformTarget_listenFuture = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE); m_PlatformTarget_listenFutureForOperation = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION); @@ -1386,6 +1388,14 @@ namespace ignite return LocalToGlobal(env, res); } + void JniContext::TargetInStreamAsync(jobject obj, int opType, long long memPtr, JniErrorInfo* err) { + JNIEnv* env = Attach(); + + env->CallVoidMethod(obj, jvm->GetMembers().m_PlatformTarget_inStreamAsync, opType, memPtr); + + ExceptionCheck(env, err); + } + void JniContext::TargetListenFuture(jobject obj, long long futId, int typ) { JNIEnv* env = Attach(); http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs index b6c00b5..8256bba 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests.Plugin using System; using System.Collections.Generic; using System.IO; + using System.Linq; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Interop; @@ -117,6 +118,22 @@ namespace Apache.Ignite.Core.Tests.Plugin var resCopy = res.Item2.OutObject(1); Assert.AreEqual("name1_abc", resCopy.OutStream(1, r => r.ReadString())); + // Async operation. + var task = target.DoOutOpAsync(1, w => w.WriteString("foo"), r => r.ReadString()); + Assert.IsFalse(task.IsCompleted); + var asyncRes = task.Result; + Assert.IsTrue(task.IsCompleted); + Assert.AreEqual("FOO", asyncRes); + + // Async operation with exception in entry point. + Assert.Throws<TestIgnitePluginException>(() => target.DoOutOpAsync<object>(2, null, null)); + + // Async operation with exception in future. + var errTask = target.DoOutOpAsync<object>(3, null, null); + Assert.IsFalse(errTask.IsCompleted); + var aex = Assert.Throws<AggregateException>(() => errTask.Wait()); + Assert.IsInstanceOf<IgniteException>(aex.InnerExceptions.Single()); + // Throws custom mapped exception. var ex = Assert.Throws<TestIgnitePluginException>(() => target.InLongOutLong(-1, 0)); Assert.AreEqual("Baz", ex.Message); http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs index f115042..621bfa5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs @@ -962,6 +962,35 @@ namespace Apache.Ignite.Core.Impl return GetPlatformTarget(DoOutOpObject(type)); } + /** <inheritdoc /> */ + public Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction = null, + Func<IBinaryRawReader, T> readAction = null) + { + var convertFunc = readAction != null + ? r => readAction(r) + : (Func<BinaryReader, T>) null; + + return GetFuture((futId, futType) => + { + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + stream.WriteLong(futId); + stream.WriteInt(futType); + + if (writeAction != null) + { + var writer = _marsh.StartMarshal(stream); + + writeAction(writer); + + FinishMarshal(writer); + } + + UU.TargetInStreamAsync(_target, type, stream.SynchronizeOutput()); + } + }, false, convertFunc).Task; + } + /// <summary> /// Gets the platform target. /// </summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs index a6a3a31..289589f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs @@ -149,6 +149,9 @@ namespace Apache.Ignite.Core.Impl.Unmanaged [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetOutObject")] public static extern void* TargetOutObject(void* ctx, void* target, int opType); + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetInStreamAsync")] + public static extern void TargetInStreamAsync(void* ctx, void* target, int opType, long memPtr); + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAcquire")] public static extern void* Acquire(void* ctx, void* target); http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs index 90e5230..986972f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs @@ -464,6 +464,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged return target.ChangeTarget(res); } + internal static void TargetInStreamAsync(IUnmanagedTarget target, int opType, long memPtr) + { + JNI.TargetInStreamAsync(target.Context, target.Target, opType, memPtr); + } + #endregion #region NATIVE METHODS: MISCELANNEOUS http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs index 8b8963f..e8f8bfb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs @@ -18,7 +18,9 @@ namespace Apache.Ignite.Core.Interop { using System; + using System.Threading.Tasks; using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Impl.Binary; /// <summary> /// Interface to interoperate with @@ -87,5 +89,16 @@ namespace Apache.Ignite.Core.Interop /// <param name="type">Operation type code.</param> /// <returns>Result.</returns> IPlatformTarget OutObject(int type); + + /// <summary> + /// Performs asynchronous operation. + /// </summary> + /// <typeparam name="T">Result type</typeparam> + /// <param name="type">Operation type code.</param> + /// <param name="writeAction">Write action (can be null).</param> + /// <param name="readAction">Read function (can be null).</param> + /// <returns>Task.</returns> + Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction, + Func<IBinaryRawReader, T> readAction); } }
