http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LifecycleTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LifecycleTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LifecycleTest.cs new file mode 100644 index 0000000..3c38ef9 --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LifecycleTest.cs @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests +{ + using System; + using System.Collections.Generic; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl; + using Apache.Ignite.Core.Lifecycle; + using Apache.Ignite.Core.Resource; + using NUnit.Framework; + + /// <summary> + /// Lifecycle beans test. + /// </summary> + public class LifecycleTest + { + /** Configuration: without Java beans. */ + private const string CfgNoBeans = "config//lifecycle//lifecycle-no-beans.xml"; + + /** Configuration: with Java beans. */ + private const string CfgBeans = "config//lifecycle//lifecycle-beans.xml"; + + /** Whether to throw an error on lifecycle event. */ + internal static bool ThrowErr; + + /** Events: before start. */ + internal static IList<Event> BeforeStartEvts; + + /** Events: after start. */ + internal static IList<Event> AfterStartEvts; + + /** Events: before stop. */ + internal static IList<Event> BeforeStopEvts; + + /** Events: after stop. */ + internal static IList<Event> AfterStopEvts; + + /// <summary> + /// Set up routine. + /// </summary> + [SetUp] + public void SetUp() + { + ThrowErr = false; + + BeforeStartEvts = new List<Event>(); + AfterStartEvts = new List<Event>(); + BeforeStopEvts = new List<Event>(); + AfterStopEvts = new List<Event>(); + } + + /// <summary> + /// Tear down routine. + /// </summary> + [TearDown] + public void TearDown() + { + Ignition.StopAll(true); + } + + /// <summary> + /// Test without Java beans. + /// </summary> + [Test] + public void TestWithoutBeans() + { + // 1. Test start events. + IIgnite grid = Start(CfgNoBeans); + + Assert.AreEqual(2, BeforeStartEvts.Count); + CheckEvent(BeforeStartEvts[0], null, null, 0, null); + CheckEvent(BeforeStartEvts[1], null, null, 0, null); + + Assert.AreEqual(2, AfterStartEvts.Count); + CheckEvent(AfterStartEvts[0], grid, grid, 0, null); + CheckEvent(AfterStartEvts[1], grid, grid, 0, null); + + // 2. Test stop events. + Ignition.Stop(grid.Name, false); + + Assert.AreEqual(2, BeforeStartEvts.Count); + Assert.AreEqual(2, AfterStartEvts.Count); + + Assert.AreEqual(2, BeforeStopEvts.Count); + CheckEvent(BeforeStopEvts[0], grid, grid, 0, null); + CheckEvent(BeforeStopEvts[1], grid, grid, 0, null); + + Assert.AreEqual(2, AfterStopEvts.Count); + CheckEvent(AfterStopEvts[0], grid, grid, 0, null); + CheckEvent(AfterStopEvts[1], grid, grid, 0, null); + } + + /// <summary> + /// Test with Java beans. + /// </summary> + [Test] + public void TestWithBeans() + { + // 1. Test .Net start events. + IIgnite grid = Start(CfgBeans); + + Assert.AreEqual(4, BeforeStartEvts.Count); + CheckEvent(BeforeStartEvts[0], null, null, 0, null); + CheckEvent(BeforeStartEvts[1], null, null, 1, "1"); + CheckEvent(BeforeStartEvts[2], null, null, 0, null); + CheckEvent(BeforeStartEvts[3], null, null, 0, null); + + Assert.AreEqual(4, AfterStartEvts.Count); + CheckEvent(AfterStartEvts[0], grid, grid, 0, null); + CheckEvent(AfterStartEvts[1], grid, grid, 1, "1"); + CheckEvent(AfterStartEvts[2], grid, grid, 0, null); + CheckEvent(AfterStartEvts[3], grid, grid, 0, null); + + // 2. Test Java start events. + IList<int> res = grid.Compute().ExecuteJavaTask<IList<int>>( + "org.apache.ignite.platform.lifecycle.PlatformJavaLifecycleTask", null); + + Assert.AreEqual(2, res.Count); + Assert.AreEqual(3, res[0]); + Assert.AreEqual(3, res[1]); + + // 3. Test .Net stop events. + Ignition.Stop(grid.Name, false); + + Assert.AreEqual(4, BeforeStartEvts.Count); + Assert.AreEqual(4, AfterStartEvts.Count); + + Assert.AreEqual(4, BeforeStopEvts.Count); + CheckEvent(BeforeStopEvts[0], grid, grid, 0, null); + CheckEvent(BeforeStopEvts[1], grid, grid, 1, "1"); + CheckEvent(BeforeStopEvts[2], grid, grid, 0, null); + CheckEvent(BeforeStopEvts[3], grid, grid, 0, null); + + Assert.AreEqual(4, AfterStopEvts.Count); + CheckEvent(AfterStopEvts[0], grid, grid, 0, null); + CheckEvent(AfterStopEvts[1], grid, grid, 1, "1"); + CheckEvent(AfterStopEvts[2], grid, grid, 0, null); + CheckEvent(AfterStopEvts[3], grid, grid, 0, null); + } + + /// <summary> + /// Test behavior when error is thrown from lifecycle beans. + /// </summary> + [Test] + public void TestError() + { + ThrowErr = true; + + try + { + Start(CfgNoBeans); + + Assert.Fail("Should not reach this place."); + } + catch (Exception e) + { + Assert.AreEqual(typeof(IgniteException), e.GetType()); + } + } + + /// <summary> + /// Start grid. + /// </summary> + /// <param name="cfgPath">Spring configuration path.</param> + /// <returns>Grid.</returns> + private static IIgnite Start(string cfgPath) + { + TestUtils.JvmDebug = true; + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.JvmClasspath = TestUtils.CreateTestClasspath(); + cfg.JvmOptions = TestUtils.TestJavaOptions(); + cfg.SpringConfigUrl = cfgPath; + + cfg.LifecycleBeans = new List<ILifecycleBean> { new Bean(), new Bean() }; + + return Ignition.Start(cfg); + } + + /// <summary> + /// Check event. + /// </summary> + /// <param name="evt">Event.</param> + /// <param name="expGrid1">Expected grid 1.</param> + /// <param name="expGrid2">Expected grid 2.</param> + /// <param name="expProp1">Expected property 1.</param> + /// <param name="expProp2">Expected property 2.</param> + private static void CheckEvent(Event evt, IIgnite expGrid1, IIgnite expGrid2, int expProp1, string expProp2) + { + if (evt.Grid1 != null && evt.Grid1 is IgniteProxy) + evt.Grid1 = (evt.Grid1 as IgniteProxy).Target; + + if (evt.Grid2 != null && evt.Grid2 is IgniteProxy) + evt.Grid2 = (evt.Grid2 as IgniteProxy).Target; + + Assert.AreEqual(expGrid1, evt.Grid1); + Assert.AreEqual(expGrid2, evt.Grid2); + Assert.AreEqual(expProp1, evt.Prop1); + Assert.AreEqual(expProp2, evt.Prop2); + } + } + + public abstract class AbstractBean + { + [InstanceResource] + public IIgnite Grid1; + + public int Property1 + { + get; + set; + } + } + + public class Bean : AbstractBean, ILifecycleBean + { + [InstanceResource] + public IIgnite Grid2; + + public string Property2 + { + get; + set; + } + + /** <inheritDoc /> */ + public void OnLifecycleEvent(LifecycleEventType evtType) + { + if (LifecycleTest.ThrowErr) + throw new Exception("Lifecycle exception."); + + Event evt = new Event(); + + evt.Grid1 = Grid1; + evt.Grid2 = Grid2; + evt.Prop1 = Property1; + evt.Prop2 = Property2; + + switch (evtType) + { + case LifecycleEventType.BeforeNodeStart: + LifecycleTest.BeforeStartEvts.Add(evt); + + break; + + case LifecycleEventType.AfterNodeStart: + LifecycleTest.AfterStartEvts.Add(evt); + + break; + + case LifecycleEventType.BeforeNodeStop: + LifecycleTest.BeforeStopEvts.Add(evt); + + break; + + case LifecycleEventType.AfterNodeStop: + LifecycleTest.AfterStopEvts.Add(evt); + + break; + } + } + } + + public class Event + { + public IIgnite Grid1; + public IIgnite Grid2; + public int Prop1; + public string Prop2; + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LoadDllTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LoadDllTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LoadDllTest.cs new file mode 100644 index 0000000..af9387c --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LoadDllTest.cs @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests +{ + using System; + using System.CodeDom.Compiler; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using Apache.Ignite.Core.Common; + using Microsoft.CSharp; + using NUnit.Framework; + + /// <summary> + /// Dll loading test. + /// </summary> + public class LoadDllTest + { + /// <summary> + /// + /// </summary> + [SetUp] + public void SetUp() + { + TestUtils.KillProcesses(); + } + + /// <summary> + /// + /// </summary> + [TearDown] + public void TearDown() + { + Ignition.StopAll(true); + } + + /// <summary> + /// + /// </summary> + [Test] + public void TestLoadFromGac() + { + Assert.False(IsLoaded("System.Data.Linq")); + + var cfg = new IgniteConfiguration + { + SpringConfigUrl = "config\\start-test-grid3.xml", + Assemblies = + new List<string> + { + "System.Data.Linq,Culture=neutral,Version=1.0.0.0,PublicKeyToken=b77a5c561934e089" + }, + JvmClasspath = TestUtils.CreateTestClasspath() + }; + + + var grid = Ignition.Start(cfg); + + Assert.IsNotNull(grid); + + Assert.True(IsLoaded("System.Data.Linq")); + } + + /// <summary> + /// + /// </summary> + [Test] + public void TestLoadFromCurrentDir() + { + Assert.False(IsLoaded("testDll")); + + GenerateDll("testDll.dll"); + + var cfg = new IgniteConfiguration + { + SpringConfigUrl = "config\\start-test-grid3.xml", + Assemblies = new List<string> {"testDll.dll"}, + JvmClasspath = TestUtils.CreateTestClasspath() + }; + + var grid = Ignition.Start(cfg); + + Assert.IsNotNull(grid); + + Assert.True(IsLoaded("testDll")); + } + + /// <summary> + /// + /// </summary> + [Test] + public void TestLoadAllDllInDir() + { + var dirInfo = Directory.CreateDirectory(Path.GetTempPath() + "/testDlls"); + + Assert.False(IsLoaded("dllFromDir1")); + Assert.False(IsLoaded("dllFromDir2")); + + GenerateDll(dirInfo.FullName + "/dllFromDir1.dll"); + GenerateDll(dirInfo.FullName + "/dllFromDir2.dll"); + File.WriteAllText(dirInfo.FullName + "/notADll.txt", "notADll"); + + var cfg = new IgniteConfiguration + { + SpringConfigUrl = "config\\start-test-grid3.xml", + Assemblies = new List<string> {dirInfo.FullName}, + JvmClasspath = TestUtils.CreateTestClasspath() + }; + + var grid = Ignition.Start(cfg); + + Assert.IsNotNull(grid); + + Assert.True(IsLoaded("dllFromDir1")); + Assert.True(IsLoaded("dllFromDir2")); + } + + /// <summary> + /// + /// </summary> + [Test] + public void TestLoadFromCurrentDirByName() + { + Assert.False(IsLoaded("testDllByName")); + + GenerateDll("testDllByName.dll"); + + var cfg = new IgniteConfiguration + { + SpringConfigUrl = "config\\start-test-grid3.xml", + Assemblies = new List<string> {"testDllByName"}, + JvmClasspath = TestUtils.CreateTestClasspath() + }; + + var grid = Ignition.Start(cfg); + + Assert.IsNotNull(grid); + + Assert.True(IsLoaded("testDllByName")); + } + + /// <summary> + /// + /// </summary> + [Test] + public void TestLoadByAbsoluteUri() + { + var dllPath = Path.GetTempPath() + "/tempDll.dll"; + Assert.False(IsLoaded("tempDll")); + + GenerateDll(dllPath); + + var cfg = new IgniteConfiguration + { + SpringConfigUrl = "config\\start-test-grid3.xml", + Assemblies = new List<string> {dllPath}, + JvmClasspath = TestUtils.CreateTestClasspath() + }; + + var grid = Ignition.Start(cfg); + + Assert.IsNotNull(grid); + + Assert.True(IsLoaded("tempDll")); + } + + /// <summary> + /// + /// </summary> + [Test] + public void TestLoadUnexistingLibrary() + { + var cfg = new IgniteConfiguration + { + SpringConfigUrl = "config\\start-test-grid3.xml", + Assemblies = new List<string> {"unexistingAssembly.820482.dll"}, + JvmClasspath = TestUtils.CreateTestClasspath() + }; + + try + { + Ignition.Start(cfg); + + Assert.Fail("Grid has been started with broken configuration."); + } + catch (IgniteException) + { + + } + } + + /// <summary> + /// + /// </summary> + /// <param name="outputPath"></param> + private void GenerateDll(string outputPath) + { + var codeProvider = new CSharpCodeProvider(); + +#pragma warning disable 0618 + + var icc = codeProvider.CreateCompiler(); + +#pragma warning restore 0618 + + var parameters = new CompilerParameters + { + GenerateExecutable = false, + OutputAssembly = outputPath + }; + + var src = "namespace GridGain.Client.Test { public class Foo {}}"; + + var results = icc.CompileAssemblyFromSource(parameters, src); + + Assert.False(results.Errors.HasErrors); + } + + /// <summary> + /// Determines whether the specified assembly is loaded. + /// </summary> + /// <param name="asmName">Name of the assembly.</param> + private static bool IsLoaded(string asmName) + { + return AppDomain.CurrentDomain.GetAssemblies().Any(a => a.GetName().Name == asmName); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MarshallerTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MarshallerTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MarshallerTest.cs new file mode 100644 index 0000000..d3af288 --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MarshallerTest.cs @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests +{ + using Apache.Ignite.Core.Common; + using NUnit.Framework; + + /// <summary> + /// Test marshaller initialization. + /// </summary> + public class MarshallerTest + { + /// <summary> + /// Tests the default marhsaller. + /// By default, portable marshaller is used. + /// </summary> + [Test] + public void TestDefaultMarhsaller() + { + using (var grid = Ignition.Start("config\\marshaller-default.xml")) + { + var cache = grid.GetOrCreateCache<int, int>(null); + + cache.Put(1, 1); + + Assert.AreEqual(1, cache.Get(1)); + } + } + + /// <summary> + /// Tests the portable marhsaller. + /// PortableMarshaller can be specified explicitly in config. + /// </summary> + [Test] + public void TestPortableMarhsaller() + { + using (var grid = Ignition.Start("config\\marshaller-portable.xml")) + { + var cache = grid.GetOrCreateCache<int, int>(null); + + cache.Put(1, 1); + + Assert.AreEqual(1, cache.Get(1)); + } + } + + /// <summary> + /// Tests the invalid marshaller. + /// </summary> + [Test] + public void TestInvalidMarshaller() + { + Assert.Throws<IgniteException>(() => Ignition.Start("config\\marshaller-invalid.xml")); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs new file mode 100644 index 0000000..abb8e2f --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Messaging; + using Apache.Ignite.Core.Resource; + using NUnit.Framework; + + /// <summary> + /// <see cref="IMessaging"/> tests. + /// </summary> + public class MessagingTest + { + /** */ + private IIgnite _grid1; + + /** */ + private IIgnite _grid2; + + /** */ + private IIgnite _grid3; + + /** */ + public static int MessageId; + + /// <summary> + /// Executes before each test. + /// </summary> + [SetUp] + public void SetUp() + { + _grid1 = Ignition.Start(Configuration("config\\compute\\compute-grid1.xml")); + _grid2 = Ignition.Start(Configuration("config\\compute\\compute-grid2.xml")); + _grid3 = Ignition.Start(Configuration("config\\compute\\compute-grid3.xml")); + } + + /// <summary> + /// Executes after each test. + /// </summary> + [TearDown] + public virtual void TearDown() + { + try + { + TestUtils.AssertHandleRegistryIsEmpty(1000, _grid1, _grid2, _grid3); + + MessagingTestHelper.AssertFailures(); + } + finally + { + // Stop all grids between tests to drop any hanging messages + Ignition.StopAll(true); + } + } + + /// <summary> + /// Tests LocalListen. + /// </summary> + [Test] + public void TestLocalListen() + { + TestLocalListen(null); + TestLocalListen("string topic"); + TestLocalListen(NextId()); + } + + /// <summary> + /// Tests LocalListen. + /// </summary> + [SuppressMessage("ReSharper", "AccessToModifiedClosure")] + public void TestLocalListen(object topic) + { + var messaging = _grid1.Message(); + var listener = MessagingTestHelper.GetListener(); + messaging.LocalListen(listener, topic); + + // Test sending + CheckSend(topic); + CheckSend(topic, _grid2); + CheckSend(topic, _grid3); + + // Test different topic + CheckNoMessage(NextId()); + CheckNoMessage(NextId(), _grid2); + + // Test multiple subscriptions for the same filter + messaging.LocalListen(listener, topic); + messaging.LocalListen(listener, topic); + CheckSend(topic, repeatMultiplier: 3); // expect all messages repeated 3 times + + messaging.StopLocalListen(listener, topic); + CheckSend(topic, repeatMultiplier: 2); // expect all messages repeated 2 times + + messaging.StopLocalListen(listener, topic); + CheckSend(topic); // back to 1 listener + + // Test message type mismatch + var ex = Assert.Throws<IgniteException>(() => messaging.Send(1.1, topic)); + Assert.AreEqual("Unable to cast object of type 'System.Double' to type 'System.String'.", ex.Message); + + // Test end listen + MessagingTestHelper.ListenResult = false; + CheckSend(topic, single: true); // we'll receive one more and then unsubscribe because of delegate result. + CheckNoMessage(topic); + + // Start again + MessagingTestHelper.ListenResult = true; + messaging.LocalListen(listener, topic); + CheckSend(topic); + + // Stop + messaging.StopLocalListen(listener, topic); + CheckNoMessage(topic); + } + + /// <summary> + /// Tests LocalListen with projection. + /// </summary> + [Test] + public void TestLocalListenProjection() + { + TestLocalListenProjection(null); + TestLocalListenProjection("prj"); + TestLocalListenProjection(NextId()); + } + + /// <summary> + /// Tests LocalListen with projection. + /// </summary> + private void TestLocalListenProjection(object topic) + { + var grid3GotMessage = false; + + var grid3Listener = new MessageFilter<string>((id, x) => + { + grid3GotMessage = true; + return true; + }); + + _grid3.Message().LocalListen(grid3Listener, topic); + + var clusterMessaging = _grid1.Cluster.ForNodes(_grid1.Cluster.LocalNode, _grid2.Cluster.LocalNode).Message(); + var clusterListener = MessagingTestHelper.GetListener(); + clusterMessaging.LocalListen(clusterListener, topic); + + CheckSend(msg: clusterMessaging, topic: topic); + Assert.IsFalse(grid3GotMessage, "Grid3 should not get messages"); + + CheckSend(grid: _grid2, msg: clusterMessaging, topic: topic); + Assert.IsFalse(grid3GotMessage, "Grid3 should not get messages"); + + clusterMessaging.StopLocalListen(clusterListener, topic); + _grid3.Message().StopLocalListen(grid3Listener, topic); + } + + /// <summary> + /// Tests LocalListen in multithreaded mode. + /// </summary> + [Test] + [SuppressMessage("ReSharper", "AccessToModifiedClosure")] + [Category(TestUtils.CategoryIntensive)] + public void TestLocalListenMultithreaded() + { + const int threadCnt = 20; + const int runSeconds = 20; + + var messaging = _grid1.Message(); + + var senders = Task.Factory.StartNew(() => TestUtils.RunMultiThreaded(() => + { + messaging.Send((object) NextMessage()); + Thread.Sleep(50); + }, threadCnt, runSeconds)); + + + var sharedReceived = 0; + + var sharedListener = new MessageFilter<string>((id, x) => + { + Interlocked.Increment(ref sharedReceived); + Thread.MemoryBarrier(); + return true; + }); + + TestUtils.RunMultiThreaded(() => + { + // Check that listen/stop work concurrently + messaging.LocalListen(sharedListener); + + for (int i = 0; i < 100; i++) + { + messaging.LocalListen(sharedListener); + messaging.StopLocalListen(sharedListener); + } + + var localReceived = 0; + var stopLocal = 0; + + var localListener = new MessageFilter<string>((id, x) => + { + Interlocked.Increment(ref localReceived); + Thread.MemoryBarrier(); + return Thread.VolatileRead(ref stopLocal) == 0; + }); + + messaging.LocalListen(localListener); + + Thread.Sleep(100); + + Thread.VolatileWrite(ref stopLocal, 1); + + Thread.Sleep(1000); + + var result = Thread.VolatileRead(ref localReceived); + + Thread.Sleep(100); + + // Check that unsubscription worked properly + Assert.AreEqual(result, Thread.VolatileRead(ref localReceived)); + + messaging.StopLocalListen(sharedListener); + + }, threadCnt, runSeconds); + + senders.Wait(); + + Thread.Sleep(100); + + var sharedResult = Thread.VolatileRead(ref sharedReceived); + + messaging.Send((object)NextMessage()); + + Thread.Sleep(MessagingTestHelper.MessageTimeout); + + // Check that unsubscription worked properly + Assert.AreEqual(sharedResult, Thread.VolatileRead(ref sharedReceived)); + } + + /// <summary> + /// Tests RemoteListen. + /// </summary> + [Test] + public void TestRemoteListen() + { + TestRemoteListen(null); + TestRemoteListen("string topic"); + TestRemoteListen(NextId()); + } + + /// <summary> + /// Tests RemoteListen with async mode enabled. + /// </summary> + [Test] + public void TestRemoteListenAsync() + { + TestRemoteListen(null, true); + TestRemoteListen("string topic", true); + TestRemoteListen(NextId(), true); + } + + /// <summary> + /// Tests RemoteListen. + /// </summary> + public void TestRemoteListen(object topic, bool async = false) + { + var messaging = async ? _grid1.Message().WithAsync() : _grid1.Message(); + + var listener = MessagingTestHelper.GetListener(); + var listenId = messaging.RemoteListen(listener, topic); + + if (async) + listenId = messaging.GetFuture<Guid>().Get(); + + // Test sending + CheckSend(topic, msg: messaging, remoteListen: true); + + // Test different topic + CheckNoMessage(NextId()); + + // Test multiple subscriptions for the same filter + var listenId2 = messaging.RemoteListen(listener, topic); + + if (async) + listenId2 = messaging.GetFuture<Guid>().Get(); + + CheckSend(topic, msg: messaging, remoteListen: true, repeatMultiplier: 2); // expect twice the messages + + messaging.StopRemoteListen(listenId2); + + if (async) + messaging.GetFuture().Get(); + + CheckSend(topic, msg: messaging, remoteListen: true); // back to normal after unsubscription + + // Test message type mismatch + var ex = Assert.Throws<IgniteException>(() => messaging.Send(1.1, topic)); + Assert.AreEqual("Unable to cast object of type 'System.Double' to type 'System.String'.", ex.Message); + + // Test end listen + messaging.StopRemoteListen(listenId); + + if (async) + messaging.GetFuture().Get(); + + CheckNoMessage(topic); + } + + /// <summary> + /// Tests RemoteListen with a projection. + /// </summary> + [Test] + public void TestRemoteListenProjection() + { + TestRemoteListenProjection(null); + TestRemoteListenProjection("string topic"); + TestRemoteListenProjection(NextId()); + } + + /// <summary> + /// Tests RemoteListen with a projection. + /// </summary> + private void TestRemoteListenProjection(object topic) + { + var clusterMessaging = _grid1.Cluster.ForNodes(_grid1.Cluster.LocalNode, _grid2.Cluster.LocalNode).Message(); + var clusterListener = MessagingTestHelper.GetListener(); + var listenId = clusterMessaging.RemoteListen(clusterListener, topic); + + CheckSend(msg: clusterMessaging, topic: topic, remoteListen: true); + + clusterMessaging.StopRemoteListen(listenId); + + CheckNoMessage(topic); + } + + /// <summary> + /// Tests LocalListen in multithreaded mode. + /// </summary> + [Test] + [Category(TestUtils.CategoryIntensive)] + public void TestRemoteListenMultithreaded() + { + const int threadCnt = 20; + const int runSeconds = 20; + + var messaging = _grid1.Message(); + + var senders = Task.Factory.StartNew(() => TestUtils.RunMultiThreaded(() => + { + MessagingTestHelper.ClearReceived(int.MaxValue); + messaging.Send((object) NextMessage()); + Thread.Sleep(50); + }, threadCnt, runSeconds)); + + + var sharedListener = MessagingTestHelper.GetListener(); + + for (int i = 0; i < 100; i++) + messaging.RemoteListen(sharedListener); // add some listeners to be stopped by filter result + + TestUtils.RunMultiThreaded(() => + { + // Check that listen/stop work concurrently + messaging.StopRemoteListen(messaging.RemoteListen(sharedListener)); + + }, threadCnt, runSeconds); + + MessagingTestHelper.ListenResult = false; + + messaging.Send((object) NextMessage()); // send a message to make filters return false + + Thread.Sleep(MessagingTestHelper.MessageTimeout); // wait for all to unsubscribe + + MessagingTestHelper.ListenResult = true; + + senders.Wait(); // wait for senders to stop + + var sharedResult = MessagingTestHelper.ReceivedMessages.Count; + + messaging.Send((object) NextMessage()); + + Thread.Sleep(MessagingTestHelper.MessageTimeout); + + // Check that unsubscription worked properly + Assert.AreEqual(sharedResult, MessagingTestHelper.ReceivedMessages.Count); + + } + + /// <summary> + /// Sends messages in various ways and verefies correct receival. + /// </summary> + /// <param name="topic">Topic.</param> + /// <param name="grid">The grid to use.</param> + /// <param name="msg">Messaging to use.</param> + /// <param name="remoteListen">Whether to expect remote listeners.</param> + /// <param name="single">When true, only check one message.</param> + /// <param name="repeatMultiplier">Expected message count multiplier.</param> + private void CheckSend(object topic = null, IIgnite grid = null, + IMessaging msg = null, bool remoteListen = false, bool single = false, int repeatMultiplier = 1) + { + IClusterGroup cluster; + + if (msg != null) + cluster = msg.ClusterGroup; + else + { + grid = grid ?? _grid1; + msg = grid.Message(); + cluster = grid.Cluster.ForLocal(); + } + + // Messages will repeat due to multiple nodes listening + var expectedRepeat = repeatMultiplier * (remoteListen ? cluster.Nodes().Count : 1); + + var messages = Enumerable.Range(1, 10).Select(x => NextMessage()).OrderBy(x => x).ToList(); + + // Single message + MessagingTestHelper.ClearReceived(expectedRepeat); + msg.Send((object) messages[0], topic); + MessagingTestHelper.VerifyReceive(cluster, messages.Take(1), m => m.ToList(), expectedRepeat); + + if (single) + return; + + // Multiple messages (receive order is undefined) + MessagingTestHelper.ClearReceived(messages.Count * expectedRepeat); + msg.Send(messages, topic); + MessagingTestHelper.VerifyReceive(cluster, messages, m => m.OrderBy(x => x), expectedRepeat); + + // Multiple messages, ordered + MessagingTestHelper.ClearReceived(messages.Count * expectedRepeat); + messages.ForEach(x => msg.SendOrdered(x, topic, MessagingTestHelper.MessageTimeout)); + + if (remoteListen) // in remote scenario messages get mixed up due to different timing on different nodes + MessagingTestHelper.VerifyReceive(cluster, messages, m => m.OrderBy(x => x), expectedRepeat); + else + MessagingTestHelper.VerifyReceive(cluster, messages, m => m.Reverse(), expectedRepeat); + } + + /// <summary> + /// Checks that no message has arrived. + /// </summary> + private void CheckNoMessage(object topic, IIgnite grid = null) + { + // this will result in an exception in case of a message + MessagingTestHelper.ClearReceived(0); + + (grid ?? _grid1).Message().Send(NextMessage(), topic); + + Thread.Sleep(MessagingTestHelper.MessageTimeout); + + MessagingTestHelper.AssertFailures(); + } + + /// <summary> + /// Gets the Ignite configuration. + /// </summary> + private static IgniteConfiguration Configuration(string springConfigUrl) + { + return new IgniteConfiguration + { + SpringConfigUrl = springConfigUrl, + JvmClasspath = TestUtils.CreateTestClasspath(), + JvmOptions = TestUtils.TestJavaOptions() + }; + } + + /// <summary> + /// Generates next message with sequential ID and current test name. + /// </summary> + private static string NextMessage() + { + var id = NextId(); + return id + "_" + TestContext.CurrentContext.Test.Name; + } + + /// <summary> + /// Generates next sequential ID. + /// </summary> + private static int NextId() + { + return Interlocked.Increment(ref MessageId); + } + } + + /// <summary> + /// Messaging test helper class. + /// </summary> + [Serializable] + public static class MessagingTestHelper + { + /** */ + public static readonly ConcurrentStack<string> ReceivedMessages = new ConcurrentStack<string>(); + + /** */ + public static readonly ConcurrentStack<string> Failures = new ConcurrentStack<string>(); + + /** */ + public static readonly CountdownEvent ReceivedEvent = new CountdownEvent(0); + + /** */ + public static readonly ConcurrentStack<Guid> LastNodeIds = new ConcurrentStack<Guid>(); + + /** */ + public static volatile bool ListenResult = true; + + /** */ + public static readonly TimeSpan MessageTimeout = TimeSpan.FromMilliseconds(700); + + /// <summary> + /// Clears received message information. + /// </summary> + /// <param name="expectedCount">The expected count of messages to be received.</param> + public static void ClearReceived(int expectedCount) + { + ReceivedMessages.Clear(); + ReceivedEvent.Reset(expectedCount); + LastNodeIds.Clear(); + } + + /// <summary> + /// Verifies received messages against expected messages. + /// </summary> + /// <param name="cluster">Cluster.</param> + /// <param name="expectedMessages">Expected messages.</param> + /// <param name="resultFunc">Result transform function.</param> + /// <param name="expectedRepeat">Expected repeat count.</param> + public static void VerifyReceive(IClusterGroup cluster, IEnumerable<string> expectedMessages, + Func<IEnumerable<string>, IEnumerable<string>> resultFunc, int expectedRepeat) + { + // check if expected message count has been received; Wait returns false if there were none. + Assert.IsTrue(ReceivedEvent.Wait(MessageTimeout)); + + expectedMessages = expectedMessages.SelectMany(x => Enumerable.Repeat(x, expectedRepeat)); + + Assert.AreEqual(expectedMessages, resultFunc(ReceivedMessages)); + + // check that all messages came from local node. + var localNodeId = cluster.Ignite.Cluster.LocalNode.Id; + Assert.AreEqual(localNodeId, LastNodeIds.Distinct().Single()); + + AssertFailures(); + } + + /// <summary> + /// Gets the message listener. + /// </summary> + /// <returns>New instance of message listener.</returns> + public static IMessageFilter<string> GetListener() + { + return new MessageFilter<string>(Listen); + } + + /// <summary> + /// Combines accumulated failures and throws an assertion, if there are any. + /// Clears accumulated failures. + /// </summary> + public static void AssertFailures() + { + if (Failures.Any()) + Assert.Fail(Failures.Reverse().Aggregate((x, y) => string.Format("{0}\n{1}", x, y))); + + Failures.Clear(); + } + + /// <summary> + /// Listen method. + /// </summary> + /// <param name="id">Originating node ID.</param> + /// <param name="msg">Message.</param> + private static bool Listen(Guid id, string msg) + { + try + { + LastNodeIds.Push(id); + ReceivedMessages.Push(msg); + + ReceivedEvent.Signal(); + + return ListenResult; + } + catch (Exception ex) + { + // When executed on remote nodes, these exceptions will not go to sender, + // so we have to accumulate them. + Failures.Push(string.Format("Exception in Listen (msg: {0}, id: {1}): {2}", msg, id, ex)); + throw; + } + } + } + + /// <summary> + /// Test message filter. + /// </summary> + [Serializable] + public class MessageFilter<T> : IMessageFilter<T> + { + /** */ + private readonly Func<Guid, T, bool> _invoke; + + #pragma warning disable 649 + /** Grid. */ + [InstanceResource] + private IIgnite _grid; + #pragma warning restore 649 + + /// <summary> + /// Initializes a new instance of the <see cref="MessageFilter{T}"/> class. + /// </summary> + /// <param name="invoke">The invoke delegate.</param> + public MessageFilter(Func<Guid, T, bool> invoke) + { + _invoke = invoke; + } + + /** <inheritdoc /> */ + public bool Invoke(Guid nodeId, T message) + { + Assert.IsNotNull(_grid); + return _invoke(nodeId, message); + } + } +}
