Repository: ignite Updated Branches: refs/heads/ignite-2.0 f52ba0f5d -> f2328a459
IGNITE-4628 Add Java callback support for platform plugins This closes #1543 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f2328a45 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f2328a45 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f2328a45 Branch: refs/heads/ignite-2.0 Commit: f2328a4593a6e4902dd3f4c946357fa054c3f1e2 Parents: f52ba0f Author: Pavel Tupitsyn <[email protected]> Authored: Thu Feb 16 17:46:02 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Thu Feb 16 17:46:02 2017 +0300 ---------------------------------------------------------------------- .../callback/PlatformCallbackGateway.java | 23 +++++ .../platform/callback/PlatformCallbackOp.java | 3 + .../plugin/PlatformTestPluginTarget.java | 29 ++++++- .../Plugin/PluginTest.cs | 10 ++- .../Plugin/TestIgnitePluginProvider.cs | 14 ++++ .../Apache.Ignite.Core.csproj | 1 + .../Impl/Plugin/PluginContext.cs | 8 ++ .../Impl/Plugin/PluginProcessor.cs | 88 ++++++++++++++++++++ .../Impl/Unmanaged/UnmanagedCallbackOp.cs | 3 +- .../Impl/Unmanaged/UnmanagedCallbacks.cs | 6 ++ .../Apache.Ignite.Core/Plugin/IPluginContext.cs | 7 ++ .../Apache.Ignite.Core/Plugin/PluginCallback.cs | 29 +++++++ 12 files changed, 215 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index fc311da..aee14d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.platform.callback; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.platform.PlatformTargetProxy; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.util.GridStripedSpinBusyLock; /** @@ -1223,6 +1224,28 @@ public class PlatformCallbackGateway { } /** + * Invoke plugin callback by id. + * + * @param callbackId Id of a callback registered in Platform. + * @param outMem Out memory (Java writes, platform reads). + * @param inMem In memory (platform writes, Java reads). + */ + public void pluginCallback(long callbackId, PlatformMemory outMem, PlatformMemory inMem) { + enter(); + + try { + long outPtr = outMem == null ? 0 : outMem.pointer(); + long inPtr = inMem == null ? 0 : inMem.pointer(); + + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.PluginCallbackInLongLongOutLong, callbackId, outPtr, inPtr, null); + } + finally { + leave(); + } + } + + /** * Enter gateway. */ protected void enter() { http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java index 500a4f3..d77d2d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java @@ -221,4 +221,7 @@ class PlatformCallbackOp { /** */ public static final int CachePluginIgniteStop = 67; + + /** */ + public static final int PluginCallbackInLongLongOutLong = 68; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java index 94a21a3..e80a23f 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java @@ -24,12 +24,15 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformTarget; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.plugin.PluginConfiguration; import org.jetbrains.annotations.Nullable; /** * Test target. */ +@SuppressWarnings("ConstantConditions") class PlatformTestPluginTarget extends PlatformAbstractTarget { /** */ private final String name; @@ -87,11 +90,35 @@ class PlatformTestPluginTarget extends PlatformAbstractTarget { throws IgniteCheckedException { PlatformTestPluginTarget t = (PlatformTestPluginTarget)arg; - writer.writeString(t.name); + writer.writeString(invokeCallback(t.name)); return new PlatformTestPluginTarget(platformCtx, t.name + reader.readString()); } + /** + * Invokes the platform callback. + * + * @param val Value to send. + * @return Result. + */ + private String invokeCallback(String val) { + PlatformMemory outMem = platformCtx.memory().allocate(); + PlatformMemory inMem = platformCtx.memory().allocate(); + + PlatformOutputStream outStream = outMem.output(); + BinaryRawWriterEx writer = platformCtx.writer(outStream); + + writer.writeString(val); + + outStream.synchronize(); + + platformCtx.gateway().pluginCallback(1, outMem, inMem); + + BinaryRawReaderEx reader = platformCtx.reader(inMem); + + return reader.readString(); + } + /** {@inheritDoc} */ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { writer.writeString(name); http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs index 0af7b10..efb14ff 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs @@ -73,7 +73,7 @@ namespace Apache.Ignite.Core.Tests.Plugin var extension = plugin.Provider.Context.GetExtension(0); Assert.IsNotNull(extension); - CheckPluginTarget(extension, "barbaz"); + CheckPluginTarget(extension, "barbaz", plugin.Provider); } Assert.AreEqual(true, plugin.Provider.Stopped); @@ -83,7 +83,8 @@ namespace Apache.Ignite.Core.Tests.Plugin /// <summary> /// Checks the plugin target operations. /// </summary> - private static void CheckPluginTarget(IPlatformTarget target, string expectedName) + private static void CheckPluginTarget(IPlatformTarget target, string expectedName, + TestIgnitePluginProvider provider) { // Returns name. Assert.AreEqual(expectedName, target.OutStream(1, r => r.ReadString())); @@ -104,12 +105,13 @@ namespace Apache.Ignite.Core.Tests.Plugin var newTarget = target.InStreamOutObject(1, w => w.WriteString("name1")); Assert.AreEqual("name1", newTarget.OutStream(1, r => r.ReadString())); - // Returns target with specified name appended. + // Invokes callback to modify name, returns target with specified name appended. var res = target.InObjectStreamOutObjectStream(1, newTarget, w => w.WriteString("_abc"), (reader, t) => Tuple.Create(reader.ReadString(), t)); - Assert.AreEqual("name1", res.Item1); // Old name + Assert.AreEqual("NAME1", res.Item1); // Old name converted by callback. Assert.AreEqual("name1_abc", res.Item2.OutStream(1, r => r.ReadString())); + Assert.AreEqual("name1", provider.CallbackResult); // Old name. // Returns a copy with same name. var resCopy = res.Item2.OutObject(1); http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs index d86750f..161d797 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Core.Tests.Plugin { using System; + using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Plugin; using NUnit.Framework; @@ -69,6 +70,17 @@ namespace Apache.Ignite.Core.Tests.Plugin (className, message, inner, ignite) => new TestIgnitePluginException(className, message, ignite, inner)); + context.RegisterCallback(1, (input, output) => + { + CallbackResult = input.ReadString(); + output.WriteString(CallbackResult.ToUpper()); + + return CallbackResult.Length; + }); + + var ex = Assert.Throws<IgniteException>(() => context.RegisterCallback(1, (input, output) => 0)); + Assert.AreEqual("Plugin callback with id 1 is already registered", ex.Message); + Context = context; EnsureIgniteWorks(); @@ -113,6 +125,8 @@ namespace Apache.Ignite.Core.Tests.Plugin /// </summary> public bool? IgniteStopped { get; set; } + public string CallbackResult { get; private set; } + /// <summary> /// Gets the context. /// </summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 7c86429..58002db 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -492,6 +492,7 @@ <Compile Include="Plugin\IPluginConfiguration.cs" /> <Compile Include="Plugin\IPluginContext.cs" /> <Compile Include="Plugin\IPluginProvider.cs" /> + <Compile Include="Plugin\PluginCallback.cs" /> <Compile Include="Plugin\PluginNotFoundException.cs" /> <Compile Include="Plugin\PluginProviderTypeAttribute.cs" /> <Compile Include="Plugin\Cache\ICachePluginConfiguration.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs index 0e01244..fd7033c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs @@ -79,5 +79,13 @@ namespace Apache.Ignite.Core.Impl.Plugin _pluginProcessor.RegisterExceptionMapping(className, factory); } + + /** <inheritdoc /> */ + public void RegisterCallback(long callbackId, PluginCallback callback) + { + IgniteArgumentCheck.NotNull(callback, "callback"); + + _pluginProcessor.RegisterCallback(callbackId, callback); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs index 7cafcc0..7ed7141 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs @@ -21,8 +21,12 @@ namespace Apache.Ignite.Core.Impl.Plugin using System.Collections.Generic; using System.Diagnostics; using System.Linq; + using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Memory; using Apache.Ignite.Core.Log; using Apache.Ignite.Core.Plugin; @@ -42,6 +46,10 @@ namespace Apache.Ignite.Core.Impl.Plugin private readonly CopyOnWriteConcurrentDictionary<string, ExceptionFactory> _exceptionMappings = new CopyOnWriteConcurrentDictionary<string, ExceptionFactory>(); + /** Plugin callbacks. */ + private readonly CopyOnWriteConcurrentDictionary<long, PluginCallback> _callbacks + = new CopyOnWriteConcurrentDictionary<long, PluginCallback>(); + /** */ private readonly Ignite _ignite; @@ -144,6 +152,86 @@ namespace Apache.Ignite.Core.Impl.Plugin } /// <summary> + /// Registers the callback. + /// </summary> + /// <param name="callbackId">Calback id.</param> + /// <param name="callback">Callback delegate</param> + public void RegisterCallback(long callbackId, PluginCallback callback) + { + Debug.Assert(callback != null); + + var res = _callbacks.GetOrAdd(callbackId, _ => callback); + + if (res != callback) + { + throw new IgniteException(string.Format( + "Plugin callback with id {0} is already registered", callbackId)); + } + } + + /// <summary> + /// Invokes the callback. + /// </summary> + public long InvokeCallback(long callbackId, long inPtr, long outPtr) + { + PluginCallback callback; + + if (!_callbacks.TryGetValue(callbackId, out callback)) + { + throw new IgniteException(string.Format( + "Plugin callback with id {0} is not registered", callbackId)); + } + + using (var inStream = GetStream(inPtr)) + using (var outStream = GetStream(outPtr)) + { + var reader = GetReader(inStream); + var writer = GetWriter(outStream); + + var res = callback(reader, writer); + + if (writer != null) + { + outStream.SynchronizeOutput(); + writer.Marshaller.FinishMarshal(writer); + } + + return res; + } + } + + /// <summary> + /// Gets the stream. + /// </summary> + private static PlatformMemoryStream GetStream(long ptr) + { + return ptr == 0 ? null : IgniteManager.Memory.Get(ptr).GetStream(); + } + + /// <summary> + /// Gets the reader. + /// </summary> + private IBinaryRawReader GetReader(IBinaryStream stream) + { + return stream == null ? null : Ignite.Marshaller.StartUnmarshal(stream).GetRawReader(); + } + + /// <summary> + /// Gets the writer. + /// </summary> + private BinaryWriter GetWriter(IBinaryStream stream) + { + if (stream == null) + return null; + + var res = Ignite.Marshaller.StartMarshal(stream); + + res.GetRawWriter(); + + return res; + } + + /// <summary> /// Loads the plugins. /// </summary> private void LoadPlugins() http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs index 27d1124..91df822 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs @@ -87,6 +87,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged CachePluginCreate = 64, CachePluginDestroy = 65, CachePluginIgniteStart = 66, - CachePluginIgniteStop = 67 + CachePluginIgniteStop = 67, + PluginCallbackInLongLongOutLong = 68 } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index 1d88f76..91ffabb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -246,6 +246,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged AddHandler(UnmanagedCallbackOp.CachePluginDestroy, CachePluginDestroy); AddHandler(UnmanagedCallbackOp.CachePluginIgniteStart, CachePluginIgniteStart); AddHandler(UnmanagedCallbackOp.CachePluginIgniteStop, CachePluginIgniteStop); + AddHandler(UnmanagedCallbackOp.PluginCallbackInLongLongOutLong, PluginCallbackInLongLongOutLong); } /// <summary> @@ -1285,6 +1286,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged return 0; } + private long PluginCallbackInLongLongOutLong(long callbackId, long inPtr, long outPtr, void* arg) + { + return _ignite.PluginProcessor.InvokeCallback(callbackId, inPtr, outPtr); + } + #endregion #region HELPERS http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs index 97b566c..03d130b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs @@ -57,5 +57,12 @@ namespace Apache.Ignite.Core.Plugin /// <param name="className">Name of the Java exception class to be mapped.</param> /// <param name="factory">Exception factory delegate.</param> void RegisterExceptionMapping(string className, ExceptionFactory factory); + + /// <summary> + /// Registers Java->.NET callback. + /// </summary> + /// <param name="callbackId">Callback id.</param> + /// <param name="callback">Callback delegate.</param> + void RegisterCallback(long callbackId, PluginCallback callback); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/PluginCallback.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/PluginCallback.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/PluginCallback.cs new file mode 100644 index 0000000..5ba675e --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/PluginCallback.cs @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Plugin +{ + using Apache.Ignite.Core.Binary; + + /// <summary> + /// Plugin callback delegate. + /// </summary> + /// <param name="input">Input reader. May be null.</param> + /// <param name="output">Output writer. May be null.</param> + /// <returns>Result code.</returns> + public delegate long PluginCallback(IBinaryRawReader input, IBinaryRawWriter output); +}
