http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs index 094c6a5..f151763 100644 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs @@ -15,13 +15,12 @@ * limitations under the License. */ -using Apache.Ignite.Core.Impl.Portable.IO; - namespace Apache.Ignite.Core.Impl.Memory { using System; using System.IO; using System.Text; + using Apache.Ignite.Core.Impl.Portable.IO; /// <summary> /// Platform memory stream.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryUtils.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryUtils.cs index fc942a0..2b0277a 100644 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryUtils.cs +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryUtils.cs @@ -21,7 +21,7 @@ namespace Apache.Ignite.Core.Impl.Memory using System.Diagnostics.CodeAnalysis; using System.Reflection; using System.Runtime.InteropServices; - + /// <summary> /// Utility methods for platform memory management. /// </summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformPooledMemory.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformPooledMemory.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformPooledMemory.cs index 7709ca4..206df4b 100644 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformPooledMemory.cs +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformPooledMemory.cs @@ -35,7 +35,7 @@ namespace Apache.Ignite.Core.Impl.Memory /// <param name="memPtr">Memory pointer.</param> public PlatformPooledMemory(PlatformMemoryPool pool, long memPtr) : base(memPtr) { - this._pool = pool; + _pool = pool; } /** <inheritdoc /> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs ---------------------------------------------------------------------- diff --git 
a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs new file mode 100644 index 0000000..21c66bf --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs @@ -0,0 +1,179 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Messaging
{
    using System;
    using System.Diagnostics;
    using Apache.Ignite.Core.Impl.Common;
    using Apache.Ignite.Core.Impl.Handle;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Portable.IO;
    using Apache.Ignite.Core.Impl.Resource;
    using Apache.Ignite.Core.Messaging;
    using Apache.Ignite.Core.Portable;

    /// <summary>
    /// Non-generic portable filter wrapper.
    /// </summary>
    internal class MessageFilterHolder : IPortableWriteAware, IHandle
    {
        /** Invoker function that takes key and value and invokes wrapped IMessageFilter */
        private readonly Func<Guid, object, bool> _invoker;

        /** Current Ignite instance. */
        private readonly Ignite _ignite;

        /** Underlying filter. */
        private readonly object _filter;

        /// <summary>
        /// Initializes a new instance of the <see cref="MessageFilterHolder" /> class.
        /// </summary>
        /// <param name="grid">Grid.</param>
        /// <param name="filter">The <see cref="IMessageFilter{T}" /> to wrap.</param>
        /// <param name="invoker">The invoker func that takes key and value and invokes wrapped IMessageFilter.</param>
        private MessageFilterHolder(Ignite grid, object filter, Func<Guid, object, bool> invoker)
        {
            // Validate every argument up front, before any state is touched.
            Debug.Assert(grid != null);
            Debug.Assert(filter != null);
            Debug.Assert(invoker != null);

            // 1. Set fields (each field is assigned exactly once).
            _ignite = grid;
            _filter = filter;
            _invoker = invoker;

            // 2. Perform injections.
            ResourceProcessor.Inject(filter, grid);
        }

        /// <summary>
        /// Invoke the filter: reads a node id and a message object from the stream
        /// and passes them to the wrapped filter.
        /// </summary>
        /// <param name="input">Input.</param>
        /// <returns>1 if the wrapped filter returned <c>true</c>, 0 otherwise.</returns>
        public int Invoke(IPortableStream input)
        {
            var rawReader = _ignite.Marshaller.StartUnmarshal(input).RawReader();

            var nodeId = rawReader.ReadGuid();

            Debug.Assert(nodeId != null);

            return _invoker(nodeId.Value, rawReader.ReadObject<object>()) ? 1 : 0;
        }

        /// <summary>
        /// Wrapped <see cref="IMessageFilter{T}" />.
        /// </summary>
        public object Filter
        {
            get { return _filter; }
        }

        /// <summary>
        /// Destroy callback.
        /// </summary>
        public Action DestroyAction { private get; set; }

        /** <inheritDoc /> */
        public void Release()
        {
            // Read the property once so the null check and the call see the same delegate.
            var destroyAction = DestroyAction;

            if (destroyAction != null)
                destroyAction();
        }

        /** <inheritDoc /> */
        public bool Released
        {
            get { return false; } // Multiple releases are allowed.
        }

        /// <summary>
        /// Creates local holder instance.
        /// </summary>
        /// <param name="grid">Ignite instance.</param>
        /// <param name="filter">Filter.</param>
        /// <returns>
        /// New instance of <see cref="MessageFilterHolder" />
        /// </returns>
        public static MessageFilterHolder CreateLocal<T>(Ignite grid, IMessageFilter<T> filter)
        {
            Debug.Assert(filter != null);

            // The invoker casts the untyped message back to T before calling the typed filter.
            return new MessageFilterHolder(grid, filter, (id, msg) => filter.Invoke(id, (T)msg));
        }

        /// <summary>
        /// Creates remote holder instance.
        /// </summary>
        /// <param name="grid">Grid.</param>
        /// <param name="memPtr">Memory pointer.</param>
        /// <returns>Deserialized instance of <see cref="MessageFilterHolder"/></returns>
        public static MessageFilterHolder CreateRemote(Ignite grid, long memPtr)
        {
            Debug.Assert(grid != null);

            var stream = IgniteManager.Memory.Get(memPtr).Stream();

            var holder = grid.Marshaller.Unmarshal<MessageFilterHolder>(stream);

            return holder;
        }

        /// <summary>
        /// Gets the invoker func.
        /// </summary>
        /// <param name="pred">Filter object whose runtime type selects the delegate.</param>
        /// <returns>Invoker that passes <paramref name="pred"/> as the first argument of that delegate.</returns>
        private static Func<Guid, object, bool> GetInvoker(object pred)
        {
            var func = DelegateTypeDescriptor.GetMessageFilter(pred.GetType());

            return (id, msg) => func(pred, id, msg);
        }

        /** <inheritdoc /> */
        public void WritePortable(IPortableWriter writer)
        {
            var writer0 = (PortableWriterImpl)writer.RawWriter();

            writer0.DetachNext();
            PortableUtils.WritePortableOrSerializable(writer0, Filter);
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="MessageFilterHolder"/> class.
        /// </summary>
        /// <param name="reader">The reader.</param>
        public MessageFilterHolder(IPortableReader reader)
        {
            var reader0 = (PortableReaderImpl)reader.RawReader();

            _filter = PortableUtils.ReadPortableOrSerializable<object>(reader0);

            _invoker = GetInvoker(_filter);

            _ignite = reader0.Marshaller.Ignite;

            ResourceProcessor.Inject(_filter, _ignite);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs new file mode 100644 index 0000000..e8c4b4b --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

namespace Apache.Ignite.Core.Impl.Messaging
{
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using Apache.Ignite.Core.Cluster;
    using Apache.Ignite.Core.Common;
    using Apache.Ignite.Core.Impl.Collections;
    using Apache.Ignite.Core.Impl.Common;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Resource;
    using Apache.Ignite.Core.Impl.Unmanaged;
    using Apache.Ignite.Core.Messaging;
    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;

    /// <summary>
    /// Messaging implementation: sends messages and manages local/remote listeners
    /// through operations on the platform target.
    /// </summary>
    internal class Messaging : PlatformTarget, IMessaging
    {
        /// <summary>
        /// Operation codes passed to the platform target.
        /// </summary>
        private enum Op
        {
            LocalListen = 1,
            RemoteListen = 2,
            Send = 3,
            SendMulti = 4,
            SendOrdered = 5,
            StopLocalListen = 6,
            StopRemoteListen = 7
        }

        /** Maps a user (filter, topic) pair to the handle ids registered for it; used to unsubscribe. */
        private readonly MultiValueDictionary<KeyValuePair<object, object>, long> _funcMap =
            new MultiValueDictionary<KeyValuePair<object, object>, long>();

        /** Ignite instance obtained from the cluster group. */
        private readonly Ignite _ignite;

        /// <summary>
        /// Creates a messaging facade bound to the given cluster group.
        /// </summary>
        /// <param name="target">Target.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="prj">Cluster group.</param>
        public Messaging(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup prj)
            : base(target, marsh)
        {
            Debug.Assert(prj != null);

            ClusterGroup = prj;

            _ignite = (Ignite) prj.Ignite;
        }

        /** <inheritdoc /> */
        public IClusterGroup ClusterGroup { get; private set; }

        /** <inheritdoc /> */
        public void Send(object message, object topic = null)
        {
            IgniteArgumentCheck.NotNull(message, "message");

            // Topic is written first, then the message.
            DoOutOp((int) Op.Send, topic, message);
        }

        /** <inheritdoc /> */
        public void Send(IEnumerable messages, object topic = null)
        {
            IgniteArgumentCheck.NotNull(messages, "messages");

            DoOutOp((int) Op.SendMulti, w =>
            {
                w.Write(topic);

                WriteEnumerable(w, messages.OfType<object>());
            });
        }

        /** <inheritdoc /> */
        public void SendOrdered(object message, object topic = null, TimeSpan? timeout = null)
        {
            IgniteArgumentCheck.NotNull(message, "message");

            // A missing timeout is sent as 0; otherwise whole milliseconds are sent.
            long timeoutMs = timeout.HasValue ? (long) timeout.Value.TotalMilliseconds : 0L;

            DoOutOp((int) Op.SendOrdered, w =>
            {
                w.Write(topic);
                w.Write(message);

                w.WriteLong(timeoutMs);
            });
        }

        /** <inheritdoc /> */
        public void LocalListen<T>(IMessageFilter<T> filter, object topic = null)
        {
            IgniteArgumentCheck.NotNull(filter, "filter");

            ResourceProcessor.Inject(filter, _ignite);

            lock (_funcMap)
            {
                KeyValuePair<object, object> mapKey = GetKey(filter, topic);

                var holder = MessageFilterHolder.CreateLocal(_ignite, filter);

                long handleId = _ignite.HandleRegistry.Allocate(holder);

                // When the handle is released, drop the matching map entry as well.
                holder.DestroyAction = () =>
                {
                    lock (_funcMap)
                    {
                        _funcMap.Remove(mapKey, handleId);
                    }
                };

                try
                {
                    DoOutOp((int) Op.LocalListen, w =>
                    {
                        w.WriteLong(handleId);
                        w.Write(topic);
                    });
                }
                catch (Exception)
                {
                    // Registration failed: give the handle back before propagating the error.
                    _ignite.HandleRegistry.Release(handleId);

                    throw;
                }

                _funcMap.Add(mapKey, handleId);
            }
        }

        /** <inheritdoc /> */
        public void StopLocalListen<T>(IMessageFilter<T> filter, object topic = null)
        {
            IgniteArgumentCheck.NotNull(filter, "filter");

            long handleId;
            bool found;

            lock (_funcMap)
            {
                found = _funcMap.TryRemove(GetKey(filter, topic), out handleId);
            }

            // Nothing was registered for this (filter, topic) pair.
            if (!found)
                return;

            DoOutOp((int) Op.StopLocalListen, w =>
            {
                w.WriteLong(handleId);
                w.Write(topic);
            });
        }

        /** <inheritdoc /> */
        public Guid RemoteListen<T>(IMessageFilter<T> filter, object topic = null)
        {
            IgniteArgumentCheck.NotNull(filter, "filter");

            MessageFilterHolder holder = MessageFilterHolder.CreateLocal(_ignite, filter);
            long handleId = _ignite.HandleRegistry.AllocateSafe(holder);

            try
            {
                Guid listenId = Guid.Empty;

                DoOutInOp((int) Op.RemoteListen,
                    w =>
                    {
                        w.Write(holder);
                        w.WriteLong(handleId);
                        w.Write(topic);
                    },
                    inStream =>
                    {
                        Guid? received = Marshaller.StartUnmarshal(inStream).RawReader().ReadGuid();

                        // Only async mode is allowed to return no id.
                        Debug.Assert(IsAsync || received.HasValue);

                        if (received.HasValue)
                            listenId = received.Value;
                    });

                return listenId;
            }
            catch (Exception)
            {
                // Operation failed: give the handle back before propagating the error.
                _ignite.HandleRegistry.Release(handleId);

                throw;
            }
        }

        /** <inheritdoc /> */
        public void StopRemoteListen(Guid opId)
        {
            DoOutOp((int) Op.StopRemoteListen, w =>
            {
                w.WriteGuid(opId);
            });
        }

        /** <inheritdoc /> */
        public virtual IMessaging WithAsync()
        {
            var asyncTarget = UU.MessagingWithASync(Target);

            return new MessagingAsync(asyncTarget, Marshaller, ClusterGroup);
        }

        /** <inheritdoc /> */
        public virtual bool IsAsync
        {
            get { return false; }
        }

        /** <inheritdoc /> */
        public virtual IFuture GetFuture()
        {
            throw IgniteUtils.GetAsyncModeDisabledException();
        }

        /** <inheritdoc /> */
        public virtual IFuture<TResult> GetFuture<TResult>()
        {
            throw IgniteUtils.GetAsyncModeDisabledException();
        }

        /// <summary>
        /// Builds the compound dictionary key for a user-provided filter and topic.
        /// </summary>
        /// <param name="filter">Filter.</param>
        /// <param name="topic">Topic.</param>
        /// <returns>Compound dictionary key.</returns>
        private static KeyValuePair<object, object> GetKey(object filter, object topic)
        {
            return new KeyValuePair<object, object>(filter, topic);
        }
    }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/MessagingAsync.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/MessagingAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/MessagingAsync.cs new file mode 100644 index 0000000..e899d4e --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Messaging/MessagingAsync.cs @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

namespace Apache.Ignite.Core.Impl.Messaging
{
    using Apache.Ignite.Core.Cluster;
    using Apache.Ignite.Core.Common;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Unmanaged;
    using Apache.Ignite.Core.Messaging;
    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;

    /// <summary>
    /// Messaging variant that reports async mode and exposes futures.
    /// </summary>
    internal class MessagingAsync : Messaging
    {
        /// <summary>
        /// Creates the async messaging facade; all arguments are forwarded to the base class.
        /// </summary>
        /// <param name="target">Target.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="prj">Cluster group.</param>
        public MessagingAsync(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup prj)
            : base(target, marsh, prj)
        {
            // No-op.
        }

        /** <inheritdoc /> */
        public override IMessaging WithAsync()
        {
            // Already in async mode, so no new instance is created.
            return this;
        }

        /** <inheritdoc /> */
        public override bool IsAsync
        {
            get { return true; }
        }

        /** <inheritdoc /> */
        public override IFuture GetFuture()
        {
            // The untyped future is the typed one with an object result.
            return GetFuture<object>();
        }

        /** <inheritdoc /> */
        public override IFuture<T> GetFuture<T>()
        {
            return GetFuture<T>(
                (futureId, futureType) => UU.TargetListenFuture(Target, futureId, futureType));
        }
    }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/NativeMethods.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/NativeMethods.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/NativeMethods.cs new file mode 100644 index 0000000..6e25e7e --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/NativeMethods.cs @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl +{ + using System; + using System.Runtime.InteropServices; + + /// <summary> + /// Native methods.
+ /// </summary>
    internal static class NativeMethods
    {
        /// <summary>
        /// Load DLL with WinAPI.
        /// The path is marshalled as UTF-16 so that the wide-character entry point is used
        /// and paths with characters outside the system ANSI code page can still be loaded.
        /// </summary>
        /// <param name="path">Path to dll.</param>
        /// <returns>
        /// Value returned by the WinAPI call (per the Win32 documentation: the module handle,
        /// or <see cref="IntPtr.Zero"/> on failure, with the error code available via
        /// <see cref="Marshal.GetLastWin32Error"/> because SetLastError is enabled).
        /// </returns>
        [DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Unicode)]
        internal static extern IntPtr LoadLibrary(string path);

        /// <summary>
        /// Get procedure address with WinAPI.
        /// Kept as ANSI: the Win32 export takes a narrow procedure name only. Best-fit mapping
        /// is disabled and unmappable characters throw, so a name is never silently altered.
        /// </summary>
        /// <param name="ptr">DLL pointer.</param>
        /// <param name="name">Procedure name.</param>
        /// <returns>Procedure address.</returns>
        [DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Ansi, BestFitMapping = false,
            ThrowOnUnmappableChar = true)]
        internal static extern IntPtr GetProcAddress(IntPtr ptr, string name);
    }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs new file mode 100644 index 0000000..67f631a --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl +{ + using System; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using System.IO; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Memory; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Impl.Portable.Metadata; + using Apache.Ignite.Core.Impl.Unmanaged; + using Apache.Ignite.Core.Portable; + using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; + + /// <summary> + /// Base class for interop targets. + /// </summary> + [SuppressMessage("ReSharper", "LocalVariableHidesMember")] + internal abstract class PlatformTarget + { + /** */ + protected const int True = 1; + + /** */ + private const int OpMeta = -1; + + /** */ + public const int OpNone = -2; + + /** */ + private static readonly Dictionary<Type, FutureType> IgniteFutureTypeMap + = new Dictionary<Type, FutureType> + { + {typeof(bool), FutureType.Bool}, + {typeof(byte), FutureType.Byte}, + {typeof(char), FutureType.Char}, + {typeof(double), FutureType.Double}, + {typeof(float), FutureType.Float}, + {typeof(int), FutureType.Int}, + {typeof(long), FutureType.Long}, + {typeof(short), FutureType.Short} + }; + + /** Unmanaged target. */ + private readonly IUnmanagedTarget _target; + + /** Marshaller. */ + private readonly PortableMarshaller _marsh; + + /// <summary> + /// Constructor. 
+ /// </summary> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + protected PlatformTarget(IUnmanagedTarget target, PortableMarshaller marsh) + { + _target = target; + _marsh = marsh; + } + + /// <summary> + /// Unmanaged target. + /// </summary> + internal IUnmanagedTarget Target + { + get { return _target; } + } + + /// <summary> + /// Marshaller. + /// </summary> + internal PortableMarshaller Marshaller + { + get { return _marsh; } + } + + #region Static Helpers + + /// <summary> + /// Write collection. + /// </summary> + /// <param name="writer">Portable writer.</param> + /// <param name="vals">Values.</param> + /// <returns>The same writer for chaining.</returns> + protected static PortableWriterImpl WriteCollection<T>(PortableWriterImpl writer, ICollection<T> vals) + { + return WriteCollection<T, T>(writer, vals, null); + } + + /// <summary> + /// Write nullable collection. + /// </summary> + /// <param name="writer">Portable writer.</param> + /// <param name="vals">Values.</param> + /// <returns>The same writer for chaining.</returns> + protected static PortableWriterImpl WriteNullableCollection<T>(PortableWriterImpl writer, ICollection<T> vals) + { + return WriteNullable(writer, vals, WriteCollection); + } + + /// <summary> + /// Write collection. + /// </summary> + /// <param name="writer">Portable writer.</param> + /// <param name="vals">Values.</param> + /// <param name="selector">A transform function to apply to each element.</param> + /// <returns>The same writer for chaining.</returns> + protected static PortableWriterImpl WriteCollection<T1, T2>(PortableWriterImpl writer, + ICollection<T1> vals, Func<T1, T2> selector) + { + writer.WriteInt(vals.Count); + + if (selector == null) + { + foreach (var val in vals) + writer.Write(val); + } + else + { + foreach (var val in vals) + writer.Write(selector(val)); + } + + return writer; + } + + /// <summary> + /// Write enumerable. 
+ /// </summary> + /// <param name="writer">Portable writer.</param> + /// <param name="vals">Values.</param> + /// <returns>The same writer for chaining.</returns> + protected static PortableWriterImpl WriteEnumerable<T>(PortableWriterImpl writer, IEnumerable<T> vals) + { + return WriteEnumerable<T, T>(writer, vals, null); + } + + /// <summary> + /// Write enumerable. + /// </summary> + /// <param name="writer">Portable writer.</param> + /// <param name="vals">Values.</param> + /// <param name="selector">A transform function to apply to each element.</param> + /// <returns>The same writer for chaining.</returns> + protected static PortableWriterImpl WriteEnumerable<T1, T2>(PortableWriterImpl writer, + IEnumerable<T1> vals, Func<T1, T2> selector) + { + var col = vals as ICollection<T1>; + + if (col != null) + return WriteCollection(writer, col, selector); + + var stream = writer.Stream; + + var pos = stream.Position; + + stream.Seek(4, SeekOrigin.Current); + + var size = 0; + + if (selector == null) + { + foreach (var val in vals) + { + writer.Write(val); + + size++; + } + } + else + { + foreach (var val in vals) + { + writer.Write(selector(val)); + + size++; + } + } + + stream.WriteInt(pos, size); + + return writer; + } + + /// <summary> + /// Write dictionary. + /// </summary> + /// <param name="writer">Portable writer.</param> + /// <param name="vals">Values.</param> + /// <returns>The same writer.</returns> + protected static PortableWriterImpl WriteDictionary<T1, T2>(PortableWriterImpl writer, + IDictionary<T1, T2> vals) + { + writer.WriteInt(vals.Count); + + foreach (KeyValuePair<T1, T2> pair in vals) + { + writer.Write(pair.Key); + writer.Write(pair.Value); + } + + return writer; + } + + /// <summary> + /// Write a nullable item. 
+ /// </summary> + /// <param name="writer">Portable writer.</param> + /// <param name="item">Item.</param> + /// <param name="writeItem">Write action to perform on item when it is not null.</param> + /// <returns>The same writer for chaining.</returns> + protected static PortableWriterImpl WriteNullable<T>(PortableWriterImpl writer, T item, + Func<PortableWriterImpl, T, PortableWriterImpl> writeItem) + { + if (item == null) + { + writer.WriteBoolean(false); + + return writer; + } + + writer.WriteBoolean(true); + + return writeItem(writer, item); + } + + #endregion + + #region OUT operations + + /// <summary> + /// Perform out operation. + /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="action">Action to be performed on the stream.</param> + /// <returns></returns> + protected long DoOutOp(int type, Action<IPortableStream> action) + { + using (var stream = IgniteManager.Memory.Allocate().Stream()) + { + action(stream); + + return UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); + } + } + + /// <summary> + /// Perform out operation. + /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="action">Action to be performed on the stream.</param> + /// <returns></returns> + protected long DoOutOp(int type, Action<PortableWriterImpl> action) + { + using (var stream = IgniteManager.Memory.Allocate().Stream()) + { + var writer = _marsh.StartMarshal(stream); + + action(writer); + + FinishMarshal(writer); + + return UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); + } + } + + /// <summary> + /// Perform simple output operation accepting single argument. 
+ /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="val1">Value.</param> + /// <returns>Result.</returns> + protected long DoOutOp<T1>(int type, T1 val1) + { + return DoOutOp(type, writer => + { + writer.Write(val1); + }); + } + + /// <summary> + /// Perform simple output operation accepting two arguments. + /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="val1">Value 1.</param> + /// <param name="val2">Value 2.</param> + /// <returns>Result.</returns> + protected long DoOutOp<T1, T2>(int type, T1 val1, T2 val2) + { + return DoOutOp(type, writer => + { + writer.Write(val1); + writer.Write(val2); + }); + } + + /// <summary> + /// Perform simple output operation accepting three arguments. + /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="val1">Value 1.</param> + /// <param name="val2">Value 2.</param> + /// <param name="val3">Value 3.</param> + /// <returns>Result.</returns> + protected long DoOutOp<T1, T2, T3>(int type, T1 val1, T2 val2, T3 val3) + { + return DoOutOp(type, writer => + { + writer.Write(val1); + writer.Write(val2); + writer.Write(val3); + }); + } + + #endregion + + #region IN operations + + /// <summary> + /// Perform in operation. + /// </summary> + /// <param name="type">Type.</param> + /// <param name="action">Action.</param> + protected void DoInOp(int type, Action<IPortableStream> action) + { + using (var stream = IgniteManager.Memory.Allocate().Stream()) + { + UU.TargetOutStream(_target, type, stream.MemoryPointer); + + stream.SynchronizeInput(); + + action(stream); + } + } + + /// <summary> + /// Perform in operation. 
+ /// </summary> + /// <param name="type">Type.</param> + /// <param name="action">Action.</param> + /// <returns>Result.</returns> + protected T DoInOp<T>(int type, Func<IPortableStream, T> action) + { + using (var stream = IgniteManager.Memory.Allocate().Stream()) + { + UU.TargetOutStream(_target, type, stream.MemoryPointer); + + stream.SynchronizeInput(); + + return action(stream); + } + } + + /// <summary> + /// Perform simple in operation returning immediate result. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Result.</returns> + protected T DoInOp<T>(int type) + { + using (var stream = IgniteManager.Memory.Allocate().Stream()) + { + UU.TargetOutStream(_target, type, stream.MemoryPointer); + + stream.SynchronizeInput(); + + return Unmarshal<T>(stream); + } + } + + #endregion + + #region OUT-IN operations + + /// <summary> + /// Perform out-in operation. + /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="outAction">Out action.</param> + /// <param name="inAction">In action.</param> + protected void DoOutInOp(int type, Action<PortableWriterImpl> outAction, Action<IPortableStream> inAction) + { + using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().Stream()) + { + using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().Stream()) + { + PortableWriterImpl writer = _marsh.StartMarshal(outStream); + + outAction(writer); + + FinishMarshal(writer); + + UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); + + inStream.SynchronizeInput(); + + inAction(inStream); + } + } + } + + /// <summary> + /// Perform out-in operation. 
+ /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="outAction">Out action.</param> + /// <param name="inAction">In action.</param> + /// <returns>Result.</returns> + protected TR DoOutInOp<TR>(int type, Action<PortableWriterImpl> outAction, Func<IPortableStream, TR> inAction) + { + using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().Stream()) + { + using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().Stream()) + { + PortableWriterImpl writer = _marsh.StartMarshal(outStream); + + outAction(writer); + + FinishMarshal(writer); + + UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); + + inStream.SynchronizeInput(); + + return inAction(inStream); + } + } + } + + /// <summary> + /// Perform out-in operation. + /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="outAction">Out action.</param> + /// <param name="inAction">In action.</param> + /// <param name="arg">Argument.</param> + /// <returns>Result.</returns> + protected unsafe TR DoOutInOp<TR>(int type, Action<PortableWriterImpl> outAction, Func<IPortableStream, TR> inAction, void* arg) + { + using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().Stream()) + { + using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().Stream()) + { + PortableWriterImpl writer = _marsh.StartMarshal(outStream); + + outAction(writer); + + FinishMarshal(writer); + + UU.TargetInObjectStreamOutStream(_target, type, arg, outStream.SynchronizeOutput(), inStream.MemoryPointer); + + inStream.SynchronizeInput(); + + return inAction(inStream); + } + } + } + + /// <summary> + /// Perform out-in operation. 
+ /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="outAction">Out action.</param> + /// <returns>Result.</returns> + protected TR DoOutInOp<TR>(int type, Action<PortableWriterImpl> outAction) + { + using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().Stream()) + { + using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().Stream()) + { + PortableWriterImpl writer = _marsh.StartMarshal(outStream); + + outAction(writer); + + FinishMarshal(writer); + + UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); + + inStream.SynchronizeInput(); + + return Unmarshal<TR>(inStream); + } + } + } + + /// <summary> + /// Perform simple out-in operation accepting single argument. + /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="val">Value.</param> + /// <returns>Result.</returns> + protected TR DoOutInOp<T1, TR>(int type, T1 val) + { + using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().Stream()) + { + using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().Stream()) + { + PortableWriterImpl writer = _marsh.StartMarshal(outStream); + + writer.WriteObject(val); + + FinishMarshal(writer); + + UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); + + inStream.SynchronizeInput(); + + return Unmarshal<TR>(inStream); + } + } + } + + /// <summary> + /// Perform simple out-in operation accepting two arguments. 
+ /// </summary> + /// <param name="type">Operation type.</param> + /// <param name="val1">First value.</param> + /// <param name="val2">Second value.</param> + /// <returns>Result.</returns> + protected TR DoOutInOp<T1, T2, TR>(int type, T1 val1, T2 val2) + { + using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().Stream()) + { + using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().Stream()) + { + PortableWriterImpl writer = _marsh.StartMarshal(outStream); + + writer.WriteObject(val1); + writer.WriteObject(val2); + + FinishMarshal(writer); + + UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); + + inStream.SynchronizeInput(); + + return Unmarshal<TR>(inStream); + } + } + } + + #endregion + + #region Miscellaneous + + /// <summary> + /// Finish marshaling. + /// </summary> + /// <param name="writer">Portable writer.</param> + internal void FinishMarshal(PortableWriterImpl writer) + { + _marsh.FinishMarshal(writer); + } + + /// <summary> + /// Put metadata to Grid. + /// </summary> + /// <param name="metas">Metadatas.</param> + internal void PutMetadata(IDictionary<int, IPortableMetadata> metas) + { + DoOutOp(OpMeta, stream => + { + PortableWriterImpl metaWriter = _marsh.StartMarshal(stream); + + metaWriter.WriteInt(metas.Count); + + foreach (var meta in metas.Values) + { + PortableMetadataImpl meta0 = (PortableMetadataImpl)meta; + + metaWriter.WriteInt(meta0.TypeId); + metaWriter.WriteString(meta0.TypeName); + metaWriter.WriteString(meta0.AffinityKeyFieldName); + + IDictionary<string, int> fields = meta0.FieldsMap(); + + metaWriter.WriteInt(fields.Count); + + foreach (var field in fields) + { + metaWriter.WriteString(field.Key); + metaWriter.WriteInt(field.Value); + } + } + + _marsh.FinishMarshal(metaWriter); + }); + + _marsh.OnMetadataSent(metas); + } + + /// <summary> + /// Unmarshal object using the given stream. 
+ /// </summary> + /// <param name="stream">Stream.</param> + /// <returns>Unmarshalled object.</returns> + protected virtual T Unmarshal<T>(IPortableStream stream) + { + return _marsh.Unmarshal<T>(stream); + } + + /// <summary> + /// Creates a future and starts listening. + /// </summary> + /// <typeparam name="T">Future result type</typeparam> + /// <param name="listenAction">The listen action.</param> + /// <param name="keepPortable">Keep portable flag, only applicable to object futures. False by default.</param> + /// <param name="convertFunc">The function to read future result from stream.</param> + /// <returns>Created future.</returns> + protected IFuture<T> GetFuture<T>(Action<long, int> listenAction, bool keepPortable = false, + Func<PortableReaderImpl, T> convertFunc = null) + { + var futType = FutureType.Object; + + var type = typeof(T); + + if (type.IsPrimitive) + IgniteFutureTypeMap.TryGetValue(type, out futType); + + var fut = convertFunc == null && futType != FutureType.Object + ? new Future<T>() + : new Future<T>(new FutureConverter<T>(_marsh, keepPortable, convertFunc)); + + var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut); + + listenAction(futHnd, (int)futType); + + return fut; + } + + #endregion + } + + /// <summary> + /// PlatformTarget with IDisposable pattern. + /// </summary> + internal abstract class PlatformDisposableTarget : PlatformTarget, IDisposable + { + /** Disposed flag. */ + private volatile bool _disposed; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + protected PlatformDisposableTarget(IUnmanagedTarget target, PortableMarshaller marsh) : base(target, marsh) + { + // No-op. 
+ } + + /** <inheritdoc /> */ + public void Dispose() + { + lock (this) + { + if (_disposed) + return; + + Dispose(true); + + GC.SuppressFinalize(this); + + _disposed = true; + } + } + + /// <summary> + /// Releases unmanaged and - optionally - managed resources. + /// </summary> + /// <param name="disposing"> + /// <c>true</c> when called from Dispose; <c>false</c> when called from finalizer. + /// </param> + protected virtual void Dispose(bool disposing) + { + Target.Dispose(); + } + + /// <summary> + /// Throws <see cref="ObjectDisposedException"/> if this instance has been disposed. + /// </summary> + protected void ThrowIfDisposed() + { + if (_disposed) + throw new ObjectDisposedException(GetType().Name, "Object has been disposed."); + } + + /// <summary> + /// Gets a value indicating whether this instance is disposed. + /// </summary> + protected bool IsDisposed + { + get { return _disposed; } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableSystemTypeSerializer.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableSystemTypeSerializer.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableSystemTypeSerializer.cs new file mode 100644 index 0000000..3fee3ca --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableSystemTypeSerializer.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Serializer for system types that can create instances directly from a stream and does not support handles. + /// </summary> + internal interface IPortableSystemTypeSerializer : IPortableSerializer + { + /// <summary> + /// Reads the instance from a reader. + /// </summary> + /// <param name="reader">The reader.</param> + /// <returns>Deserialized instance.</returns> + object ReadInstance(PortableReaderImpl reader); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableTypeDescriptor.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableTypeDescriptor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableTypeDescriptor.cs new file mode 100644 index 0000000..4a4f0dc --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableTypeDescriptor.cs @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using System; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Type descriptor. + /// </summary> + internal interface IPortableTypeDescriptor + { + /// <summary> + /// Type. + /// </summary> + Type Type + { + get; + } + + /// <summary> + /// Type ID. + /// </summary> + int TypeId + { + get; + } + + /// <summary> + /// Type name. + /// </summary> + string TypeName + { + get; + } + + /// <summary> + /// User type flag. + /// </summary> + bool UserType + { + get; + } + + /// <summary> + /// Metadata enabled flag. + /// </summary> + bool MetadataEnabled + { + get; + } + + /// <summary> + /// Whether to cache deserialized value in IPortableObject + /// </summary> + bool KeepDeserialized + { + get; + } + + /// <summary> + /// Name converter. + /// </summary> + IPortableNameMapper NameConverter + { + get; + } + + /// <summary> + /// Mapper. + /// </summary> + IPortableIdMapper Mapper + { + get; + } + + /// <summary> + /// Serializer. + /// </summary> + IPortableSerializer Serializer + { + get; + } + + /// <summary> + /// Affinity key field name. + /// </summary> + string AffinityKeyFieldName + { + get; + } + + /// <summary> + /// Typed handler. + /// </summary> + object TypedHandler + { + get; + } + + /// <summary> + /// Untyped handler. 
+ /// </summary> + PortableSystemWriteDelegate UntypedHandler + { + get; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableWriteAware.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableWriteAware.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableWriteAware.cs new file mode 100644 index 0000000..d3c1521 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableWriteAware.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Represents an object that can write itself to a portable writer. + /// </summary> + internal interface IPortableWriteAware + { + /// <summary> + /// Writes this object to the given writer. + /// </summary> + /// <param name="writer">Writer.</param> + /// <exception cref="System.IO.IOException">If write failed.</exception> + void WritePortable(IPortableWriter writer); + } +} \ No newline at end of file
