IGNITE-5615 .NET: IgniteConfiguration.LocalEventListeners This closes #2754
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b364589d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b364589d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b364589d Branch: refs/heads/ignite-3478 Commit: b364589dffcb57aa2297fd1f2862f16065d44701 Parents: 7e4746a Author: Pavel Tupitsyn <[email protected]> Authored: Wed Sep 27 20:13:37 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Wed Sep 27 20:13:37 2017 +0300 ---------------------------------------------------------------------- .../callback/PlatformCallbackGateway.java | 15 ++ .../platform/callback/PlatformCallbackOp.java | 3 + .../platform/events/PlatformEvents.java | 17 +- .../events/PlatformLocalEventListener.java | 88 +++++++++ .../utils/PlatformConfigurationUtils.java | 30 +++ .../Apache.Ignite.Core.Tests.csproj | 1 + .../BinaryConfigurationTest.cs | 6 +- .../Apache.Ignite.Core.Tests/EventsTest.cs | 5 +- .../EventsTestLocalListeners.cs | 187 +++++++++++++++++++ .../IgniteConfigurationSerializerTest.cs | 43 ++++- .../Plugin/PluginTest.cs | 1 - .../Apache.Ignite.Core.csproj | 1 + .../Binary/BinaryConfiguration.cs | 46 ++--- .../Cache/Configuration/CacheConfiguration.cs | 41 +++- .../Cache/Configuration/QueryEntity.cs | 33 ++++ .../Cache/Configuration/QueryField.cs | 13 ++ .../Events/LocalEventListener.cs | 79 ++++++++ .../Apache.Ignite.Core/IgniteConfiguration.cs | 129 ++++++++++++- .../IgniteConfigurationSection.xsd | 45 +++++ .../dotnet/Apache.Ignite.Core/Ignition.cs | 2 + .../Common/IgniteConfigurationXmlSerializer.cs | 5 + .../Apache.Ignite.Core/Impl/Events/Events.cs | 31 ++- .../Impl/Unmanaged/UnmanagedCallbackOp.cs | 3 +- .../Impl/Unmanaged/UnmanagedCallbacks.cs | 24 +++ .../dotnet/Apache.Ignite.sln.DotSettings | 2 + 25 files changed, 794 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index 7a5c0a4..fb8564c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -748,6 +748,21 @@ public class PlatformCallbackGateway { } /** + * @param memPtr Memory pointer. + * @return Result. + */ + public long eventLocalListenerApply(long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.EventLocalListenerApply, memPtr); + } + finally { + leave(); + } + } + + /** * @param ptr Pointer. */ public void eventFilterDestroy(long ptr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java index d77d2d7..783f05e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java @@ -224,4 +224,7 @@ class PlatformCallbackOp { /** */ public static final int PluginCallbackInLongLongOutLong = 68; + + /** */ + public static final int EventLocalListenerApply = 69; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java index 845c06a..0af863c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.platform.events; -import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteEvents; import org.apache.ignite.events.Event; @@ -32,15 +31,16 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; -import org.jetbrains.annotations.Nullable; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.UUID; /** * Interop events. */ +@SuppressWarnings("unchecked") public class PlatformEvents extends PlatformAbstractTarget { /** */ private static final int OP_REMOTE_QUERY = 1; @@ -222,6 +222,19 @@ public class PlatformEvents extends PlatformAbstractTarget { break; } + case OP_STOP_LOCAL_LISTEN: { + int id = reader.readInt(); + int[] types = reader.readIntArray(); + + IgnitePredicate lsnr = new PlatformLocalEventListener(id); + + boolean res = events.stopLocalListen(lsnr, types); + + writer.writeBoolean(res); + + break; + } + default: super.processInStreamOutStream(type, reader, writer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformLocalEventListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformLocalEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformLocalEventListener.java new file mode 100644 index 0000000..56a19cf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformLocalEventListener.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.events; + +import org.apache.ignite.Ignite; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.IgniteInstanceResource; + +/** + * Platform local event filter. Delegates apply to native platform. + */ +public class PlatformLocalEventListener implements IgnitePredicate<Event> { + /** */ + private static final long serialVersionUID = 0L; + + /** Listener id. */ + private final int id; + + /** Ignite. */ + @SuppressWarnings("unused") + @IgniteInstanceResource + private transient Ignite ignite; + + /** + * Constructor. + * + * @param id Listener id. + */ + public PlatformLocalEventListener(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + assert ignite != null; + + PlatformContext ctx = PlatformUtils.platformContext(ignite); + + assert ctx != null; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeInt(id); + + ctx.writeEvent(writer, evt); + + out.synchronize(); + + long res = ctx.gateway().eventLocalListenerApply(mem.pointer()); + + return res != 0; + } + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return this == o || o != null && getClass() == o.getClass() && id == ((PlatformLocalEventListener) o).id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 513a463..fe214da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -65,11 +65,14 @@ import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.configuration.SqlConnectorConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.events.Event; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; +import org.apache.ignite.internal.processors.platform.events.PlatformLocalEventListener; import org.apache.ignite.internal.processors.platform.plugin.cache.PlatformCachePluginConfiguration; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction; import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration; import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration; @@ -702,6 +705,8 @@ public class PlatformConfigurationUtils { cfg.setPersistentStoreConfiguration(readPersistentStoreConfiguration(in)); readPluginConfiguration(cfg, in); + + readLocalEventListeners(cfg, in); } /** @@ -1613,6 +1618,31 @@ public class PlatformConfigurationUtils { } } + /** + * Reads the plugin configuration. + * + * @param cfg Ignite configuration to update. + * @param in Reader. + */ + private static void readLocalEventListeners(IgniteConfiguration cfg, BinaryRawReader in) { + int cnt = in.readInt(); + + if (cnt == 0) { + return; + } + + Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(cnt); + + for (int i = 0; i < cnt; i++) { + int[] types = in.readIntArray(); + + lsnrs.put(new PlatformLocalEventListener(i), types); + } + + cfg.setLocalEventListeners(lsnrs); + } + + /** * Private constructor. http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index 375b6b8..7f5f4b8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -146,6 +146,7 @@ <Compile Include="Collections\ReadOnlyDictionaryTest.cs" /> <Compile Include="Common\IgniteGuidTest.cs" /> <Compile Include="Deployment\RuntimeDependencyFunc.cs" /> + <Compile Include="EventsTestLocalListeners.cs" /> <Compile Include="Log\ConcurrentMemoryTarget.cs" /> <Compile Include="Log\DefaultLoggerTest.cs" /> <Compile Include="Log\Log4NetLoggerTest.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/BinaryConfigurationTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/BinaryConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/BinaryConfigurationTest.cs index d549d1d..972b078 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/BinaryConfigurationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/BinaryConfigurationTest.cs @@ -93,10 +93,12 @@ namespace Apache.Ignite.Core.Tests [Test] public void TestCodeConfiguration() { - StartGrid(new BinaryConfiguration + var cfg = new BinaryConfiguration { TypeConfigurations = TestTypes.Select(x => new BinaryTypeConfiguration(x)).ToList() - }); + }; + + StartGrid(new BinaryConfiguration(cfg)); CheckBinarizableTypes(TestTypes); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs index c05511c..672ff9e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs @@ -634,13 +634,10 @@ namespace Apache.Ignite.Core.Tests }; var ex = Assert.Throws<IgniteException>(() => Ignition.Start(igniteCfg)); - Assert.AreEqual("Failed to start Ignite.NET, check inner exception for details", ex.Message); - - Assert.IsNotNull(ex.InnerException); Assert.AreEqual("Unsupported IgniteConfiguration.EventStorageSpi: " + "'Apache.Ignite.Core.Tests.MyEventStorage'. Supported implementations: " + "'Apache.Ignite.Core.Events.NoopEventStorageSpi', " + - "'Apache.Ignite.Core.Events.MemoryEventStorageSpi'.", ex.InnerException.Message); + "'Apache.Ignite.Core.Events.MemoryEventStorageSpi'.", ex.Message); } /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs new file mode 100644 index 0000000..ca98801 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests +{ + using System.Collections.Generic; + using System.Linq; + using Apache.Ignite.Core.Cache.Configuration; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Events; + using NUnit.Framework; + + /// <summary> + /// Tests <see cref="IgniteConfiguration.LocalEventListeners" />. + /// </summary> + public class EventsTestLocalListeners + { + /** Cache name. */ + private const string CacheName = "cache"; + + /// <summary> + /// Tests the rebalance events which occur during node startup. + /// </summary> + [Test] + public void TestRebalanceEvents() + { + var listener = new Listener<CacheRebalancingEvent>(); + + using (Ignition.Start(GetConfig(listener, EventType.CacheRebalanceAll))) + { + var events = listener.GetEvents(); + + Assert.AreEqual(2, events.Count); + + var rebalanceStart = events.First(); + + Assert.AreEqual(CacheName, rebalanceStart.CacheName); + Assert.AreEqual(EventType.CacheRebalanceStarted, rebalanceStart.Type); + + var rebalanceStop = events.Last(); + + Assert.AreEqual(CacheName, rebalanceStop.CacheName); + Assert.AreEqual(EventType.CacheRebalanceStopped, rebalanceStop.Type); + } + } + + /// <summary> + /// Tests the unsubscription. + /// </summary> + [Test] + public void TestUnsubscribe() + { + var listener = new Listener<CacheEvent>(); + + using (var ignite = Ignition.Start(GetConfig(listener, EventType.CacheAll))) + { + Assert.AreEqual(0, listener.GetEvents().Count); + + var cache = ignite.GetCache<int, int>(CacheName); + + // Put causes 3 events: EntryCreated, ObjectPut, EntryDestroyed. + cache.Put(1, 1); + Assert.AreEqual(3, listener.GetEvents().Count); + + // Remove listener from one of the event types. + var res = ignite.GetEvents().StopLocalListen(listener, EventType.CacheEntryCreated); + Assert.IsTrue(res); + + cache.Put(2, 2); + Assert.AreEqual(2, listener.GetEvents().Count); + + // Remove from all event types. + res = ignite.GetEvents().StopLocalListen(listener); + Assert.IsTrue(res); + + cache.Put(3, 3); + Assert.AreEqual(0, listener.GetEvents().Count); + + // Remove when not subscribed. + res = ignite.GetEvents().StopLocalListen(listener); + Assert.IsFalse(res); + } + } + + /// <summary> + /// Tests the configuration validation. + /// </summary> + [Test] + public void TestConfigValidation() + { + var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + LocalEventListeners = new LocalEventListener<IEvent>[1] + }; + + // Null collection element. + var ex = Assert.Throws<IgniteException>(() => Ignition.Start(cfg)); + Assert.AreEqual("LocalEventListeners can't contain nulls.", ex.Message); + + // Null listener property. + cfg.LocalEventListeners = new[] {new LocalEventListener<IEvent>()}; + ex = Assert.Throws<IgniteException>(() => Ignition.Start(cfg)); + Assert.AreEqual("LocalEventListener.Listener can't be null.", ex.Message); + + // Null event types. + cfg.LocalEventListeners = new[] {new LocalEventListener<IEvent> {Listener = new Listener<IEvent>()}}; + ex = Assert.Throws<IgniteException>(() => Ignition.Start(cfg)); + Assert.AreEqual("LocalEventListener.EventTypes can't be null or empty.", ex.Message); + + // Empty event types. + cfg.LocalEventListeners = new[] + {new LocalEventListener<IEvent> {Listener = new Listener<IEvent>(), EventTypes = new int[0]}}; + ex = Assert.Throws<IgniteException>(() => Ignition.Start(cfg)); + Assert.AreEqual("LocalEventListener.EventTypes can't be null or empty.", ex.Message); + } + + /// <summary> + /// Gets the configuration. + /// </summary> + private static IgniteConfiguration GetConfig<T>(IEventListener<T> listener, ICollection<int> eventTypes) + where T : IEvent + { + return new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + LocalEventListeners = new[] + { + new LocalEventListener<T> + { + Listener = listener, + EventTypes = eventTypes + } + }, + IncludedEventTypes = eventTypes, + CacheConfiguration = new[] { new CacheConfiguration(CacheName) } + }; + } + + /// <summary> + /// Listener. + /// </summary> + private class Listener<T> : IEventListener<T> where T : IEvent + { + /** Listen action. */ + private readonly List<T> _events = new List<T>(); + + /// <summary> + /// Gets the events. + /// </summary> + public ICollection<T> GetEvents() + { + lock (_events) + { + var res = _events.ToArray(); + + _events.Clear(); + + return res; + } + } + + /** <inheritdoc /> */ + public bool Invoke(T evt) + { + lock (_events) + { + _events.Add(evt); + } + + return true; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs index ec87bfe..2530243 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs @@ -145,6 +145,22 @@ namespace Apache.Ignite.Core.Tests <clientConnectorConfiguration host='bar' port='10' portRange='11' socketSendBufferSize='12' socketReceiveBufferSize='13' tcpNoDelay='true' maxOpenCursorsPerConnection='14' threadPoolSize='15' /> <persistentStoreConfiguration alwaysWriteFullPages='true' checkpointingFrequency='00:00:1' checkpointingPageBufferSize='2' checkpointingThreads='3' lockWaitTime='00:00:04' persistentStorePath='foo' tlbSize='5' walArchivePath='bar' walFlushFrequency='00:00:06' walFsyncDelayNanos='7' walHistorySize='8' walMode='None' walRecordIteratorBufferSize='9' walSegments='10' walSegmentSize='11' walStorePath='baz' metricsEnabled='true' rateTimeInterval='0:0:6' subIntervals='3' checkpointWriteOrder='Random' /> <consistentId type='System.String'>someId012</consistentId> + <localEventListeners> + <localEventListener type='Apache.Ignite.Core.Events.LocalEventListener`1[[Apache.Ignite.Core.Events.CacheRebalancingEvent]]'> + <eventTypes> + <int>CacheObjectPut</int> + <int>81</int> + </eventTypes> + <listener type='Apache.Ignite.Core.Tests.EventsTestLocalListeners+Listener`1[[Apache.Ignite.Core.Events.CacheRebalancingEvent]]' /> + </localEventListener> + <localEventListener type='Apache.Ignite.Core.Events.LocalEventListener`1[[Apache.Ignite.Core.Events.IEvent]]'> + <eventTypes> + <int>CacheObjectPut</int> + <int>81</int> + </eventTypes> + <listener type='Apache.Ignite.Core.Tests.IgniteConfigurationSerializerTest+MyEventListener' /> + </localEventListener> + </localEventListeners> </igniteConfig>"; var cfg = IgniteConfiguration.FromXml(xml); @@ -333,6 +349,15 @@ namespace Apache.Ignite.Core.Tests Assert.AreEqual(3, pers.SubIntervals); Assert.AreEqual(TimeSpan.FromSeconds(6), pers.RateTimeInterval); Assert.AreEqual(CheckpointWriteOrder.Random, pers.CheckpointWriteOrder); + + var listeners = cfg.LocalEventListeners; + Assert.AreEqual(2, listeners.Count); + + var rebalListener = (LocalEventListener<CacheRebalancingEvent>) listeners.First(); + Assert.AreEqual(new[] {EventType.CacheObjectPut, 81}, rebalListener.EventTypes); + Assert.AreEqual("Apache.Ignite.Core.Tests.EventsTestLocalListeners+Listener`1" + + "[Apache.Ignite.Core.Events.CacheRebalancingEvent]", + rebalListener.Listener.GetType().ToString()); } /// <summary> @@ -884,7 +909,15 @@ namespace Apache.Ignite.Core.Tests CheckpointWriteOrder = CheckpointWriteOrder.Random }, IsActiveOnStart = false, - ConsistentId = "myId123" + ConsistentId = "myId123", + LocalEventListeners = new[] + { + new LocalEventListener<IEvent> + { + EventTypes = new[] {1, 2}, + Listener = new MyEventListener() + } + } }; } @@ -1089,5 +1122,13 @@ namespace Apache.Ignite.Core.Tests throw new NotImplementedException(); } } + + public class MyEventListener : IEventListener<IEvent> + { + public bool Invoke(IEvent evt) + { + throw new NotImplementedException(); + } + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs index 1cb2fae..093b3d7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs @@ -68,7 +68,6 @@ namespace Apache.Ignite.Core.Tests.Plugin var ctx = prov.Context; Assert.IsNotNull(ctx.Ignite); - Assert.AreEqual(cfg, ctx.IgniteConfiguration); Assert.AreEqual("barbaz", ctx.PluginConfiguration.PluginProperty); CheckResourceInjection(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 9d4069c..67c540c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -111,6 +111,7 @@ <Compile Include="Impl\Binary\BinaryProcessorClient.cs" /> <Compile Include="Impl\Binary\IBinaryProcessor.cs" /> <Compile Include="Impl\Client\ClientStatus.cs" /> + <Compile Include="Events\LocalEventListener.cs" /> <Compile Include="Impl\IIgniteInternal.cs" /> <Compile Include="Impl\Client\Cache\CacheClient.cs" /> <Compile Include="Impl\Client\ClientOp.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryConfiguration.cs index a67244b..32d2c9d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryConfiguration.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Binary using System; using System.Collections.Generic; using System.ComponentModel; + using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; using Apache.Ignite.Core.Impl.Common; @@ -58,10 +59,24 @@ namespace Apache.Ignite.Core.Binary { IgniteArgumentCheck.NotNull(cfg, "cfg"); + CopyLocalProperties(cfg); + } + + /// <summary> + /// Copies the local properties. + /// </summary> + internal void CopyLocalProperties(BinaryConfiguration cfg) + { + Debug.Assert(cfg != null); + IdMapper = cfg.IdMapper; NameMapper = cfg.NameMapper; KeepDeserialized = cfg.KeepDeserialized; - Serializer = cfg.Serializer; + + if (cfg.Serializer != null) + { + Serializer = cfg.Serializer; + } TypeConfigurations = cfg.TypeConfigurations == null ? null @@ -69,7 +84,10 @@ namespace Apache.Ignite.Core.Binary Types = cfg.Types == null ? null : cfg.Types.ToList(); - CompactFooter = cfg.CompactFooter; + if (cfg.CompactFooterInternal != null) + { + CompactFooter = cfg.CompactFooterInternal.Value; + } } /// <summary> @@ -141,29 +159,5 @@ namespace Apache.Ignite.Core.Binary { get { return _compactFooter; } } - - /// <summary> - /// Merges other config into this. - /// </summary> - internal void MergeTypes(BinaryConfiguration localConfig) - { - if (TypeConfigurations == null) - { - TypeConfigurations = localConfig.TypeConfigurations; - } - else if (localConfig.TypeConfigurations != null) - { - // Both configs are present. - // Local configuration is more complete and takes preference when it exists for a given type. - var localTypeNames = new HashSet<string>(localConfig.TypeConfigurations.Select(x => x.TypeName), - StringComparer.OrdinalIgnoreCase); - - var configs = new List<BinaryTypeConfiguration>(localConfig.TypeConfigurations); - - configs.AddRange(TypeConfigurations.Where(x=>!localTypeNames.Contains(x.TypeName))); - - TypeConfigurations = configs; - } - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs index 421f16f..9413ed5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs @@ -229,8 +229,7 @@ namespace Apache.Ignite.Core.Cache.Configuration Read(BinaryUtils.Marshaller.StartUnmarshal(stream)); } - // Plugins should be copied directly. - PluginConfigurations = other.PluginConfigurations; + CopyLocalProperties(other); } } @@ -309,8 +308,9 @@ namespace Apache.Ignite.Core.Cache.Configuration if (reader.ReadBoolean()) { // FactoryId-based plugin: skip. + reader.ReadInt(); // Skip factory id. var size = reader.ReadInt(); - reader.Stream.Seek(size, SeekOrigin.Current); + reader.Stream.Seek(size, SeekOrigin.Current); // Skip custom data. } else { @@ -413,7 +413,7 @@ namespace Apache.Ignite.Core.Cache.Configuration cachePlugin.WriteBinary(writer); - writer.Stream.WriteInt(pos, writer.Stream.Position - pos); // Write size. + writer.Stream.WriteInt(pos, writer.Stream.Position - pos - 4); // Write size. } else { @@ -429,6 +429,39 @@ namespace Apache.Ignite.Core.Cache.Configuration } /// <summary> + /// Copies the local properties (properties that are not written in Write method). + /// </summary> + internal void CopyLocalProperties(CacheConfiguration cfg) + { + Debug.Assert(cfg != null); + + PluginConfigurations = cfg.PluginConfigurations; + + if (QueryEntities != null && cfg.QueryEntities != null) + { + var entities = cfg.QueryEntities.Where(x => x != null).ToDictionary(x => GetQueryEntityKey(x), x => x); + + foreach (var entity in QueryEntities.Where(x => x != null)) + { + QueryEntity src; + + if (entities.TryGetValue(GetQueryEntityKey(entity), out src)) + { + entity.CopyLocalProperties(src); + } + } + } + } + + /// <summary> + /// Gets the query entity key. + /// </summary> + private static string GetQueryEntityKey(QueryEntity x) + { + return x.KeyTypeName + "^" + x.ValueTypeName; + } + + /// <summary> /// Validates this instance and outputs information to the log, if necessary. /// </summary> internal void Validate(ILogger log) http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryEntity.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryEntity.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryEntity.cs index ff61394..1273f08 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryEntity.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryEntity.cs @@ -334,6 +334,39 @@ namespace Apache.Ignite.Core.Cache.Configuration } /// <summary> + /// Copies the local properties (properties that are not written in Write method). + /// </summary> + internal void CopyLocalProperties(QueryEntity entity) + { + Debug.Assert(entity != null); + + if (entity._keyType != null) + { + _keyType = entity._keyType; + } + + if (entity._valueType != null) + { + _valueType = entity._valueType; + } + + if (Fields != null && entity.Fields != null) + { + var fields = entity.Fields.Where(x => x != null).ToDictionary(x => "_" + x.Name, x => x); + + foreach (var field in Fields) + { + QueryField src; + + if (fields.TryGetValue("_" + field.Name, out src)) + { + field.CopyLocalProperties(src); + } + } + } + } + + /// <summary> /// Rescans the attributes in <see cref="KeyType"/> and <see cref="ValueType"/>. /// </summary> private void RescanAttributes(Type keyType, Type valType) http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryField.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryField.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryField.cs index a1ebbd7..596837a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryField.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/QueryField.cs @@ -126,5 +126,18 @@ namespace Apache.Ignite.Core.Cache.Configuration JavaTypes.LogIndirectMappingWarning(_type, log, logInfo); } + + /// <summary> + /// Copies the local properties (properties that are not written in Write method). + /// </summary> + internal void CopyLocalProperties(QueryField field) + { + Debug.Assert(field != null); + + if (field._type != null) + { + _type = field._type; + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Events/LocalEventListener.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/LocalEventListener.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/LocalEventListener.cs new file mode 100644 index 0000000..253d166 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/LocalEventListener.cs @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Events +{ + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using Apache.Ignite.Core.Impl.Binary; + + /// <summary> + /// Abstract local event listener holder for <see cref="IgniteConfiguration.LocalEventListeners"/>. + /// Use <see cref="LocalEventListener{T}"/> derived class. + /// </summary> + public abstract class LocalEventListener + { + /// <summary> + /// Initializes a new instance of the <see cref="LocalEventListener"/> class. + /// </summary> + protected internal LocalEventListener() + { + // No-op. + } + + /// <summary> + /// Gets or sets the event types. + /// </summary> + [SuppressMessage("Microsoft.Usage", "CA2227:CollectionPropertiesShouldBeReadOnly")] + public ICollection<int> EventTypes { get; set; } + + /// <summary> + /// Gets the original user listener object. + /// </summary> + internal abstract object ListenerObject { get; } + + /// <summary> + /// Invokes the specified reader. + /// </summary> + internal abstract bool Invoke(BinaryReader reader); + } + + /// <summary> + /// Generic local event listener holder, see <see cref="IgniteConfiguration.LocalEventListeners"/>. + /// </summary> + public class LocalEventListener<T> : LocalEventListener where T : IEvent + { + /// <summary> + /// Gets or sets the listener. + /// </summary> + public IEventListener<T> Listener { get; set; } + + /** <inheritdoc /> */ + internal override object ListenerObject + { + get { return Listener; } + } + + /** <inheritdoc /> */ + internal override bool Invoke(BinaryReader reader) + { + var evt = EventReader.Read<T>(reader); + + return Listener.Invoke(evt); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs index cb91ee1..a7a5ff4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs @@ -192,6 +192,12 @@ namespace Apache.Ignite.Core /** */ private bool? _isActiveOnStart; + /** Local event listeners. Stored as array to ensure index access. */ + private LocalEventListener[] _localEventListenersInternal; + + /** Map from user-defined listener to it's id. */ + private Dictionary<object, int> _localEventListenerIds; + /// <summary> /// Default network retry count. /// </summary> @@ -251,8 +257,8 @@ namespace Apache.Ignite.Core Debug.Assert(binaryReader != null); Debug.Assert(baseConfig != null); - CopyLocalProperties(baseConfig); Read(binaryReader); + CopyLocalProperties(baseConfig); } /// <summary> @@ -480,7 +486,7 @@ namespace Apache.Ignite.Core writer.WriteBoolean(false); } - // Plugins (should be last) + // Plugins (should be last). if (PluginConfigurations != null) { var pos = writer.Stream.Position; @@ -507,6 +513,46 @@ namespace Apache.Ignite.Core { writer.WriteInt(0); } + + // Local event listeners (should be last). + if (LocalEventListeners != null) + { + writer.WriteInt(LocalEventListeners.Count); + + foreach (var listener in LocalEventListeners) + { + ValidateLocalEventListener(listener); + + writer.WriteIntArray(listener.EventTypes.ToArray()); + } + } + else + { + writer.WriteInt(0); + } + } + + /// <summary> + /// Validates the local event listener. + /// </summary> + // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local + // ReSharper disable once UnusedParameter.Local + private static void ValidateLocalEventListener(LocalEventListener listener) + { + if (listener == null) + { + throw new IgniteException("LocalEventListeners can't contain nulls."); + } + + if (listener.ListenerObject == null) + { + throw new IgniteException("LocalEventListener.Listener can't be null."); + } + + if (listener.EventTypes == null || listener.EventTypes.Count == 0) + { + throw new IgniteException("LocalEventListener.EventTypes can't be null or empty."); + } } /// <summary> @@ -684,13 +730,10 @@ namespace Apache.Ignite.Core if (BinaryConfiguration != null && cfg.BinaryConfiguration != null) { - BinaryConfiguration.MergeTypes(cfg.BinaryConfiguration); - } - else if (cfg.BinaryConfiguration != null) - { - BinaryConfiguration = new BinaryConfiguration(cfg.BinaryConfiguration); + BinaryConfiguration.CopyLocalProperties(cfg.BinaryConfiguration); } + SpringConfigUrl = cfg.SpringConfigUrl; JvmClasspath = cfg.JvmClasspath; JvmOptions = cfg.JvmOptions; Assemblies = cfg.Assemblies; @@ -701,6 +744,23 @@ namespace Apache.Ignite.Core JvmMaxMemoryMb = cfg.JvmMaxMemoryMb; PluginConfigurations = cfg.PluginConfigurations; AutoGenerateIgniteInstanceName = cfg.AutoGenerateIgniteInstanceName; + PeerAssemblyLoadingMode = cfg.PeerAssemblyLoadingMode; + LocalEventListeners = cfg.LocalEventListeners; + + if (CacheConfiguration != null && cfg.CacheConfiguration != null) + { + var caches = cfg.CacheConfiguration.Where(x => x != null).ToDictionary(x => "_" + x.Name, x => x); + + foreach (var cache in CacheConfiguration) + { + CacheConfiguration src; + + if (cache != null && caches.TryGetValue("_" + cache.Name, out src)) + { + cache.CopyLocalProperties(src); + } + } + } } /// <summary> @@ -856,6 +916,61 @@ namespace Apache.Ignite.Core public ICollection<int> IncludedEventTypes { get; set; } /// <summary> + /// Gets or sets pre-configured local event listeners. + /// <para /> + /// This is similar to calling <see cref="IEvents.LocalListen{T}(IEventListener{T},int[])"/>, + /// but important difference is that some events occur during startup and can be only received this way. + /// </summary> + [SuppressMessage("Microsoft.Usage", "CA2227:CollectionPropertiesShouldBeReadOnly")] + public ICollection<LocalEventListener> LocalEventListeners { get; set; } + + /// <summary> + /// Initializes the local event listeners collections. + /// </summary> + private void InitLocalEventListeners() + { + if (LocalEventListeners != null && _localEventListenersInternal == null) + { + _localEventListenersInternal = LocalEventListeners.ToArray(); + + _localEventListenerIds = new Dictionary<object, int>(); + + for (var i = 0; i < _localEventListenersInternal.Length; i++) + { + var listener = _localEventListenersInternal[i]; + ValidateLocalEventListener(listener); + _localEventListenerIds[listener.ListenerObject] = i; + } + } + } + + /// <summary> + /// Gets the local event listeners. + /// </summary> + internal LocalEventListener[] LocalEventListenersInternal + { + get + { + InitLocalEventListeners(); + + return _localEventListenersInternal; + } + } + + /// <summary> + /// Gets the local event listener ids. + /// </summary> + internal Dictionary<object, int> LocalEventListenerIds + { + get + { + InitLocalEventListeners(); + + return _localEventListenerIds; + } + } + + /// <summary> /// Gets or sets the time after which a certain metric value is considered expired. /// </summary> [DefaultValue(typeof(TimeSpan), "10675199.02:48:05.4775807")] http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd index 9920956..fb3db95 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd @@ -1501,6 +1501,51 @@ <xs:documentation>Consistent globally unique node identifier which survives node restarts.</xs:documentation> </xs:annotation> </xs:element> + <xs:element name="localEventListeners"> + <xs:annotation> + <xs:documentation>Pre-configured local event listeners.</xs:documentation> + </xs:annotation> + <xs:complexType> + <xs:sequence> + <xs:element name="localEventListener" maxOccurs="unbounded" minOccurs="0"> + <xs:annotation> + <xs:documentation>Listener holder.</xs:documentation> + </xs:annotation> + <xs:complexType> + <xs:sequence> + <xs:element name="eventTypes"> + <xs:annotation> + <xs:documentation>Event types to listen for.</xs:documentation> + </xs:annotation> + <xs:complexType> + <xs:sequence> + <xs:element type="xs:string" name="int" maxOccurs="unbounded" minOccurs="1"/> + </xs:sequence> + </xs:complexType> + </xs:element> + <xs:element name="listener"> + <xs:annotation> + <xs:documentation>Listener implementation.</xs:documentation> + </xs:annotation> + <xs:complexType> + <xs:attribute type="xs:string" name="type" use="required"> + <xs:annotation> + <xs:documentation>Assembly-qualified type name.</xs:documentation> + </xs:annotation> + </xs:attribute> + </xs:complexType> + </xs:element> + </xs:sequence> + <xs:attribute type="xs:string" name="type" use="required"> + <xs:annotation> + <xs:documentation>Assembly-qualified type name.</xs:documentation> + </xs:annotation> + </xs:attribute> + </xs:complexType> + </xs:element> + </xs:sequence> + </xs:complexType> + </xs:element> </xs:all> <xs:attribute name="igniteInstanceName" type="xs:string"> <xs:annotation> http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs index 1bb6b3d..886dee9 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs @@ -213,6 +213,8 @@ namespace Apache.Ignite.Core { IgniteArgumentCheck.NotNull(cfg, "cfg"); + cfg = new IgniteConfiguration(cfg); // Create a copy so that config can be modified and reused. + lock (SyncRoot) { // 0. Init logger http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs index 8ee0c07..a2f7143 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs @@ -26,6 +26,7 @@ namespace Apache.Ignite.Core.Impl.Common using System.Linq; using System.Reflection; using System.Xml; + using Apache.Ignite.Core.Events; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Events; @@ -499,6 +500,10 @@ namespace Apache.Ignite.Core.Impl.Common property.DeclaringType == typeof (IgniteConfiguration) && property.Name == "IncludedEventTypes") return EventTypeConverter.Instance; + if (property != null && + property.DeclaringType == typeof (LocalEventListener) && property.Name == "EventTypes") + return EventTypeConverter.Instance; + if (propertyType == typeof (object)) return ObjectStringConverter.Instance; http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs index 3c7363e..55f5be8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs @@ -310,18 +310,33 @@ namespace Apache.Ignite.Core.Impl.Events { Dictionary<int, LocalHandledEventFilter> filters; - if (!_localFilters.TryGetValue(listener, out filters)) - return false; + if (_localFilters.TryGetValue(listener, out filters)) + { + var success = false; - var success = false; + // Should do this inside lock to avoid race with subscription + // ToArray is required because we are going to modify underlying dictionary during enumeration + foreach (var filter in GetLocalFilters(listener, types).ToArray()) + success |= (DoOutInOp((int) Op.StopLocalListen, filter.Handle) == True); + + return success; + } + } - // Should do this inside lock to avoid race with subscription - // ToArray is required because we are going to modify underlying dictionary during enumeration - foreach (var filter in GetLocalFilters(listener, types).ToArray()) - success |= (DoOutInOp((int) Op.StopLocalListen, filter.Handle) == True); + // Looks for a predefined filter (IgniteConfiguration.LocalEventListeners). + var ids = Ignite.Configuration.LocalEventListenerIds; - return success; + int predefinedListenerId; + if (ids != null && ids.TryGetValue(listener, out predefinedListenerId)) + { + return DoOutInOp((int) Op.StopLocalListen, w => + { + w.WriteInt(predefinedListenerId); + w.WriteIntArray(types); + }, s => s.ReadBool()); } + + return false; } /** <inheritDoc /> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs index e16c3c1..7d36e16 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs @@ -84,6 +84,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged ComputeJobExecuteLocal = 61, PluginProcessorStop = 62, PluginProcessorIgniteStop = 63, - PluginCallbackInLongLongOutLong = 68 + PluginCallbackInLongLongOutLong = 68, + EventLocalListenerApply = 69 } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index d783c38..b291b3d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -219,6 +219,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged AddHandler(UnmanagedCallbackOp.EventFilterCreate, EventFilterCreate); AddHandler(UnmanagedCallbackOp.EventFilterApply, EventFilterApply); AddHandler(UnmanagedCallbackOp.EventFilterDestroy, EventFilterDestroy); + AddHandler(UnmanagedCallbackOp.EventLocalListenerApply, EventLocalListenerApply); AddHandler(UnmanagedCallbackOp.ServiceInit, ServiceInit); AddHandler(UnmanagedCallbackOp.ServiceExecute, ServiceExecute); AddHandler(UnmanagedCallbackOp.ServiceCancel, ServiceCancel); @@ -898,6 +899,29 @@ namespace Apache.Ignite.Core.Impl.Unmanaged return 0; } + private long EventLocalListenerApply(long memPtr) + { + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) + { + var id = stream.ReadInt(); + + var listeners = _ignite.Configuration.LocalEventListenersInternal; + + if (listeners == null || id >= listeners.Length) + { + return 0; + } + + var listener = listeners[id]; + + var reader = _ignite.Marshaller.StartUnmarshal(stream); + + var res = listener.Invoke(reader); + + return res ? 1 : 0; + } + } + #endregion #region IMPLEMENTATION: SERVICES http://git-wip-us.apache.org/repos/asf/ignite/blob/b364589d/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings index 9d5b728..7d81184 100644 --- a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings +++ b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings @@ -8,4 +8,6 @@ <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/QualifiedUsingAtNestedScope/@EntryValue">True</s:Boolean> <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ConvertClosureToMethodGroup/@EntryIndexedValue">DO_NOT_SHOW</s:String> <s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EXml_002ECodeStyle_002EFormatSettingsUpgrade_002EXmlMoveToCommonFormatterSettingsUpgrade/@EntryIndexedValue">True</s:Boolean> + <s:Boolean x:Key="/Default/Environment/UnitTesting/ShadowCopy/@EntryValue">False</s:Boolean> + <s:Boolean x:Key="/Default/Environment/UnitTesting/WrapLongLinesInUnitTestSessionOutput/@EntryValue">False</s:Boolean> </wpf:ResourceDictionary> \ No newline at end of file
