IGNITE-1391: Fixed deadlock in discovery message processing caused by platform latch.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a7631980 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a7631980 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a7631980 Branch: refs/heads/ignite-1093-2 Commit: a7631980ef5c0c359e8d74ae77f5d085287da139 Parents: f7230da Author: Pavel Tupitsyn <[email protected]> Authored: Wed Sep 9 13:31:08 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Sep 9 13:31:08 2015 +0300 ---------------------------------------------------------------------- .../callback/PlatformCallbackGateway.java | 5 +- .../callback/PlatformCallbackUtils.java | 3 +- .../cpp/common/include/ignite/common/java.h | 4 +- .../platform/src/main/cpp/common/src/java.cpp | 8 +-- .../main/dotnet/Apache.Ignite.Core/Ignition.cs | 28 ++++++-- .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 8 ++- .../Impl/Unmanaged/UnmanagedCallbacks.cs | 8 ++- .../Impl/Unmanaged/UnmanagedUtils.cs | 2 +- .../platform/PlatformProcessorImpl.java | 2 +- .../IgniteStartStopTest.cs | 70 ++++++++++++++++++++ 10 files changed, 114 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index a348888..5d5cdb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -737,13 +737,14 @@ public class PlatformCallbackGateway { /** * Kernal start callback. * + * @param proc Platform processor. * @param memPtr Memory pointer. */ - public void onStart(long memPtr) { + public void onStart(Object proc, long memPtr) { enter(); try { - PlatformCallbackUtils.onStart(envPtr, memPtr); + PlatformCallbackUtils.onStart(envPtr, proc, memPtr); } finally { leave(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index dd43e0d..64749ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -384,9 +384,10 @@ public class PlatformCallbackUtils { * Kernal start callback. * * @param envPtr Environment pointer. + * @param proc Platform processor. * @param memPtr Memory pointer. */ - static native void onStart(long envPtr, long memPtr); + static native void onStart(long envPtr, Object proc, long memPtr); /* * Kernal stop callback. http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/cpp/common/include/ignite/common/java.h ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/common/include/ignite/common/java.h b/modules/platform/src/main/cpp/common/include/ignite/common/java.h index e2d23b2..01ecbe3 100644 --- a/modules/platform/src/main/cpp/common/include/ignite/common/java.h +++ b/modules/platform/src/main/cpp/common/include/ignite/common/java.h @@ -96,7 +96,7 @@ namespace ignite typedef long long(JNICALL *NodeInfoHandler)(void* target, long long memPtr); - typedef void(JNICALL *OnStartHandler)(void* target, long long memPtr); + typedef void(JNICALL *OnStartHandler)(void* target, void* proc, long long memPtr); typedef void(JNICALL *OnStopHandler)(void* target); typedef void(JNICALL *ErrorHandler)(void* target, int errCode, const char* errClsChars, int errClsCharsLen, const char* errMsgChars, int errMsgCharsLen, void* errData, int errDataLen); @@ -640,7 +640,7 @@ namespace ignite JNIEXPORT jlong JNICALL JniNodeInfo(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr); - JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr); + JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jobject proc, jlong memPtr); JNIEXPORT void JNICALL JniOnStop(JNIEnv *env, jclass cls, jlong envPtr); JNIEXPORT jlong JNICALL JniExtensionCallbackInLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1); http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/cpp/common/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/common/src/java.cpp b/modules/platform/src/main/cpp/common/src/java.cpp index bb4cc20..492e7b6 100644 --- a/modules/platform/src/main/cpp/common/src/java.cpp +++ b/modules/platform/src/main/cpp/common/src/java.cpp @@ -311,7 +311,7 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_MEMORY_REALLOCATE = JniMethod("memoryReallocate", "(JJI)V", true); - JniMethod M_PLATFORM_CALLBACK_UTILS_ON_START = JniMethod("onStart", "(JJ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_ON_START = JniMethod("onStart", "(JLjava/lang/Object;J)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_ON_STOP = JniMethod("onStop", "(J)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_OUT_LONG = JniMethod("extensionCallbackInLongOutLong", "(JIJ)J", true); @@ -322,7 +322,7 @@ namespace ignite JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true); const char* C_PLATFORM_IGNITION = "org/apache/ignite/internal/processors/platform/PlatformIgnition"; - JniMethod M_PLATFORM_IGNITION_START = JniMethod("start", "(Ljava/lang/String;Ljava/lang/String;IJJ)Lorg/apache/ignite/internal/processors/platform/PlatformProcessor;", true); + JniMethod M_PLATFORM_IGNITION_START = JniMethod("start", "(Ljava/lang/String;Ljava/lang/String;IJJ)V", true); JniMethod M_PLATFORM_IGNITION_INSTANCE = JniMethod("instance", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformProcessor;", true); JniMethod M_PLATFORM_IGNITION_ENVIRONMENT_POINTER = JniMethod("environmentPointer", "(Ljava/lang/String;)J", true); JniMethod M_PLATFORM_IGNITION_STOP = JniMethod("stop", "(Ljava/lang/String;Z)Z", true); @@ -2185,8 +2185,8 @@ namespace ignite IGNITE_SAFE_FUNC(env, envPtr, NodeInfoHandler, nodeInfo, memPtr); } - JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr) { - IGNITE_SAFE_PROC(env, envPtr, OnStartHandler, onStart, memPtr); + JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jobject proc, jlong memPtr) { + IGNITE_SAFE_PROC(env, envPtr, OnStartHandler, onStart, env->NewGlobalRef(proc), memPtr); } JNIEXPORT void JNICALL JniOnStop(JNIEnv *env, jclass cls, jlong envPtr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs index ef79008..c9de62a 100644 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs @@ -172,19 +172,19 @@ namespace Apache.Ignite.Core sbyte* gridName0 = IgniteUtils.StringToUtf8Unmanaged(gridName); // 3. Create startup object which will guide us through the rest of the process. - _startup = new Startup(cfg) { Context = ctx }; + _startup = new Startup(cfg, cbs) { Context = ctx }; IUnmanagedTarget interopProc = null; try { // 4. Initiate Ignite start. - interopProc = UU.IgnitionStart(cbs.Context, cfg.SpringConfigUrl ?? DefaultCfg, + UU.IgnitionStart(cbs.Context, cfg.SpringConfigUrl ?? DefaultCfg, cfgEx != null ? cfgEx.GridName : null, ClientMode); // 5. At this point start routine is finished. We expect STARTUP object to have all necessary data. - Ignite node = new Ignite(cfg, _startup.Name, interopProc, _startup.Marshaller, - _startup.LifecycleBeans, cbs); + var node = _startup.Ignite; + interopProc = node.InteropProcessor; // 6. On-start callback (notify lifecycle components). node.OnStart(); @@ -351,8 +351,9 @@ namespace Apache.Ignite.Core /// <summary> /// Kernal start callback. /// </summary> + /// <param name="interopProc">Interop processor.</param> /// <param name="stream">Stream.</param> - internal static void OnStart(IPortableStream stream) + internal static void OnStart(IUnmanagedTarget interopProc, IPortableStream stream) { try { @@ -368,6 +369,8 @@ namespace Apache.Ignite.Core if (Nodes.ContainsKey(new NodeKey(name))) throw new IgniteException("Ignite with the same name already started: " + name); + _startup.Ignite = new Ignite(_startup.Configuration, _startup.Name, interopProc, _startup.Marshaller, + _startup.LifecycleBeans, _startup.Callbacks); } catch (Exception e) { @@ -604,17 +607,23 @@ namespace Apache.Ignite.Core /// Constructor. /// </summary> /// <param name="cfg">Configuration.</param> - internal Startup(IgniteConfiguration cfg) + /// <param name="cbs"></param> + internal Startup(IgniteConfiguration cfg, UnmanagedCallbacks cbs) { Configuration = cfg; + Callbacks = cbs; } - /// <summary> /// Configuration. /// </summary> internal IgniteConfiguration Configuration { get; private set; } /// <summary> + /// Gets unmanaged callbacks. + /// </summary> + internal UnmanagedCallbacks Callbacks { get; private set; } + + /// <summary> /// Lifecycle beans. /// </summary> internal IList<LifecycleBeanHolder> LifecycleBeans { get; set; } @@ -638,6 +647,11 @@ namespace Apache.Ignite.Core /// Gets or sets the context. /// </summary> internal void* Context { get; set; } + + /// <summary> + /// Gets or sets the ignite. + /// </summary> + internal Ignite Ignite { get; set; } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs index 7e33416..c5025b2 100644 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs @@ -74,7 +74,7 @@ namespace Apache.Ignite.Core.Impl private IClusterNode _locNode; /** Transactions facade. */ - private readonly TransactionsImpl _transactions; + private readonly Lazy<TransactionsImpl> _transactions; /** Callbacks */ private readonly UnmanagedCallbacks _cbs; @@ -119,7 +119,9 @@ namespace Apache.Ignite.Core.Impl cbs.Initialize(this); - _transactions = new TransactionsImpl(UU.ProcessorTransactions(proc), marsh, LocalNode.Id); + // Grid is not completely started here, can't initialize interop transactions right away. + _transactions = new Lazy<TransactionsImpl>( + () => new TransactionsImpl(UU.ProcessorTransactions(proc), marsh, LocalNode.Id)); } /// <summary> @@ -423,7 +425,7 @@ namespace Apache.Ignite.Core.Impl /** <inheritdoc /> */ public ITransactions Transactions { - get { return _transactions; } + get { return _transactions.Value; } } /** <inheritdoc /> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index 80b33df..9edf2ef 100644 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -153,7 +153,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged private delegate void NodeInfoCallbackDelegate(void* target, long memPtr); - private delegate void OnStartCallbackDelegate(void* target, long memPtr); + private delegate void OnStartCallbackDelegate(void* target, void* proc, long memPtr); private delegate void OnStopCallbackDelegate(void* target); private delegate void ErrorCallbackDelegate(void* target, int errType, sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars, int errMsgCharsLen, void* errData, int errDataLen); @@ -1001,11 +1001,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged }, true); } - private void OnStart(void* target, long memPtr) + private void OnStart(void* target, void* proc, long memPtr) { SafeCall(() => { - Ignition.OnStart(IgniteManager.Memory.Get(memPtr).Stream()); + var proc0 = new UnmanagedTarget(_ctx, proc); + + Ignition.OnStart(proc0, IgniteManager.Memory.Get(memPtr).Stream()); }, true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs index 9ec2668..4bea392 100644 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs @@ -527,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged #region NATIVE METHODS: PROCESSOR - internal static IUnmanagedTarget IgnitionStart(UnmanagedContext ctx, string cfgPath, string gridName, + internal static IUnmanagedTarget IgnitionStart(UnmanagedContext ctx, string cfgPath, string gridName, bool clientMode) { using (var mem = IgniteManager.Memory.Allocate().Stream()) http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java index e5e7a57..40b1334 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java @@ -126,7 +126,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf out.synchronize(); - platformCtx.gateway().onStart(mem.pointer()); + platformCtx.gateway().onStart(this, mem.pointer()); } // At this moment all necessary native libraries must be loaded, so we can process with store creation. http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs index ec7e157..2db1781 100644 --- a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs @@ -19,8 +19,12 @@ namespace Apache.Ignite.Core.Tests { using System; using System.Collections.Generic; + using System.IO; using System.Threading; + using System.Threading.Tasks; using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Messaging; + using Apache.Ignite.Core.Tests.Process; using NUnit.Framework; /// <summary> @@ -44,6 +48,7 @@ namespace Apache.Ignite.Core.Tests [TearDown] public void TearDown() { + TestUtils.KillProcesses(); Ignition.StopAll(true); } @@ -348,5 +353,70 @@ namespace Apache.Ignite.Core.Tests Assert.AreEqual(1, cache.Get(1)); } + + /// <summary> + /// Tests the processor initialization and grid usage right after topology enter. + /// </summary> + [Test] + public void TestProcessorInit() + { + var cfg = new IgniteConfiguration + { + SpringConfigUrl = "config\\start-test-grid1.xml", + JvmOptions = TestUtils.TestJavaOptions(), + JvmClasspath = TestUtils.CreateTestClasspath() + }; + + // Start local node + var grid = Ignition.Start(cfg); + + // Start remote node in a separate process + // ReSharper disable once UnusedVariable + var proc = new IgniteProcess( + "-jvmClasspath=" + TestUtils.CreateTestClasspath(), + "-springConfigUrl=" + Path.GetFullPath(cfg.SpringConfigUrl), + "-J-Xms512m", "-J-Xmx512m"); + + var cts = new CancellationTokenSource(); + var token = cts.Token; + + // Spam message subscriptions on a separate thread + // to test race conditions during processor init on remote node + var listenTask = Task.Factory.StartNew(() => + { + var filter = new MessageFilter(); + + while (!token.IsCancellationRequested) + { + var listenId = grid.Message().RemoteListen(filter); + + grid.Message().StopRemoteListen(listenId); + } + // ReSharper disable once FunctionNeverReturns + }); + + // Wait for remote node to join + Assert.IsTrue(grid.WaitTopology(2, 30000)); + + // Wait some more for initialization + Thread.Sleep(1000); + + // Cancel listen task and check that it finishes + cts.Cancel(); + Assert.IsTrue(listenTask.Wait(5000)); + } + + /// <summary> + /// Noop message filter. + /// </summary> + [Serializable] + private class MessageFilter : IMessageFilter<int> + { + /** <inheritdoc /> */ + public bool Invoke(Guid nodeId, int message) + { + return true; + } + } } }
