http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs new file mode 100644 index 0000000..888445a --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.IO;
+    using System.Linq;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache.Affinity;
+    using Apache.Ignite.Core.Cache.Affinity.Fair;
+    using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Binary;
+    using Apache.Ignite.Core.Impl.Binary.IO;
+    using Apache.Ignite.Core.Impl.Memory;
+
+    /// <summary>
+    /// Affinity function read/write methods.
+    /// <para />
+    /// Serialized layout of a non-null function, in order: type code (byte), partition count (int),
+    /// exclude-neighbors flag (bool), override flags (byte), user object (may be null).
+    /// A null function is written as a single <see cref="TypeCodeNull"/> byte.
+    /// </summary>
+    internal static class AffinityFunctionSerializer
+    {
+        /** Type code: no affinity function (null). */
+        private const byte TypeCodeNull = 0;
+
+        /** Type code: FairAffinityFunction (or a type derived from it). */
+        private const byte TypeCodeFair = 1;
+
+        /** Type code: any other AffinityFunctionBase (written as rendezvous). */
+        private const byte TypeCodeRendezvous = 2;
+
+        /** Type code: IAffinityFunction implementation that does not derive from AffinityFunctionBase. */
+        private const byte TypeCodeUser = 3;
+
+        /// <summary>
+        /// Writes the instance.
+        /// </summary>
+        /// <param name="writer">Raw writer to write to.</param>
+        /// <param name="fun">Affinity function; null is written as a single null type code.</param>
+        /// <param name="userFuncOverride">Optional object that, when not null, is written into the
+        /// user-object slot instead of <paramref name="fun"/>.</param>
+        internal static void Write(IBinaryRawWriter writer, IAffinityFunction fun, object userFuncOverride = null)
+        {
+            Debug.Assert(writer != null);
+
+            if (fun == null)
+            {
+                writer.WriteByte(TypeCodeNull);
+                return;
+            }
+
+            // 1) Type code
+            // 2) Partitions
+            // 3) ExcludeNeighbors
+            // 4) Override flags
+            // 5) User object
+
+            var p = fun as AffinityFunctionBase;
+
+            if (p != null)
+            {
+                writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous);
+                writer.WriteInt(p.Partitions);
+                writer.WriteBoolean(p.ExcludeNeighbors);
+
+                var overrideFlags = GetOverrideFlags(p.GetType());
+                writer.WriteByte((byte) overrideFlags);
+
+                // Do not write user func if there is nothing overridden
+                WriteUserFunc(writer, overrideFlags != UserOverrides.None ? fun : null, userFuncOverride);
+            }
+            else
+            {
+                // Fully user-defined function: every method counts as overridden.
+                writer.WriteByte(TypeCodeUser);
+                writer.WriteInt(fun.Partitions);
+                writer.WriteBoolean(false); // Exclude neighbors
+                writer.WriteByte((byte) UserOverrides.All);
+                WriteUserFunc(writer, fun, userFuncOverride);
+            }
+        }
+
+        /// <summary>
+        /// Reads the instance.
+        /// </summary>
+        /// <param name="reader">Raw reader positioned at the type code.</param>
+        /// <returns>
+        /// Null for the null type code; the deserialized user object when one was written
+        /// (with Partitions and ExcludeNeighbors applied if it is a fair or rendezvous function);
+        /// otherwise a new fair or rendezvous function configured from the stream.
+        /// </returns>
+        /// <exception cref="InvalidOperationException">
+        /// No user object was written and the type code is neither fair nor rendezvous.
+        /// </exception>
+        internal static IAffinityFunction Read(IBinaryRawReader reader)
+        {
+            Debug.Assert(reader != null);
+
+            var typeCode = reader.ReadByte();
+
+            if (typeCode == TypeCodeNull)
+                return null;
+
+            // Fields are always read in the order they were written, regardless of the type code.
+            var partitions = reader.ReadInt();
+            var exclNeighbors = reader.ReadBoolean();
+            var overrideFlags = (UserOverrides)reader.ReadByte();
+            var userFunc = reader.ReadObjectEx<IAffinityFunction>();
+
+            if (userFunc != null)
+            {
+                Debug.Assert(overrideFlags != UserOverrides.None);
+
+                var fair = userFunc as FairAffinityFunction;
+                if (fair != null)
+                {
+                    fair.Partitions = partitions;
+                    fair.ExcludeNeighbors = exclNeighbors;
+                }
+
+                var rendezvous = userFunc as RendezvousAffinityFunction;
+                if (rendezvous != null)
+                {
+                    rendezvous.Partitions = partitions;
+                    rendezvous.ExcludeNeighbors = exclNeighbors;
+                }
+
+                return userFunc;
+            }
+
+            Debug.Assert(overrideFlags == UserOverrides.None);
+            AffinityFunctionBase fun;
+
+            switch (typeCode)
+            {
+                case TypeCodeFair:
+                    fun = new FairAffinityFunction();
+                    break;
+                case TypeCodeRendezvous:
+                    fun = new RendezvousAffinityFunction();
+                    break;
+                default:
+                    throw new InvalidOperationException("Invalid AffinityFunction type code: " + typeCode);
+            }
+
+            fun.Partitions = partitions;
+            fun.ExcludeNeighbors = exclNeighbors;
+
+            return fun;
+        }
+
+
+        /// <summary>
+        /// Writes the partitions assignment to a stream.
+        /// <para />
+        /// Output: partition count (int), then for every partition a node count (int) followed by
+        /// the node ids. Both counts are unknown up front because the input is enumerated lazily,
+        /// so a placeholder is written first and patched once the real count is known.
+        /// </summary>
+        /// <param name="parts">The parts.</param>
+        /// <param name="stream">The stream.</param>
+        /// <param name="marsh">The marshaller.</param>
+        /// <exception cref="IgniteException">
+        /// A partition is null, or a partition contains a null node.
+        /// </exception>
+        internal static void WritePartitions(IEnumerable<IEnumerable<IClusterNode>> parts,
+            PlatformMemoryStream stream, Marshaller marsh)
+        {
+            Debug.Assert(parts != null);
+            Debug.Assert(stream != null);
+            Debug.Assert(marsh != null);
+
+            IBinaryRawWriter writer = marsh.StartMarshal(stream);
+
+            var partCnt = 0;
+            writer.WriteInt(partCnt); // reserve size
+
+            foreach (var part in parts)
+            {
+                if (part == null)
+                    throw new IgniteException("IAffinityFunction.AssignPartitions() returned invalid partition: null");
+
+                partCnt++;
+
+                var nodeCnt = 0;
+                var cntPos = stream.Position;
+                writer.WriteInt(nodeCnt); // reserve size
+
+                foreach (var node in part)
+                {
+                    // Fail with a descriptive error instead of a bare NullReferenceException on node.Id.
+                    if (node == null)
+                        throw new IgniteException("IAffinityFunction.AssignPartitions() returned invalid partition: " +
+                                                  "partition contains null node");
+
+                    nodeCnt++;
+                    writer.WriteGuid(node.Id);
+                }
+
+                // Patch the reserved node count, then return to the end of the written data.
+                var endPos = stream.Position;
+                stream.Seek(cntPos, SeekOrigin.Begin);
+                stream.WriteInt(nodeCnt);
+                stream.Seek(endPos, SeekOrigin.Begin);
+            }
+
+            // NOTE(review): SynchronizeOutput is called before rewinding, presumably so the full
+            // written length is published before the position moves back to 0 - confirm against
+            // PlatformMemoryStream. Statement order is kept exactly as it was.
+            stream.SynchronizeOutput();
+            stream.Seek(0, SeekOrigin.Begin);
+            writer.WriteInt(partCnt);
+        }
+
+        /// <summary>
+        /// Reads the partitions assignment from a stream.
+        /// </summary>
+        /// <param name="stream">The stream.</param>
+        /// <param name="marsh">The marshaller.</param>
+        /// <returns>Partitions assignment: one node collection per partition, in stream order.</returns>
+        internal static IEnumerable<IEnumerable<IClusterNode>> ReadPartitions(IBinaryStream stream, Marshaller marsh)
+        {
+            Debug.Assert(stream != null);
+            Debug.Assert(marsh != null);
+
+            IBinaryRawReader reader = marsh.StartUnmarshal(stream);
+
+            var partCnt = reader.ReadInt();
+
+            var res = new List<IEnumerable<IClusterNode>>(partCnt);
+
+            for (var i = 0; i < partCnt; i++)
+                res.Add(IgniteUtils.ReadNodes(reader));
+
+            return res;
+        }
+
+        /// <summary>
+        /// Gets the override flags: one flag per IAffinityFunction method whose implementation in
+        /// <paramref name="funcType"/> is declared by a type other than AffinityFunctionBase.
+        /// </summary>
+        /// <param name="funcType">Concrete affinity function type; must implement IAffinityFunction.</param>
+        /// <returns>Combination of flags for the overridden methods.</returns>
+        private static UserOverrides GetOverrideFlags(Type funcType)
+        {
+            var res = UserOverrides.None;
+
+            var methods = new[] {UserOverrides.GetPartition, UserOverrides.AssignPartitions, UserOverrides.RemoveNode};
+
+            var map = funcType.GetInterfaceMap(typeof(IAffinityFunction));
+
+            foreach (var method in methods)
+            {
+                // Find whether user type overrides IAffinityFunction method from AffinityFunctionBase.
+                // The enum member name doubles as the interface method name.
+                var methodName = method.ToString();
+
+                if (map.TargetMethods.Single(x => x.Name == methodName).DeclaringType != typeof(AffinityFunctionBase))
+                    res |= method;
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Writes the user function.
+        /// </summary>
+        /// <param name="writer">Raw writer.</param>
+        /// <param name="func">Function to write; may be null, in which case null is written.</param>
+        /// <param name="funcOverride">When not null, written instead of <paramref name="func"/>
+        /// without the serializability check.</param>
+        /// <exception cref="IgniteException">
+        /// <paramref name="func"/> is not null and its type is not serializable.
+        /// </exception>
+        private static void WriteUserFunc(IBinaryRawWriter writer, IAffinityFunction func, object funcOverride)
+        {
+            if (funcOverride != null)
+            {
+                writer.WriteObject(funcOverride);
+                return;
+            }
+
+            if (func != null && !func.GetType().IsSerializable)
+                throw new IgniteException("AffinityFunction should be serializable.");
+
+            writer.WriteObject(func);
+        }
+
+        /// <summary>
+        /// Overridden function flags. Member names match the IAffinityFunction method names
+        /// (relied upon by <see cref="GetOverrideFlags"/>).
+        /// </summary>
+        [Flags]
+        private enum UserOverrides : byte
+        {
+            None = 0,
+            GetPartition = 1,
+            RemoveNode = 1 << 1,
+            AssignPartitions = 1 << 2,
+            All = GetPartition | RemoveNode | AssignPartitions
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
new file mode 100644
index 0000000..d335804
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Cache.Affinity;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Impl.Binary;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+
+    /// <summary>
+    /// Affinity function that delegates to Java.
+    /// Every operation is forwarded to the underlying platform target using one of the opcodes below.
+    /// </summary>
+    internal class PlatformAffinityFunction : PlatformTarget, IAffinityFunction
+    {
+        /** Opcode: get partition for a key. */
+        private const int OpPartition = 1;
+
+        /** Opcode: remove node. */
+        private const int OpRemoveNode = 2;
+
+        /** Opcode: assign partitions. */
+        private const int OpAssignPartitions = 3;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="PlatformAffinityFunction"/> class.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="marsh">Marshaller.</param>
+        public PlatformAffinityFunction(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+        {
+            // Nothing to initialize beyond the base class.
+        }
+
+        /** <inheritdoc /> */
+        public int Partitions
+        {
+            // The partition count is not available through this wrapper.
+            get { throw new NotSupportedException("PlatformAffinityFunction.Partitions is not supported."); }
+        }
+
+        /** <inheritdoc /> */
+        public int GetPartition(object key)
+        {
+            // The key is marshalled to the target; the operation result is narrowed to int.
+            var partition = DoOutOp(OpPartition, writer => writer.WriteObject(key));
+
+            return (int) partition;
+        }
+
+        /** <inheritdoc /> */
+        public void RemoveNode(Guid nodeId)
+        {
+            DoOutOp(OpRemoveNode, writer => writer.WriteGuid(nodeId));
+        }
+
+        /** <inheritdoc /> */
+        public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+        {
+            // The context argument is not sent to the target; only the resulting assignment is read back.
+            return DoInOp(OpAssignPartitions,
+                stream => AffinityFunctionSerializer.ReadPartitions(stream, Marshaller));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
new file mode 100644
index 0000000..407fe0c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Impl.Binary;
+
+    /// <summary>
+    /// Holds the information to instantiate an object and set its properties.
+    /// Typically used for .NET objects defined in Spring XML.
+    /// </summary>
+    internal class ObjectInfoHolder : IBinaryWriteAware
+    {
+        /** Name of the type to instantiate. */
+        private readonly string _typeName;
+
+        /** Property values keyed by property name. */
+        private readonly Dictionary<string, object> _properties;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ObjectInfoHolder"/> class
+        /// by reading the type name followed by the property dictionary.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public ObjectInfoHolder(IBinaryRawReader reader)
+        {
+            Debug.Assert(reader != null);
+
+            // Read order mirrors the write order in WriteBinary: type name first, then properties.
+            var typeName = reader.ReadString();
+            var properties = reader.ReadDictionaryAsGeneric<string, object>();
+
+            Debug.Assert(!string.IsNullOrEmpty(typeName));
+
+            _typeName = typeName;
+            _properties = properties;
+        }
+
+        /// <summary>
+        /// Gets the name of the type.
+        /// </summary>
+        public string TypeName
+        {
+            get { return _typeName; }
+        }
+
+        /// <summary>
+        /// Gets the properties.
+        /// </summary>
+        public Dictionary<string, object> Properties
+        {
+            get { return _properties; }
+        }
+
+        /// <summary>
+        /// Creates an instance according to type name and properties.
+        /// </summary>
+        public T CreateInstance<T>()
+        {
+            return IgniteUtils.CreateInstance<T>(_typeName, _properties);
+        }
+
+        /** <inheritdoc /> */
+        public void WriteBinary(IBinaryWriter writer)
+        {
+            Debug.Assert(writer != null);
+
+            var rawWriter = writer.GetRawWriter();
+
+            rawWriter.WriteString(TypeName);
+            rawWriter.WriteDictionary(Properties);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
index 94f6166..fdadb1c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Impl
     using System;
     using System.Collections.Generic;
     using System.ComponentModel;
+    using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.Globalization;
     using System.IO;
@@ -478,5 +479,25 @@ namespace Apache.Ignite.Core.Impl
 
             return res;
         }
+
+        /// <summary>
+        /// Writes the node collection to a stream.
+        /// </summary>
+        /// <param name="writer">The writer.</param>
+        /// <param name="nodes">The nodes; null is encoded as a count of -1.</param>
+        public static void WriteNodes(IBinaryRawWriter writer, ICollection<IClusterNode> nodes)
+        {
+            Debug.Assert(writer != null);
+
+            if (nodes == null)
+            {
+                // Null collection marker.
+                writer.WriteInt(-1);
+                return;
+            }
+
+            // Count first, then the id of every node in enumeration order.
+            writer.WriteInt(nodes.Count);
+
+            foreach (var clusterNode in nodes)
+                writer.WriteGuid(clusterNode.Id);
+        }
     }
 }
http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index 176d3b4..ef901cb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -21,7 +21,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
     using System.Collections.Generic;
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
-    using System.IO;
     using System.Runtime.InteropServices;
     using System.Threading;
     using Apache.Ignite.Core.Cache.Affinity;
@@ -30,6 +29,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
     using Apache.Ignite.Core.Impl.Cache;
+    using Apache.Ignite.Core.Impl.Cache.Affinity;
     using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
     using Apache.Ignite.Core.Impl.Cache.Store;
     using Apache.Ignite.Core.Impl.Common;
@@ -166,7 +166,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         private delegate void OnClientDisconnectedDelegate(void* target);
         private delegate void OnClientReconnectedDelegate(void* target, bool clusterRestarted);
 
-        private delegate long AffinityFunctionInitDelegate(void* target, long memPtr);
+        private delegate long AffinityFunctionInitDelegate(void* target, 
long memPtr, void* baseFunc); private delegate int AffinityFunctionPartitionDelegate(void* target, long ptr, long memPtr); private delegate void AffinityFunctionAssignPartitionsDelegate(void* target, long ptr, long inMemPtr, long outMemPtr); private delegate void AffinityFunctionRemoveNodeDelegate(void* target, long ptr, long memPtr); @@ -1109,21 +1109,23 @@ namespace Apache.Ignite.Core.Impl.Unmanaged #region AffinityFunction - private long AffinityFunctionInit(void* target, long memPtr) + private long AffinityFunctionInit(void* target, long memPtr, void* baseFunc) { return SafeCall(() => { using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { var reader = _ignite.Marshaller.StartUnmarshal(stream); + + var func = reader.ReadObjectEx<IAffinityFunction>(); - var funcOrTypeName = reader.ReadObject<object>(); + ResourceProcessor.Inject(func, _ignite); - var func = funcOrTypeName as IAffinityFunction - ?? IgniteUtils.CreateInstance<IAffinityFunction>((string) funcOrTypeName, - reader.ReadDictionaryAsGeneric<string, object>()); + var affBase = func as AffinityFunctionBase; - ResourceProcessor.Inject(func, _ignite); + if (affBase != null) + affBase.SetBaseFunction(new PlatformAffinityFunction( + _ignite.InteropProcessor.ChangeTarget(baseFunc), _ignite.Marshaller)); return _handleRegistry.Allocate(func); } @@ -1158,38 +1160,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged using (var outStream = IgniteManager.Memory.Get(outMemPtr).GetStream()) { - var writer = _ignite.Marshaller.StartMarshal(outStream); - - var partCnt = 0; - writer.WriteInt(partCnt); // reserve size - - foreach (var part in parts) - { - if (part == null) - throw new IgniteException(func.GetType() + - ".AssignPartitions() returned invalid partition: null"); - - partCnt++; - - var nodeCnt = 0; - var cntPos = outStream.Position; - writer.WriteInt(nodeCnt); // reserve size - - foreach (var node in part) - { - nodeCnt++; - writer.WriteGuid(node.Id); - } - - var endPos = outStream.Position; - 
outStream.Seek(cntPos, SeekOrigin.Begin); - outStream.WriteInt(nodeCnt); - outStream.Seek(endPos, SeekOrigin.Begin); - } - - outStream.SynchronizeOutput(); - outStream.Seek(0, SeekOrigin.Begin); - writer.WriteInt(partCnt); + AffinityFunctionSerializer.WritePartitions(parts, outStream, _ignite.Marshaller); } } });
