IGNITE-1662: Renamed IMessageFilter to IMessageListener.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a77dd3a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a77dd3a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a77dd3a Branch: refs/heads/ignite-1655 Commit: 2a77dd3a7f01208d7172b66f6520cfc0e6615570 Parents: 81feb95 Author: Pavel Tupitsyn <[email protected]> Authored: Thu Oct 15 12:20:58 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Oct 15 12:20:58 2015 +0300 ---------------------------------------------------------------------- .../Apache.Ignite.Core.Tests/EventsTest.cs | 2 +- .../IgniteStartStopTest.cs | 4 +- .../Apache.Ignite.Core.Tests/MessagingTest.cs | 16 +- .../Apache.Ignite.Core.csproj | 4 +- .../dotnet/Apache.Ignite.Core/Events/IEvents.cs | 2 +- .../Impl/Common/DelegateTypeDescriptor.cs | 16 +- .../Apache.Ignite.Core/Impl/Events/Events.cs | 2 +- .../Impl/Messaging/MessageFilterHolder.cs | 177 ------------------- .../Impl/Messaging/MessageListenerHolder.cs | 177 +++++++++++++++++++ .../Impl/Messaging/Messaging.cs | 22 +-- .../Impl/Portable/PortableMarshaller.cs | 2 +- .../Impl/Portable/PortableUtils.cs | 2 +- .../Impl/Unmanaged/UnmanagedCallbacks.cs | 4 +- .../Messaging/IMessageFilter.cs | 35 ---- .../Messaging/IMessageListener.cs | 38 ++++ .../Apache.Ignite.Core/Messaging/IMessaging.cs | 15 +- .../Events/LocalListener.cs | 2 +- .../Messaging/LocalListener.cs | 2 +- .../Messaging/RemoteOrderedListener.cs | 2 +- .../Messaging/RemoteUnorderedListener.cs | 2 +- 20 files changed, 266 insertions(+), 260 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs index c271aa6..b325d36 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs @@ -383,7 +383,7 @@ namespace Apache.Ignite.Core.Tests var expectedType = EventType.JobStarted; var remoteFilter = portable - ? (IEventFilter<IEvent>) new RemoteEventPortableFilter(expectedType) + ? (IEventFilter<IEvent>) new RemoteEventPortableFilter(expectedType) : new RemoteEventFilter(expectedType); var localListener = EventsTestHelper.GetListener(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs index bd776ce..d16063f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs @@ -384,7 +384,7 @@ namespace Apache.Ignite.Core.Tests // to test race conditions during processor init on remote node var listenTask = Task.Factory.StartNew(() => { - var filter = new MessageFilter(); + var filter = new MessageListener(); while (!token.IsCancellationRequested) { @@ -410,7 +410,7 @@ namespace Apache.Ignite.Core.Tests /// Noop message filter. /// </summary> [Serializable] - private class MessageFilter : IMessageFilter<int> + private class MessageListener : IMessageListener<int> { /** <inheritdoc /> */ public bool Invoke(Guid nodeId, int message) http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs index 95e48d3..55f2e6c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs @@ -155,7 +155,7 @@ namespace Apache.Ignite.Core.Tests { var grid3GotMessage = false; - var grid3Listener = new MessageFilter<string>((id, x) => + var grid3Listener = new MessageListener<string>((id, x) => { grid3GotMessage = true; return true; @@ -199,7 +199,7 @@ namespace Apache.Ignite.Core.Tests var sharedReceived = 0; - var sharedListener = new MessageFilter<string>((id, x) => + var sharedListener = new MessageListener<string>((id, x) => { Interlocked.Increment(ref sharedReceived); Thread.MemoryBarrier(); @@ -220,7 +220,7 @@ namespace Apache.Ignite.Core.Tests var localReceived = 0; var stopLocal = 0; - var localListener = new MessageFilter<string>((id, x) => + var localListener = new MessageListener<string>((id, x) => { Interlocked.Increment(ref localReceived); Thread.MemoryBarrier(); @@ -569,9 +569,9 @@ namespace Apache.Ignite.Core.Tests /// Gets the message listener. /// </summary> /// <returns>New instance of message listener.</returns> - public static IMessageFilter<string> GetListener() + public static IMessageListener<string> GetListener() { - return new MessageFilter<string>(Listen); + return new MessageListener<string>(Listen); } /// <summary> @@ -616,7 +616,7 @@ namespace Apache.Ignite.Core.Tests /// Test message filter. /// </summary> [Serializable] - public class MessageFilter<T> : IMessageFilter<T> + public class MessageListener<T> : IMessageListener<T> { /** */ private readonly Func<Guid, T, bool> _invoke; @@ -628,10 +628,10 @@ namespace Apache.Ignite.Core.Tests #pragma warning restore 649 /// <summary> - /// Initializes a new instance of the <see cref="MessageFilter{T}"/> class. + /// Initializes a new instance of the <see cref="MessageListener{T}"/> class. /// </summary> /// <param name="invoke">The invoke delegate.</param> - public MessageFilter(Func<Guid, T, bool> invoke) + public MessageListener(Func<Guid, T, bool> invoke) { _invoke = invoke; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 848ce49..a10a0a5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -238,7 +238,7 @@ <Compile Include="Impl\Memory\PlatformPooledMemory.cs" /> <Compile Include="Impl\Memory\PlatformRawMemory.cs" /> <Compile Include="Impl\Memory\PlatformUnpooledMemory.cs" /> - <Compile Include="Impl\Messaging\MessageFilterHolder.cs" /> + <Compile Include="Impl\Messaging\MessageListenerHolder.cs" /> <Compile Include="Impl\Messaging\Messaging.cs" /> <Compile Include="Impl\Messaging\MessagingAsync.cs" /> <Compile Include="Impl\NativeMethods.cs" /> @@ -307,7 +307,7 @@ <Compile Include="Impl\Unmanaged\UnmanagedUtils.cs" /> <Compile Include="Lifecycle\ILifecycleBean.cs" /> <Compile Include="Lifecycle\LifecycleEventType.cs" /> - <Compile Include="Messaging\IMessageFilter.cs" /> + <Compile Include="Messaging\IMessageListener.cs" /> <Compile Include="Messaging\IMessaging.cs" /> <Compile Include="Portable\IPortableBuilder.cs" /> <Compile Include="Portable\IPortableIdMapper.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs index be38104..b2f07d4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs @@ -92,7 +92,7 @@ namespace Apache.Ignite.Core.Events /// </returns> [AsyncSupported] Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true, - IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types) + IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types) where T : IEvent; /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs index 8b97884..0f2b3c1 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs @@ -58,7 +58,7 @@ namespace Apache.Ignite.Core.Impl.Common _cacheEntryProcessor; /** */ - private readonly Func<object, Guid, object, bool> _messageFilter; + private readonly Func<object, Guid, object, bool> _messageLsnr; /** */ private readonly Func<object, object> _computeJobExecute; @@ -136,13 +136,13 @@ namespace Apache.Ignite.Core.Impl.Common } /// <summary> - /// Gets the <see cref="IMessageFilter{T}" /> invocator. + /// Gets the <see cref="IMessageListener{T}" /> invocator. /// </summary> /// <param name="type">Type.</param> /// <returns>Precompiled invocator delegate.</returns> - public static Func<object, Guid, object, bool> GetMessageFilter(Type type) + public static Func<object, Guid, object, bool> GetMessageListener(Type type) { - return Get(type)._messageFilter; + return Get(type)._messageLsnr; } /// <summary> @@ -286,18 +286,18 @@ namespace Apache.Ignite.Core.Impl.Common _streamTransformerCtor = DelegateConverter.CompileCtor<Func<object, object>>(transformerType, new[] {iface}); } - else if (genericTypeDefinition == typeof (IMessageFilter<>)) + else if (genericTypeDefinition == typeof (IMessageListener<>)) { - ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IMessageFilter<>)); + ThrowIfMultipleInterfaces(_messageLsnr, type, typeof(IMessageListener<>)); var arg = iface.GetGenericArguments()[0]; - _messageFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface, + _messageLsnr = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface, new[] { typeof(Guid), arg }, new[] { false, true, false }); } else if (genericTypeDefinition == typeof (IComputeJob<>)) { - ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IComputeJob<>)); + ThrowIfMultipleInterfaces(_messageLsnr, type, typeof(IComputeJob<>)); _computeJobExecute = DelegateConverter.CompileFunc<Func<object, object>>(iface, new Type[0], methodName: "Execute"); http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs index f4cc341..08936e4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs @@ -164,7 +164,7 @@ namespace Apache.Ignite.Core.Impl.Events /** <inheritDoc /> */ public Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true, - IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, IEnumerable<int> types = null) + IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, IEnumerable<int> types = null) where T : IEvent { return RemoteListen(bufSize, interval, autoUnsubscribe, localListener, remoteFilter, TypesToArray(types)); http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs deleted file mode 100644 index 8666e9b..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Messaging -{ - using System; - using System.Diagnostics; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Handle; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Portable.IO; - using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Messaging; - using Apache.Ignite.Core.Portable; - - /// <summary> - /// Non-generic portable filter wrapper. - /// </summary> - internal class MessageFilterHolder : IPortableWriteAware, IHandle - { - /** Invoker function that takes key and value and invokes wrapped IMessageFilter */ - private readonly Func<Guid, object, bool> _invoker; - - /** Current Ignite instance. */ - private readonly Ignite _ignite; - - /** Underlying filter. */ - private readonly object _filter; - - /// <summary> - /// Initializes a new instance of the <see cref="MessageFilterHolder" /> class. - /// </summary> - /// <param name="grid">Grid.</param> - /// <param name="filter">The <see cref="IMessageFilter{T}" /> to wrap.</param> - /// <param name="invoker">The invoker func that takes key and value and invokes wrapped IMessageFilter.</param> - private MessageFilterHolder(Ignite grid, object filter, Func<Guid, object, bool> invoker) - { - Debug.Assert(filter != null); - Debug.Assert(invoker != null); - - _invoker = invoker; - - _filter = filter; - - // 1. Set fields. - Debug.Assert(grid != null); - - _ignite = grid; - _invoker = invoker; - - // 2. Perform injections. - ResourceProcessor.Inject(filter, grid); - } - - /// <summary> - /// Invoke the filter. - /// </summary> - /// <param name="input">Input.</param> - /// <returns></returns> - public int Invoke(IPortableStream input) - { - var rawReader = _ignite.Marshaller.StartUnmarshal(input).GetRawReader(); - - var nodeId = rawReader.ReadGuid(); - - Debug.Assert(nodeId != null); - - return _invoker(nodeId.Value, rawReader.ReadObject<object>()) ? 1 : 0; - } - - /// <summary> - /// Wrapped <see cref="IMessageFilter{T}" />. - /// </summary> - public object Filter - { - get { return _filter; } - } - - /// <summary> - /// Destroy callback. - /// </summary> - public Action DestroyAction { private get; set; } - - /** <inheritDoc /> */ - public void Release() - { - if (DestroyAction != null) - DestroyAction(); - } - - /** <inheritDoc /> */ - public bool Released - { - get { return false; } // Multiple releases are allowed. - } - - /// <summary> - /// Creates local holder instance. - /// </summary> - /// <param name="grid">Ignite instance.</param> - /// <param name="filter">Filter.</param> - /// <returns> - /// New instance of <see cref="MessageFilterHolder" /> - /// </returns> - public static MessageFilterHolder CreateLocal<T>(Ignite grid, IMessageFilter<T> filter) - { - Debug.Assert(filter != null); - - return new MessageFilterHolder(grid, filter, (id, msg) => filter.Invoke(id, (T)msg)); - } - - /// <summary> - /// Creates remote holder instance. - /// </summary> - /// <param name="grid">Grid.</param> - /// <param name="memPtr">Memory pointer.</param> - /// <returns>Deserialized instance of <see cref="MessageFilterHolder"/></returns> - public static MessageFilterHolder CreateRemote(Ignite grid, long memPtr) - { - Debug.Assert(grid != null); - - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - return grid.Marshaller.Unmarshal<MessageFilterHolder>(stream); - } - } - - /// <summary> - /// Gets the invoker func. - /// </summary> - private static Func<Guid, object, bool> GetInvoker(object pred) - { - var func = DelegateTypeDescriptor.GetMessageFilter(pred.GetType()); - - return (id, msg) => func(pred, id, msg); - } - - /** <inheritdoc /> */ - public void WritePortable(IPortableWriter writer) - { - var writer0 = (PortableWriterImpl)writer.GetRawWriter(); - - writer0.WithDetach(w => PortableUtils.WritePortableOrSerializable(w, Filter)); - } - - /// <summary> - /// Initializes a new instance of the <see cref="MessageFilterHolder"/> class. - /// </summary> - /// <param name="reader">The reader.</param> - public MessageFilterHolder(IPortableReader reader) - { - var reader0 = (PortableReaderImpl)reader.GetRawReader(); - - _filter = PortableUtils.ReadPortableOrSerializable<object>(reader0); - - _invoker = GetInvoker(_filter); - - _ignite = reader0.Marshaller.Ignite; - - ResourceProcessor.Inject(_filter, _ignite); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs new file mode 100644 index 0000000..412a84e --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Messaging +{ + using System; + using System.Diagnostics; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Handle; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Impl.Resource; + using Apache.Ignite.Core.Messaging; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Non-generic portable message listener wrapper. + /// </summary> + internal class MessageListenerHolder : IPortableWriteAware, IHandle + { + /** Invoker function that takes key and value and invokes wrapped IMessageListener */ + private readonly Func<Guid, object, bool> _invoker; + + /** Current Ignite instance. */ + private readonly Ignite _ignite; + + /** Underlying filter. */ + private readonly object _filter; + + /// <summary> + /// Initializes a new instance of the <see cref="MessageListenerHolder" /> class. + /// </summary> + /// <param name="grid">Grid.</param> + /// <param name="filter">The <see cref="IMessageListener{T}" /> to wrap.</param> + /// <param name="invoker">The invoker func that takes key and value and invokes wrapped IMessageListener.</param> + private MessageListenerHolder(Ignite grid, object filter, Func<Guid, object, bool> invoker) + { + Debug.Assert(filter != null); + Debug.Assert(invoker != null); + + _invoker = invoker; + + _filter = filter; + + // 1. Set fields. + Debug.Assert(grid != null); + + _ignite = grid; + _invoker = invoker; + + // 2. Perform injections. + ResourceProcessor.Inject(filter, grid); + } + + /// <summary> + /// Invoke the filter. + /// </summary> + /// <param name="input">Input.</param> + /// <returns></returns> + public int Invoke(IPortableStream input) + { + var rawReader = _ignite.Marshaller.StartUnmarshal(input).GetRawReader(); + + var nodeId = rawReader.ReadGuid(); + + Debug.Assert(nodeId != null); + + return _invoker(nodeId.Value, rawReader.ReadObject<object>()) ? 1 : 0; + } + + /// <summary> + /// Wrapped <see cref="IMessageListener{T}" />. + /// </summary> + public object Filter + { + get { return _filter; } + } + + /// <summary> + /// Destroy callback. + /// </summary> + public Action DestroyAction { private get; set; } + + /** <inheritDoc /> */ + public void Release() + { + if (DestroyAction != null) + DestroyAction(); + } + + /** <inheritDoc /> */ + public bool Released + { + get { return false; } // Multiple releases are allowed. + } + + /// <summary> + /// Creates local holder instance. + /// </summary> + /// <param name="grid">Ignite instance.</param> + /// <param name="listener">Filter.</param> + /// <returns> + /// New instance of <see cref="MessageListenerHolder" /> + /// </returns> + public static MessageListenerHolder CreateLocal<T>(Ignite grid, IMessageListener<T> listener) + { + Debug.Assert(listener != null); + + return new MessageListenerHolder(grid, listener, (id, msg) => listener.Invoke(id, (T)msg)); + } + + /// <summary> + /// Creates remote holder instance. + /// </summary> + /// <param name="grid">Grid.</param> + /// <param name="memPtr">Memory pointer.</param> + /// <returns>Deserialized instance of <see cref="MessageListenerHolder"/></returns> + public static MessageListenerHolder CreateRemote(Ignite grid, long memPtr) + { + Debug.Assert(grid != null); + + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) + { + return grid.Marshaller.Unmarshal<MessageListenerHolder>(stream); + } + } + + /// <summary> + /// Gets the invoker func. + /// </summary> + private static Func<Guid, object, bool> GetInvoker(object pred) + { + var func = DelegateTypeDescriptor.GetMessageListener(pred.GetType()); + + return (id, msg) => func(pred, id, msg); + } + + /** <inheritdoc /> */ + public void WritePortable(IPortableWriter writer) + { + var writer0 = (PortableWriterImpl)writer.GetRawWriter(); + + writer0.WithDetach(w => PortableUtils.WritePortableOrSerializable(w, Filter)); + } + + /// <summary> + /// Initializes a new instance of the <see cref="MessageListenerHolder"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + public MessageListenerHolder(IPortableReader reader) + { + var reader0 = (PortableReaderImpl)reader.GetRawReader(); + + _filter = PortableUtils.ReadPortableOrSerializable<object>(reader0); + + _invoker = GetInvoker(_filter); + + _ignite = reader0.Marshaller.Ignite; + + ResourceProcessor.Inject(_filter, _ignite); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs index 8170a91..4ccbc3e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs @@ -113,17 +113,17 @@ namespace Apache.Ignite.Core.Impl.Messaging } /** <inheritdoc /> */ - public void LocalListen<T>(IMessageFilter<T> filter, object topic = null) + public void LocalListen<T>(IMessageListener<T> listener, object topic = null) { - IgniteArgumentCheck.NotNull(filter, "filter"); + IgniteArgumentCheck.NotNull(listener, "filter"); - ResourceProcessor.Inject(filter, _ignite); + ResourceProcessor.Inject(listener, _ignite); lock (_funcMap) { - var key = GetKey(filter, topic); + var key = GetKey(listener, topic); - MessageFilterHolder filter0 = MessageFilterHolder.CreateLocal(_ignite, filter); + MessageListenerHolder filter0 = MessageListenerHolder.CreateLocal(_ignite, listener); var filterHnd = _ignite.HandleRegistry.Allocate(filter0); @@ -155,16 +155,16 @@ namespace Apache.Ignite.Core.Impl.Messaging } /** <inheritdoc /> */ - public void StopLocalListen<T>(IMessageFilter<T> filter, object topic = null) + public void StopLocalListen<T>(IMessageListener<T> listener, object topic = null) { - IgniteArgumentCheck.NotNull(filter, "filter"); + IgniteArgumentCheck.NotNull(listener, "filter"); long filterHnd; bool removed; lock (_funcMap) { - removed = _funcMap.TryRemove(GetKey(filter, topic), out filterHnd); + removed = _funcMap.TryRemove(GetKey(listener, topic), out filterHnd); } if (removed) @@ -178,11 +178,11 @@ namespace Apache.Ignite.Core.Impl.Messaging } /** <inheritdoc /> */ - public Guid RemoteListen<T>(IMessageFilter<T> filter, object topic = null) + public Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null) { - IgniteArgumentCheck.NotNull(filter, "filter"); + IgniteArgumentCheck.NotNull(listener, "filter"); - var filter0 = MessageFilterHolder.CreateLocal(_ignite, filter); + var filter0 = MessageListenerHolder.CreateLocal(_ignite, listener); var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0); try http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs index 67d8f2b..6499946 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs @@ -516,7 +516,7 @@ namespace Apache.Ignite.Core.Impl.Portable AddSystemType(PortableUtils.TypeSerializableHolder, w => new SerializableObjectHolder(w)); AddSystemType(PortableUtils.TypeCacheEntryProcessorHolder, w => new CacheEntryProcessorHolder(w)); AddSystemType(PortableUtils.TypeCacheEntryPredicateHolder, w => new CacheEntryFilterHolder(w)); - AddSystemType(PortableUtils.TypeMessageFilterHolder, w => new MessageFilterHolder(w)); + AddSystemType(PortableUtils.TypeMessageListenerHolder, w => new MessageListenerHolder(w)); AddSystemType(PortableUtils.TypePortableOrSerializableHolder, w => new PortableOrSerializableObjectHolder(w)); AddSystemType(PortableUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs index f80a199..c7be496 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs @@ -200,7 +200,7 @@ namespace Apache.Ignite.Core.Impl.Portable public const byte TypeProductLicense = 78; /** Type: message filter holder. */ - public const byte TypeMessageFilterHolder = 92; + public const byte TypeMessageListenerHolder = 92; /** Type: message filter holder. */ public const byte TypePortableOrSerializableHolder = 93; http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index f9949f3..3295904 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -788,7 +788,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged { return SafeCall(() => { - MessageFilterHolder holder = MessageFilterHolder.CreateRemote(_ignite, memPtr); + MessageListenerHolder holder = MessageListenerHolder.CreateRemote(_ignite, memPtr); return _ignite.HandleRegistry.AllocateSafe(holder); }); @@ -798,7 +798,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged { return SafeCall(() => { - var holder = _ignite.HandleRegistry.Get<MessageFilterHolder>(ptr, false); + var holder = _ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false); if (holder == null) return 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs deleted file mode 100644 index 456c5e6..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Messaging -{ - using System; - - /// <summary> - /// Represents messaging filter predicate. - /// </summary> - public interface IMessageFilter<in T> - { - /// <summary> - /// Returns a value indicating whether provided message and node id satisfy this predicate. - /// </summary> - /// <param name="nodeId">Node identifier.</param> - /// <param name="message">Message.</param> - /// <returns>Value indicating whether provided message and node id satisfy this predicate.</returns> - bool Invoke(Guid nodeId, T message); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs new file mode 100644 index 0000000..393a670 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Messaging +{ + using System; + + /// <summary> + /// Represents messaging filter predicate. + /// </summary> + public interface IMessageListener<in T> + { + /// <summary> + /// Invokes the message listener when a message arrives. + /// </summary> + /// <param name="nodeId">Message source node identifier.</param> + /// <param name="message">Message.</param> + /// <returns> + /// Value indicating whether this instance should remain subscribed. + /// Returning <c>false</c> will unsubscribe this message listener from further notifications. + /// </returns> + bool Invoke(Guid nodeId, T message); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs index 96f46b9..f846745 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs @@ -67,19 +67,19 @@ namespace Apache.Ignite.Core.Messaging /// node within the cluster group will send a message for a given topic to this node. Local listen /// subscription will happen regardless of whether local node belongs to this cluster group or not. /// </summary> - /// <param name="filter"> + /// <param name="listener"> /// Predicate that is called on each received message. If predicate returns false, /// then it will be unsubscribed from any further notifications. /// </param> /// <param name="topic">Topic to subscribe to.</param> - void LocalListen<T>(IMessageFilter<T> filter, object topic = null); + void LocalListen<T>(IMessageListener<T> listener, object topic = null); /// <summary> /// Unregisters local listener for given topic on local node only. /// </summary> - /// <param name="filter">Listener predicate.</param> + /// <param name="listener">Listener predicate.</param> /// <param name="topic">Topic to unsubscribe from.</param> - void StopLocalListen<T>(IMessageFilter<T> filter, object topic = null); + void StopLocalListen<T>(IMessageListener<T> listener, object topic = null); /// <summary> /// Adds a message listener for a given topic to all nodes in the cluster group (possibly including @@ -87,13 +87,16 @@ namespace Apache.Ignite.Core.Messaging /// group can send a message for a given topic and all nodes within the cluster group will receive /// listener notifications. /// </summary> - /// <param name="filter">Listener predicate.</param> + /// <param name="listener"> + /// Predicate that is called on each received message. If predicate returns false, + /// then it will be unsubscribed from any further notifications. + /// </param> /// <param name="topic">Topic to unsubscribe from.</param> /// <returns> /// Operation ID that can be passed to <see cref="StopRemoteListen"/> method to stop listening. /// </returns> [AsyncSupported] - Guid RemoteListen<T>(IMessageFilter<T> filter, object topic = null); + Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null); /// <summary> /// Unregisters all listeners identified with provided operation ID on all nodes in the cluster group. http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs index 5cdb20c..067bd2a 100644 --- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs +++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs index 7659bb4..591d426 100644 --- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs +++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs @@ -24,7 +24,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging /// <summary> /// Local message listener which signals countdown event on each received message. /// </summary> - public class LocalListener : IMessageFilter<int> + public class LocalListener : IMessageListener<int> { /** Countdown event. */ private readonly CountdownEvent _countdown; http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs index 8ae5ac1..85538c2 100644 --- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs +++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs @@ -26,7 +26,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging /// Listener for Ordered topic. /// </summary> [Serializable] - public class RemoteOrderedListener : IMessageFilter<int> + public class RemoteOrderedListener : IMessageListener<int> { /** Injected Ignite instance. */ [InstanceResource] http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs index 166dbd6..ab23e8b 100644 --- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs +++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs @@ -26,7 +26,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging /// Listener for Unordered topic. /// </summary> [Serializable] - public class RemoteUnorderedListener : IMessageFilter<int> + public class RemoteUnorderedListener : IMessageListener<int> { /** Injected Ignite instance. */ [InstanceResource]
